""" Mathematical Engine for GAIA Agent Advanced mathematical computation capabilities with symbolic mathematics. Features: - Symbolic mathematics with SymPy - Numerical computations with high precision - Statistical analysis and probability - Equation solving and optimization - Mathematical expression parsing and evaluation - Formula manipulation and simplification """ import logging import math import cmath import decimal import fractions import statistics import re from typing import Dict, Any, Optional, Union, List, Tuple import json # Mathematical libraries try: import numpy as np NUMPY_AVAILABLE = True except ImportError: NUMPY_AVAILABLE = False try: import scipy from scipy import stats, optimize, integrate, linalg, special SCIPY_AVAILABLE = True except ImportError: SCIPY_AVAILABLE = False try: import sympy as sp from sympy import ( symbols, Symbol, solve, diff, integrate as sp_integrate, simplify, expand, factor, limit, series, Matrix, pi, E, I, sin, cos, tan, exp, log, sqrt, Abs, oo, zoo, nan, Rational, Float, Integer, Poly, roots, cancel, apart, together, collect, trigsimp, powsimp, radsimp, logcombine ) SYMPY_AVAILABLE = True except ImportError: SYMPY_AVAILABLE = False logger = logging.getLogger(__name__) class MathematicalExpressionParser: """Parse and evaluate mathematical expressions safely.""" def __init__(self): """Initialize the expression parser.""" self.safe_functions = { # Basic math functions 'abs': abs, 'round': round, 'min': min, 'max': max, 'sum': sum, 'pow': pow, # Math module functions 'sqrt': math.sqrt, 'exp': math.exp, 'log': math.log, 'log10': math.log10, 'log2': math.log2, 'sin': math.sin, 'cos': math.cos, 'tan': math.tan, 'asin': math.asin, 'acos': math.acos, 'atan': math.atan, 'atan2': math.atan2, 'sinh': math.sinh, 'cosh': math.cosh, 'tanh': math.tanh, 'asinh': math.asinh, 'acosh': math.acosh, 'atanh': math.atanh, 'degrees': math.degrees, 'radians': math.radians, 'ceil': math.ceil, 'floor': math.floor, 'trunc': math.trunc, 'factorial': math.factorial, 'gcd': math.gcd, 'gamma': math.gamma, 'lgamma': math.lgamma, # Constants 'pi': math.pi, 'e': math.e, 'tau': math.tau, 'inf': math.inf, 'nan': math.nan, } # Add numpy functions if available if NUMPY_AVAILABLE: self.safe_functions.update({ 'array': np.array, 'zeros': np.zeros, 'ones': np.ones, 'arange': np.arange, 'linspace': np.linspace, 'mean': np.mean, 'median': np.median, 'std': np.std, 'var': np.var, 'percentile': np.percentile, 'dot': np.dot, 'cross': np.cross, 'norm': np.linalg.norm, }) def parse_expression(self, expression: str) -> Any: """ Parse and evaluate a mathematical expression safely. Args: expression: Mathematical expression as string Returns: Evaluated result """ try: # Clean the expression cleaned_expr = self._clean_expression(expression) # Evaluate using safe functions result = eval(cleaned_expr, {"__builtins__": {}}, self.safe_functions) return result except Exception as e: logger.error(f"Failed to parse expression '{expression}': {e}") raise ValueError(f"Invalid mathematical expression: {e}") def _clean_expression(self, expression: str) -> str: """Clean and validate mathematical expression.""" # Remove whitespace cleaned = expression.strip() # Replace common mathematical notation replacements = { '^': '**', # Power operator '×': '*', # Multiplication '÷': '/', # Division '√': 'sqrt', # Square root } for old, new in replacements.items(): cleaned = cleaned.replace(old, new) return cleaned class SymbolicMathEngine: """Symbolic mathematics engine using SymPy.""" def __init__(self): """Initialize symbolic math engine.""" self.available = SYMPY_AVAILABLE if not self.available: logger.warning("SymPy not available - symbolic math features disabled") def solve_equation(self, equation: str, variable: str = 'x') -> List[Any]: """ Solve an equation symbolically. Args: equation: Equation as string (e.g., "x**2 - 4 = 0") variable: Variable to solve for Returns: List of solutions """ if not self.available: raise RuntimeError("SymPy not available for symbolic solving") try: # Create symbol var = symbols(variable) # Parse equation if '=' in equation: left, right = equation.split('=', 1) expr = sp.sympify(left.strip()) - sp.sympify(right.strip()) else: expr = sp.sympify(equation) # Solve equation solutions = solve(expr, var) return [float(sol.evalf()) if sol.is_real else complex(sol.evalf()) for sol in solutions] except Exception as e: logger.error(f"Failed to solve equation '{equation}': {e}") raise ValueError(f"Could not solve equation: {e}") def differentiate(self, expression: str, variable: str = 'x', order: int = 1) -> str: """ Compute derivative of an expression. Args: expression: Mathematical expression variable: Variable to differentiate with respect to order: Order of derivative Returns: Derivative as string """ if not self.available: raise RuntimeError("SymPy not available for differentiation") try: var = symbols(variable) expr = sp.sympify(expression) derivative = diff(expr, var, order) return str(derivative) except Exception as e: logger.error(f"Failed to differentiate '{expression}': {e}") raise ValueError(f"Could not compute derivative: {e}") def integrate(self, expression: str, variable: str = 'x', limits: Optional[Tuple[float, float]] = None) -> str: """ Compute integral of an expression. Args: expression: Mathematical expression variable: Variable to integrate with respect to limits: Integration limits (a, b) for definite integral Returns: Integral as string or numerical value """ if not self.available: raise RuntimeError("SymPy not available for integration") try: var = symbols(variable) expr = sp.sympify(expression) if limits: # Definite integral result = sp_integrate(expr, (var, limits[0], limits[1])) return float(result.evalf()) if result.is_real else str(result) else: # Indefinite integral result = sp_integrate(expr, var) return str(result) except Exception as e: logger.error(f"Failed to integrate '{expression}': {e}") raise ValueError(f"Could not compute integral: {e}") def simplify_expression(self, expression: str) -> str: """ Simplify a mathematical expression. Args: expression: Mathematical expression to simplify Returns: Simplified expression as string """ if not self.available: raise RuntimeError("SymPy not available for simplification") try: expr = sp.sympify(expression) simplified = simplify(expr) return str(simplified) except Exception as e: logger.error(f"Failed to simplify '{expression}': {e}") raise ValueError(f"Could not simplify expression: {e}") def factor_expression(self, expression: str) -> str: """ Factor a mathematical expression. Args: expression: Mathematical expression to factor Returns: Factored expression as string """ if not self.available: raise RuntimeError("SymPy not available for factoring") try: expr = sp.sympify(expression) factored = factor(expr) return str(factored) except Exception as e: logger.error(f"Failed to factor '{expression}': {e}") raise ValueError(f"Could not factor expression: {e}") def expand_expression(self, expression: str) -> str: """ Expand a mathematical expression. Args: expression: Mathematical expression to expand Returns: Expanded expression as string """ if not self.available: raise RuntimeError("SymPy not available for expansion") try: expr = sp.sympify(expression) expanded = expand(expr) return str(expanded) except Exception as e: logger.error(f"Failed to expand '{expression}': {e}") raise ValueError(f"Could not expand expression: {e}") class NumericalMathEngine: """Numerical mathematics engine using NumPy and SciPy.""" def __init__(self): """Initialize numerical math engine.""" self.numpy_available = NUMPY_AVAILABLE self.scipy_available = SCIPY_AVAILABLE if not self.numpy_available: logger.warning("NumPy not available - numerical features limited") if not self.scipy_available: logger.warning("SciPy not available - advanced numerical features disabled") def compute_statistics(self, data: List[float]) -> Dict[str, float]: """ Compute comprehensive statistics for numerical data. Args: data: List of numerical values Returns: Dictionary of statistical measures """ if not data: raise ValueError("Empty data provided") try: # Convert to numpy array if available if self.numpy_available: arr = np.array(data, dtype=float) # Ensure float type stats_dict = { 'count': len(data), 'mean': float(np.mean(arr)), 'median': float(np.median(arr)), 'std': float(np.std(arr, ddof=1)), # Sample standard deviation 'variance': float(np.var(arr, ddof=1)), # Sample variance 'min': float(np.min(arr)), 'max': float(np.max(arr)), 'sum': float(np.sum(arr)), 'range': float(np.max(arr) - np.min(arr)), 'q1': float(np.percentile(arr, 25)), 'q3': float(np.percentile(arr, 75)), 'iqr': float(np.percentile(arr, 75) - np.percentile(arr, 25)) } # Add SciPy statistics if available if self.scipy_available: try: mode_result = stats.mode(arr, keepdims=True) mode_value = float(mode_result.mode[0]) if len(mode_result.mode) > 0 else None stats_dict.update({ 'skewness': float(stats.skew(arr)), 'kurtosis': float(stats.kurtosis(arr)), 'mode': mode_value }) except Exception as scipy_error: logger.debug(f"SciPy statistics failed: {scipy_error}") # Add basic mode calculation fallback try: unique, counts = np.unique(arr, return_counts=True) mode_idx = np.argmax(counts) stats_dict['mode'] = float(unique[mode_idx]) except: stats_dict['mode'] = None else: # Fallback to built-in statistics stats_dict = { 'count': len(data), 'mean': statistics.mean(data), 'median': statistics.median(data), 'std': statistics.stdev(data) if len(data) > 1 else 0, 'variance': statistics.variance(data) if len(data) > 1 else 0, 'min': min(data), 'max': max(data), 'sum': sum(data), 'range': max(data) - min(data) } try: stats_dict['mode'] = statistics.mode(data) except statistics.StatisticsError: stats_dict['mode'] = None return stats_dict except Exception as e: logger.error(f"Failed to compute statistics: {e}") raise ValueError(f"Could not compute statistics: {e}") def solve_linear_system(self, A: List[List[float]], b: List[float]) -> List[float]: """ Solve linear system Ax = b. Args: A: Coefficient matrix b: Right-hand side vector Returns: Solution vector """ if not self.numpy_available: raise RuntimeError("NumPy required for linear system solving") try: A_array = np.array(A) b_array = np.array(b) solution = np.linalg.solve(A_array, b_array) return solution.tolist() except Exception as e: logger.error(f"Failed to solve linear system: {e}") raise ValueError(f"Could not solve linear system: {e}") def find_roots(self, coefficients: List[float]) -> List[complex]: """ Find roots of a polynomial. Args: coefficients: Polynomial coefficients (highest degree first) Returns: List of roots (complex numbers) """ if not self.numpy_available: raise RuntimeError("NumPy required for root finding") try: roots = np.roots(coefficients) return [complex(root) for root in roots] except Exception as e: logger.error(f"Failed to find roots: {e}") raise ValueError(f"Could not find polynomial roots: {e}") def numerical_integration(self, func_str: str, a: float, b: float, method: str = 'quad') -> float: """ Perform numerical integration. Args: func_str: Function as string (e.g., "x**2 + 1") a: Lower limit b: Upper limit method: Integration method Returns: Integral value """ if not self.scipy_available: raise RuntimeError("SciPy required for numerical integration") try: # Create function from string def func(x): return eval(func_str, {"x": x, "math": math, "np": np}) if method == 'quad': result, _ = integrate.quad(func, a, b) else: raise ValueError(f"Unknown integration method: {method}") return float(result) except Exception as e: logger.error(f"Failed numerical integration: {e}") raise ValueError(f"Could not perform numerical integration: {e}") class MathematicalEngine: """Comprehensive mathematical engine combining symbolic and numerical capabilities.""" def __init__(self): """Initialize the mathematical engine.""" self.parser = MathematicalExpressionParser() self.symbolic = SymbolicMathEngine() self.numerical = NumericalMathEngine() self.available = True logger.info("MathematicalEngine initialized") logger.info(f"Symbolic math available: {self.symbolic.available}") logger.info(f"NumPy available: {self.numerical.numpy_available}") logger.info(f"SciPy available: {self.numerical.scipy_available}") def evaluate_expression(self, expression: str, variables: Optional[Dict[str, float]] = None, precision: int = 15) -> Dict[str, Any]: """ Evaluate a mathematical expression with high precision. Args: expression: Mathematical expression to evaluate variables: Dictionary of variable values (e.g., {"x": 5, "y": 3}) precision: Decimal precision for results Returns: Dictionary with success status and result """ try: # Try symbolic evaluation first for exact results if self.symbolic.available: try: # Pre-process expression to handle common mathematical constants processed_expr = expression.replace('e**', 'E**').replace('e^', 'E^') # Handle standalone 'e' that should be Euler's number import re processed_expr = re.sub(r'\be\b', 'E', processed_expr) expr = sp.sympify(processed_expr) # Substitute variables if provided if variables: for var_name, var_value in variables.items(): var_symbol = symbols(var_name) expr = expr.subs(var_symbol, var_value) result = expr.evalf(precision) # Always try to convert to numerical value if result.is_real: return {"success": True, "result": float(result)} elif result.is_complex: return {"success": True, "result": complex(result)} elif result.is_number: # Try to extract numerical value from symbolic result try: numerical_result = float(result) return {"success": True, "result": numerical_result} except: pass # If we can't get a numerical result, try to evaluate further try: # Method 1: Substitute symbolic constants with numerical values expr_with_constants = expr.subs([(sp.pi, math.pi), (sp.E, math.e)]) numerical_result = float(expr_with_constants.evalf(precision)) return {"success": True, "result": numerical_result} except: try: # Method 2: Use lambdify to convert to numerical function func = sp.lambdify([], expr, 'math') numerical_result = float(func()) return {"success": True, "result": numerical_result} except: try: # Method 3: Force numerical evaluation with N() numerical_result = float(sp.N(expr, precision)) return {"success": True, "result": numerical_result} except: return {"success": True, "result": str(result)} except: pass # Fallback to numerical evaluation if variables: # Create a safe namespace with variables safe_namespace = self.parser.safe_functions.copy() safe_namespace.update(variables) result = eval(expression, {"__builtins__": {}}, safe_namespace) else: result = self.parser.parse_expression(expression) # Format with specified precision if isinstance(result, float): result = round(result, precision) return {"success": True, "result": result} except Exception as e: logger.error(f"Failed to evaluate expression '{expression}': {e}") return {"success": False, "error": str(e)} def solve_mathematical_problem(self, problem_type: str, **kwargs) -> Any: """ Solve various types of mathematical problems. Args: problem_type: Type of problem to solve **kwargs: Problem-specific parameters Returns: Solution result """ try: if problem_type == "equation": return self.symbolic.solve_equation(kwargs['equation'], kwargs.get('variable', 'x')) elif problem_type == "derivative": return self.symbolic.differentiate( kwargs['expression'], kwargs.get('variable', 'x'), kwargs.get('order', 1) ) elif problem_type == "integral": return self.symbolic.integrate( kwargs['expression'], kwargs.get('variable', 'x'), kwargs.get('limits') ) elif problem_type == "simplify": return self.symbolic.simplify_expression(kwargs['expression']) elif problem_type == "factor": return self.symbolic.factor_expression(kwargs['expression']) elif problem_type == "expand": return self.symbolic.expand_expression(kwargs['expression']) elif problem_type == "statistics": return self.numerical.compute_statistics(kwargs['data']) elif problem_type == "linear_system": return self.numerical.solve_linear_system(kwargs['A'], kwargs['b']) elif problem_type == "polynomial_roots": return self.numerical.find_roots(kwargs['coefficients']) elif problem_type == "numerical_integration": return self.numerical.numerical_integration( kwargs['function'], kwargs['a'], kwargs['b'], kwargs.get('method', 'quad') ) else: raise ValueError(f"Unknown problem type: {problem_type}") except Exception as e: logger.error(f"Failed to solve {problem_type} problem: {e}") raise def compute_derivative(self, expression: str, variable: str = 'x', order: int = 1) -> Dict[str, Any]: """ Compute derivative of an expression. Args: expression: Mathematical expression variable: Variable to differentiate with respect to order: Order of derivative Returns: Dictionary with success status and derivative """ try: derivative = self.symbolic.differentiate(expression, variable, order) return {"success": True, "derivative": derivative} except Exception as e: logger.error(f"Failed to compute derivative of '{expression}': {e}") return {"success": False, "error": str(e)} def compute_integral(self, expression: str, variable: str = 'x', limits: Optional[Tuple[float, float]] = None) -> Dict[str, Any]: """ Compute integral of an expression. Args: expression: Mathematical expression variable: Variable to integrate with respect to limits: Integration limits (a, b) for definite integral Returns: Dictionary with success status and integral """ try: integral = self.symbolic.integrate(expression, variable, limits) return {"success": True, "integral": integral} except Exception as e: logger.error(f"Failed to compute integral of '{expression}': {e}") return {"success": False, "error": str(e)} def solve_equation(self, equation: str, variable: str = 'x') -> Dict[str, Any]: """ Solve an equation symbolically. Args: equation: Equation as string (e.g., "x**2 - 4 = 0") variable: Variable to solve for Returns: Dictionary with success status and solutions """ try: solutions = self.symbolic.solve_equation(equation, variable) return {"success": True, "solutions": solutions} except Exception as e: logger.error(f"Failed to solve equation '{equation}': {e}") return {"success": False, "error": str(e)} def analyze_statistics(self, data: List[float]) -> Dict[str, Any]: """ Compute comprehensive statistics for numerical data. Args: data: List of numerical values Returns: Dictionary with success status and statistical measures """ try: stats = self.numerical.compute_statistics(data) result = {"success": True} # Map field names to match test expectations for key, value in stats.items(): if key == 'std': result['std_dev'] = value else: result[key] = value return result except Exception as e: logger.error(f"Failed to analyze statistics: {e}") return {"success": False, "error": str(e)} def get_capabilities(self) -> Dict[str, Any]: """Get engine capabilities and status.""" return { 'available': self.available, 'symbolic_math': self.symbolic.available, 'numerical_math': self.numerical.numpy_available, 'advanced_numerical': self.numerical.scipy_available, 'supported_operations': [ 'expression_evaluation', 'equation_solving', 'differentiation', 'integration', 'simplification', 'factoring', 'expansion', 'statistics', 'linear_algebra', 'polynomial_operations', 'numerical_integration' ], 'precision': 'up to 50 decimal places (symbolic)', 'libraries': { 'sympy': self.symbolic.available, 'numpy': self.numerical.numpy_available, 'scipy': self.numerical.scipy_available } } # AGNO tool registration class MathematicalEngineTool: """AGNO-compatible mathematical engine tool.""" def __init__(self): """Initialize the tool.""" self.engine = MathematicalEngine() self.available = self.engine.available logger.info("MathematicalEngineTool initialized") def evaluate_mathematical_expression(self, expression: str, precision: int = 15) -> str: """ Evaluate a mathematical expression. Args: expression: Mathematical expression to evaluate precision: Decimal precision Returns: Formatted result """ try: result = self.engine.evaluate_expression(expression, None, precision) if result['success']: return f"Expression: {expression}\nResult: {result['result']}" else: return f"Error evaluating '{expression}': {result['error']}" except Exception as e: return f"Error evaluating '{expression}': {e}" def solve_equation(self, equation: str, variable: str = 'x') -> str: """ Solve an equation symbolically. Args: equation: Equation to solve variable: Variable to solve for Returns: Solutions """ try: solutions = self.engine.solve_mathematical_problem( 'equation', equation=equation, variable=variable ) return f"Equation: {equation}\nSolutions for {variable}: {solutions}" except Exception as e: return f"Error solving equation '{equation}': {e}" def compute_derivative(self, expression: str, variable: str = 'x', order: int = 1) -> str: """ Compute derivative of an expression. Args: expression: Expression to differentiate variable: Variable to differentiate with respect to order: Order of derivative Returns: Derivative """ try: derivative = self.engine.solve_mathematical_problem( 'derivative', expression=expression, variable=variable, order=order ) return f"d^{order}/d{variable}^{order}({expression}) = {derivative}" except Exception as e: return f"Error computing derivative: {e}" def compute_integral(self, expression: str, variable: str = 'x', limits: Optional[str] = None) -> str: """ Compute integral of an expression. Args: expression: Expression to integrate variable: Variable to integrate with respect to limits: Integration limits as "a,b" for definite integral Returns: Integral """ try: limit_tuple = None if limits: a, b = map(float, limits.split(',')) limit_tuple = (a, b) integral = self.engine.solve_mathematical_problem( 'integral', expression=expression, variable=variable, limits=limit_tuple ) if limit_tuple: return f"∫[{limit_tuple[0]} to {limit_tuple[1]}] {expression} d{variable} = {integral}" else: return f"∫ {expression} d{variable} = {integral}" except Exception as e: return f"Error computing integral: {e}" def analyze_data_statistics(self, data: str) -> str: """ Compute statistics for numerical data. Args: data: Comma-separated numerical values Returns: Statistical analysis """ try: # Parse data values = [float(x.strip()) for x in data.split(',') if x.strip()] stats = self.engine.solve_mathematical_problem('statistics', data=values) result = "Statistical Analysis:\n" for key, value in stats.items(): if value is not None: result += f"{key.capitalize()}: {value}\n" return result.strip() except Exception as e: return f"Error analyzing data: {e}" def get_mathematical_engine_tools(): """Get mathematical engine tools for AGNO registration.""" tool = MathematicalEngineTool() return [ { 'name': 'evaluate_mathematical_expression', 'function': tool.evaluate_mathematical_expression, 'description': 'Evaluate mathematical expressions with high precision' }, { 'name': 'solve_equation', 'function': tool.solve_equation, 'description': 'Solve equations symbolically' }, { 'name': 'compute_derivative', 'function': tool.compute_derivative, 'description': 'Compute derivatives of mathematical expressions' }, { 'name': 'compute_integral', 'function': tool.compute_integral, 'description': 'Compute integrals (definite and indefinite)' }, { 'name': 'analyze_data_statistics', 'function': tool.analyze_data_statistics, 'description': 'Perform statistical analysis on numerical data' } ] if __name__ == "__main__": # Test the mathematical engine engine = MathematicalEngine() print("Testing MathematicalEngine:") print("=" * 50) # Test expression evaluation test_expr = "sqrt(2) * pi + e**2" result = engine.evaluate_expression(test_expr) print(f"Expression: {test_expr}") print(f"Result: {result}") print() # Test capabilities capabilities = engine.get_capabilities() print("Engine Capabilities:") print(json.dumps(capabilities, indent=2))