Spaces:
Running
Running
""" | |
Spreadsheet Formula Evaluator for GAIA Agent - Phase 4 | |
Excel formula parsing, evaluation, and calculation engine | |
Features: | |
- Excel formula parsing and evaluation | |
- Built-in function support (SUM, AVERAGE, COUNT, etc.) | |
- Cell reference resolution | |
- Conditional logic evaluation | |
- Mathematical operations on ranges | |
- Error handling for invalid formulas | |
""" | |
import logging | |
import re | |
import pandas as pd | |
import numpy as np | |
from typing import Dict, Any, List, Optional, Union, Tuple | |
from decimal import Decimal, ROUND_HALF_UP | |
import math | |
logger = logging.getLogger(__name__) | |
class FormulaEvaluator: | |
"""Excel formula evaluator for GAIA data analysis tasks.""" | |
def __init__(self): | |
"""Initialize the formula evaluator.""" | |
self.available = True | |
self.functions = self._init_builtin_functions() | |
self.cell_cache = {} | |
def _init_builtin_functions(self) -> Dict[str, callable]: | |
"""Initialize built-in Excel functions.""" | |
return { | |
'SUM': self._sum, | |
'AVERAGE': self._average, | |
'AVG': self._average, # Alias | |
'COUNT': self._count, | |
'COUNTA': self._counta, | |
'MIN': self._min, | |
'MAX': self._max, | |
'MEDIAN': self._median, | |
'STDEV': self._stdev, | |
'VAR': self._var, | |
'IF': self._if, | |
'AND': self._and, | |
'OR': self._or, | |
'NOT': self._not, | |
'ROUND': self._round, | |
'ABS': self._abs, | |
'SQRT': self._sqrt, | |
'POWER': self._power, | |
'MOD': self._mod, | |
'CONCATENATE': self._concatenate, | |
'LEFT': self._left, | |
'RIGHT': self._right, | |
'MID': self._mid, | |
'LEN': self._len, | |
'UPPER': self._upper, | |
'LOWER': self._lower, | |
'TRIM': self._trim, | |
'SUMIF': self._sumif, | |
'COUNTIF': self._countif, | |
'AVERAGEIF': self._averageif, | |
} | |
def evaluate_formula(self, formula: str, data: pd.DataFrame = None, | |
cell_references: Dict[str, Any] = None) -> Union[float, str, bool, None]: | |
""" | |
Evaluate an Excel formula. | |
Args: | |
formula: Excel formula string (with or without leading =) | |
data: DataFrame containing the data | |
cell_references: Dictionary of cell references and their values | |
Returns: | |
Evaluated result of the formula | |
""" | |
try: | |
# Clean the formula | |
formula = formula.strip() | |
if formula.startswith('='): | |
formula = formula[1:] | |
if not formula: | |
return None | |
# Store data and cell references for function access | |
self.current_data = data | |
self.current_cell_refs = cell_references or {} | |
# Parse and evaluate the formula | |
result = self._parse_and_evaluate(formula) | |
return result | |
except Exception as e: | |
logger.error(f"❌ Formula evaluation failed for '{formula}': {e}") | |
return f"#ERROR: {str(e)}" | |
def evaluate_cell_range(self, range_expr: str, data: pd.DataFrame) -> List[Any]: | |
""" | |
Evaluate a cell range expression (e.g., A1:A10, B2:D5). | |
Args: | |
range_expr: Range expression string | |
data: DataFrame containing the data | |
Returns: | |
List of values in the range | |
""" | |
try: | |
# Parse range expression | |
if ':' in range_expr: | |
start_cell, end_cell = range_expr.split(':') | |
start_row, start_col = self._parse_cell_reference(start_cell) | |
end_row, end_col = self._parse_cell_reference(end_cell) | |
values = [] | |
for row in range(start_row, end_row + 1): | |
for col in range(start_col, end_col + 1): | |
if row < len(data) and col < len(data.columns): | |
value = data.iloc[row, col] | |
if pd.notna(value): | |
values.append(value) | |
return values | |
else: | |
# Single cell reference | |
row, col = self._parse_cell_reference(range_expr) | |
if row < len(data) and col < len(data.columns): | |
value = data.iloc[row, col] | |
return [value] if pd.notna(value) else [] | |
return [] | |
except Exception as e: | |
logger.error(f"❌ Range evaluation failed for '{range_expr}': {e}") | |
return [] | |
def _parse_and_evaluate(self, formula: str) -> Any: | |
"""Parse and evaluate a formula expression.""" | |
# Handle parentheses first | |
while '(' in formula: | |
# Find innermost parentheses | |
start = -1 | |
for i, char in enumerate(formula): | |
if char == '(': | |
start = i | |
elif char == ')' and start != -1: | |
# Evaluate expression inside parentheses | |
inner_expr = formula[start + 1:i] | |
inner_result = self._evaluate_expression(inner_expr) | |
# Replace with result | |
formula = formula[:start] + str(inner_result) + formula[i + 1:] | |
break | |
return self._evaluate_expression(formula) | |
def _evaluate_expression(self, expr: str) -> Any: | |
"""Evaluate a simple expression without parentheses.""" | |
expr = expr.strip() | |
# Check if it's a function call | |
func_match = re.match(r'([A-Z]+)\((.*)\)', expr, re.IGNORECASE) | |
if func_match: | |
func_name = func_match.group(1).upper() | |
args_str = func_match.group(2) | |
return self._evaluate_function(func_name, args_str) | |
# Check if it's a cell reference | |
if re.match(r'^[A-Z]+\d+$', expr, re.IGNORECASE): | |
return self._get_cell_value(expr) | |
# Check if it's a range reference | |
if ':' in expr and re.match(r'^[A-Z]+\d+:[A-Z]+\d+$', expr, re.IGNORECASE): | |
return self.evaluate_cell_range(expr, self.current_data) | |
# Check for arithmetic operations | |
for op in ['+', '-', '*', '/', '^', '=', '<>', '>', '<', '>=', '<=']: | |
if op in expr: | |
return self._evaluate_arithmetic(expr, op) | |
# Try to convert to number | |
try: | |
if '.' in expr: | |
return float(expr) | |
else: | |
return int(expr) | |
except ValueError: | |
pass | |
# Return as string if nothing else works | |
return expr.strip('"\'') | |
def _evaluate_function(self, func_name: str, args_str: str) -> Any: | |
"""Evaluate a function call.""" | |
if func_name not in self.functions: | |
raise ValueError(f"Unknown function: {func_name}") | |
# Parse arguments | |
args = self._parse_function_args(args_str) | |
# Evaluate each argument | |
evaluated_args = [] | |
for arg in args: | |
if isinstance(arg, str): | |
evaluated_args.append(self._evaluate_expression(arg)) | |
else: | |
evaluated_args.append(arg) | |
# Call the function | |
return self.functions[func_name](*evaluated_args) | |
def _parse_function_args(self, args_str: str) -> List[str]: | |
"""Parse function arguments, handling nested functions and ranges.""" | |
if not args_str.strip(): | |
return [] | |
args = [] | |
current_arg = "" | |
paren_depth = 0 | |
in_quotes = False | |
quote_char = None | |
for char in args_str: | |
if char in ['"', "'"] and not in_quotes: | |
in_quotes = True | |
quote_char = char | |
current_arg += char | |
elif char == quote_char and in_quotes: | |
in_quotes = False | |
quote_char = None | |
current_arg += char | |
elif char == '(' and not in_quotes: | |
paren_depth += 1 | |
current_arg += char | |
elif char == ')' and not in_quotes: | |
paren_depth -= 1 | |
current_arg += char | |
elif char == ',' and paren_depth == 0 and not in_quotes: | |
args.append(current_arg.strip()) | |
current_arg = "" | |
else: | |
current_arg += char | |
if current_arg.strip(): | |
args.append(current_arg.strip()) | |
return args | |
def _evaluate_arithmetic(self, expr: str, operator: str) -> Any: | |
"""Evaluate arithmetic expressions.""" | |
parts = expr.split(operator, 1) | |
if len(parts) != 2: | |
raise ValueError(f"Invalid arithmetic expression: {expr}") | |
left = self._evaluate_expression(parts[0].strip()) | |
right = self._evaluate_expression(parts[1].strip()) | |
# Convert to numbers if possible | |
try: | |
left_num = float(left) if not isinstance(left, (int, float)) else left | |
right_num = float(right) if not isinstance(right, (int, float)) else right | |
except (ValueError, TypeError): | |
left_num, right_num = left, right | |
# Perform operation | |
if operator == '+': | |
return left_num + right_num | |
elif operator == '-': | |
return left_num - right_num | |
elif operator == '*': | |
return left_num * right_num | |
elif operator == '/': | |
if right_num == 0: | |
return "#DIV/0!" | |
return left_num / right_num | |
elif operator == '^': | |
return left_num ** right_num | |
elif operator == '=': | |
return left == right | |
elif operator == '<>': | |
return left != right | |
elif operator == '>': | |
return left_num > right_num | |
elif operator == '<': | |
return left_num < right_num | |
elif operator == '>=': | |
return left_num >= right_num | |
elif operator == '<=': | |
return left_num <= right_num | |
else: | |
raise ValueError(f"Unknown operator: {operator}") | |
def _get_cell_value(self, cell_ref: str) -> Any: | |
"""Get value from cell reference.""" | |
if cell_ref in self.current_cell_refs: | |
return self.current_cell_refs[cell_ref] | |
if self.current_data is not None: | |
try: | |
row, col = self._parse_cell_reference(cell_ref) | |
if row < len(self.current_data) and col < len(self.current_data.columns): | |
return self.current_data.iloc[row, col] | |
except Exception: | |
pass | |
return 0 # Default value for missing cells | |
def _parse_cell_reference(self, cell_ref: str) -> Tuple[int, int]: | |
"""Parse cell reference (e.g., A1, B10) to row and column indices.""" | |
match = re.match(r'^([A-Z]+)(\d+)$', cell_ref.upper()) | |
if not match: | |
raise ValueError(f"Invalid cell reference: {cell_ref}") | |
col_letters = match.group(1) | |
row_num = int(match.group(2)) | |
# Convert column letters to index (A=0, B=1, ..., Z=25, AA=26, etc.) | |
col_index = 0 | |
for char in col_letters: | |
col_index = col_index * 26 + (ord(char) - ord('A') + 1) | |
col_index -= 1 # Convert to 0-based index | |
row_index = row_num - 1 # Convert to 0-based index | |
return row_index, col_index | |
# Built-in function implementations | |
def _sum(self, *args) -> float: | |
"""SUM function implementation.""" | |
total = 0 | |
for arg in args: | |
if isinstance(arg, list): | |
total += sum(self._to_number(x) for x in arg if self._is_number(x)) | |
elif self._is_number(arg): | |
total += self._to_number(arg) | |
return total | |
def _average(self, *args) -> float: | |
"""AVERAGE function implementation.""" | |
values = [] | |
for arg in args: | |
if isinstance(arg, list): | |
values.extend([self._to_number(x) for x in arg if self._is_number(x)]) | |
elif self._is_number(arg): | |
values.append(self._to_number(arg)) | |
return sum(values) / len(values) if values else 0 | |
def _count(self, *args) -> int: | |
"""COUNT function implementation (counts numeric values).""" | |
count = 0 | |
for arg in args: | |
if isinstance(arg, list): | |
count += sum(1 for x in arg if self._is_number(x)) | |
elif self._is_number(arg): | |
count += 1 | |
return count | |
def _counta(self, *args) -> int: | |
"""COUNTA function implementation (counts non-empty values).""" | |
count = 0 | |
for arg in args: | |
if isinstance(arg, list): | |
count += sum(1 for x in arg if x is not None and str(x).strip() != '') | |
elif arg is not None and str(arg).strip() != '': | |
count += 1 | |
return count | |
def _min(self, *args) -> float: | |
"""MIN function implementation.""" | |
values = [] | |
for arg in args: | |
if isinstance(arg, list): | |
values.extend([self._to_number(x) for x in arg if self._is_number(x)]) | |
elif self._is_number(arg): | |
values.append(self._to_number(arg)) | |
return min(values) if values else 0 | |
def _max(self, *args) -> float: | |
"""MAX function implementation.""" | |
values = [] | |
for arg in args: | |
if isinstance(arg, list): | |
values.extend([self._to_number(x) for x in arg if self._is_number(x)]) | |
elif self._is_number(arg): | |
values.append(self._to_number(arg)) | |
return max(values) if values else 0 | |
def _median(self, *args) -> float: | |
"""MEDIAN function implementation.""" | |
values = [] | |
for arg in args: | |
if isinstance(arg, list): | |
values.extend([self._to_number(x) for x in arg if self._is_number(x)]) | |
elif self._is_number(arg): | |
values.append(self._to_number(arg)) | |
if not values: | |
return 0 | |
sorted_values = sorted(values) | |
n = len(sorted_values) | |
if n % 2 == 0: | |
return (sorted_values[n//2 - 1] + sorted_values[n//2]) / 2 | |
else: | |
return sorted_values[n//2] | |
def _stdev(self, *args) -> float: | |
"""STDEV function implementation.""" | |
values = [] | |
for arg in args: | |
if isinstance(arg, list): | |
values.extend([self._to_number(x) for x in arg if self._is_number(x)]) | |
elif self._is_number(arg): | |
values.append(self._to_number(arg)) | |
if len(values) < 2: | |
return 0 | |
mean = sum(values) / len(values) | |
variance = sum((x - mean) ** 2 for x in values) / (len(values) - 1) | |
return math.sqrt(variance) | |
def _var(self, *args) -> float: | |
"""VAR function implementation.""" | |
values = [] | |
for arg in args: | |
if isinstance(arg, list): | |
values.extend([self._to_number(x) for x in arg if self._is_number(x)]) | |
elif self._is_number(arg): | |
values.append(self._to_number(arg)) | |
if len(values) < 2: | |
return 0 | |
mean = sum(values) / len(values) | |
return sum((x - mean) ** 2 for x in values) / (len(values) - 1) | |
def _if(self, condition, true_value, false_value) -> Any: | |
"""IF function implementation.""" | |
if self._to_boolean(condition): | |
return true_value | |
else: | |
return false_value | |
def _and(self, *args) -> bool: | |
"""AND function implementation.""" | |
return all(self._to_boolean(arg) for arg in args) | |
def _or(self, *args) -> bool: | |
"""OR function implementation.""" | |
return any(self._to_boolean(arg) for arg in args) | |
def _not(self, value) -> bool: | |
"""NOT function implementation.""" | |
return not self._to_boolean(value) | |
def _round(self, number, digits=0) -> float: | |
"""ROUND function implementation.""" | |
return round(self._to_number(number), int(digits)) | |
def _abs(self, number) -> float: | |
"""ABS function implementation.""" | |
return abs(self._to_number(number)) | |
def _sqrt(self, number) -> float: | |
"""SQRT function implementation.""" | |
num = self._to_number(number) | |
if num < 0: | |
return "#NUM!" | |
return math.sqrt(num) | |
def _power(self, number, power) -> float: | |
"""POWER function implementation.""" | |
return self._to_number(number) ** self._to_number(power) | |
def _mod(self, number, divisor) -> float: | |
"""MOD function implementation.""" | |
return self._to_number(number) % self._to_number(divisor) | |
def _concatenate(self, *args) -> str: | |
"""CONCATENATE function implementation.""" | |
return ''.join(str(arg) for arg in args) | |
def _left(self, text, num_chars) -> str: | |
"""LEFT function implementation.""" | |
return str(text)[:int(num_chars)] | |
def _right(self, text, num_chars) -> str: | |
"""RIGHT function implementation.""" | |
return str(text)[-int(num_chars):] | |
def _mid(self, text, start_num, num_chars) -> str: | |
"""MID function implementation.""" | |
start = int(start_num) - 1 # Excel uses 1-based indexing | |
return str(text)[start:start + int(num_chars)] | |
def _len(self, text) -> int: | |
"""LEN function implementation.""" | |
return len(str(text)) | |
def _upper(self, text) -> str: | |
"""UPPER function implementation.""" | |
return str(text).upper() | |
def _lower(self, text) -> str: | |
"""LOWER function implementation.""" | |
return str(text).lower() | |
def _trim(self, text) -> str: | |
"""TRIM function implementation.""" | |
return str(text).strip() | |
def _sumif(self, range_arg, criteria, sum_range=None) -> float: | |
"""SUMIF function implementation.""" | |
# This is a simplified implementation | |
# In a full implementation, you'd need to handle the range and criteria properly | |
if sum_range is None: | |
sum_range = range_arg | |
if isinstance(range_arg, list) and isinstance(sum_range, list): | |
total = 0 | |
for i, value in enumerate(range_arg): | |
if i < len(sum_range) and self._meets_criteria(value, criteria): | |
if self._is_number(sum_range[i]): | |
total += self._to_number(sum_range[i]) | |
return total | |
return 0 | |
def _countif(self, range_arg, criteria) -> int: | |
"""COUNTIF function implementation.""" | |
if isinstance(range_arg, list): | |
return sum(1 for value in range_arg if self._meets_criteria(value, criteria)) | |
return 0 | |
def _averageif(self, range_arg, criteria, average_range=None) -> float: | |
"""AVERAGEIF function implementation.""" | |
if average_range is None: | |
average_range = range_arg | |
if isinstance(range_arg, list) and isinstance(average_range, list): | |
values = [] | |
for i, value in enumerate(range_arg): | |
if i < len(average_range) and self._meets_criteria(value, criteria): | |
if self._is_number(average_range[i]): | |
values.append(self._to_number(average_range[i])) | |
return sum(values) / len(values) if values else 0 | |
return 0 | |
def _meets_criteria(self, value, criteria) -> bool: | |
"""Check if value meets the given criteria.""" | |
criteria_str = str(criteria) | |
value_str = str(value) | |
# Handle comparison operators | |
if criteria_str.startswith('>='): | |
return self._to_number(value) >= self._to_number(criteria_str[2:]) | |
elif criteria_str.startswith('<='): | |
return self._to_number(value) <= self._to_number(criteria_str[2:]) | |
elif criteria_str.startswith('<>'): | |
return value_str != criteria_str[2:] | |
elif criteria_str.startswith('>'): | |
return self._to_number(value) > self._to_number(criteria_str[1:]) | |
elif criteria_str.startswith('<'): | |
return self._to_number(value) < self._to_number(criteria_str[1:]) | |
elif criteria_str.startswith('='): | |
return value_str == criteria_str[1:] | |
else: | |
# Exact match or wildcard | |
if '*' in criteria_str or '?' in criteria_str: | |
# Simple wildcard matching | |
pattern = criteria_str.replace('*', '.*').replace('?', '.') | |
return re.match(pattern, value_str, re.IGNORECASE) is not None | |
else: | |
return value_str == criteria_str | |
def _is_number(self, value) -> bool: | |
"""Check if value is a number.""" | |
try: | |
float(value) | |
return True | |
except (ValueError, TypeError): | |
return False | |
def _to_number(self, value) -> float: | |
"""Convert value to number.""" | |
if isinstance(value, (int, float)): | |
return float(value) | |
try: | |
return float(value) | |
except (ValueError, TypeError): | |
return 0 | |
def _to_boolean(self, value) -> bool: | |
"""Convert value to boolean.""" | |
if isinstance(value, bool): | |
return value | |
if isinstance(value, (int, float)): | |
return value != 0 | |
if isinstance(value, str): | |
return value.lower() in ['true', '1', 'yes'] | |
return bool(value) | |
def get_formula_evaluator_tools() -> List[Any]: | |
"""Get formula evaluator tools for AGNO integration.""" | |
from .base_tool import BaseTool | |
class FormulaEvaluatorTool(BaseTool): | |
"""Formula evaluator tool for GAIA agent.""" | |
def __init__(self): | |
super().__init__( | |
name="formula_evaluator", | |
description="Evaluate Excel formulas and mathematical expressions" | |
) | |
self.evaluator = FormulaEvaluator() | |
def execute(self, formula: str, data: pd.DataFrame = None, | |
cell_references: Dict[str, Any] = None) -> Dict[str, Any]: | |
"""Execute formula evaluation.""" | |
try: | |
result = self.evaluator.evaluate_formula(formula, data, cell_references) | |
return { | |
"formula": formula, | |
"result": result, | |
"success": True | |
} | |
except Exception as e: | |
return { | |
"formula": formula, | |
"error": f"Formula evaluation failed: {str(e)}", | |
"success": False | |
} | |
return [FormulaEvaluatorTool()] |