""" Code Analysis Tool for GAIA Agent Python code parsing, analysis, and execution flow prediction. Features: - Python code parsing and AST analysis - Dependency detection and import analysis - Execution flow analysis and variable tracking - Output prediction and result estimation - Code optimization suggestions - Error detection and debugging assistance """ import ast import logging import re import sys import inspect import importlib from typing import Dict, Any, List, Optional, Set, Tuple, Union from pathlib import Path import json logger = logging.getLogger(__name__) class CodeStructureAnalyzer: """Analyze Python code structure and components.""" def __init__(self): """Initialize the code structure analyzer.""" self.builtin_functions = set(dir(__builtins__)) self.standard_modules = { 'math', 'os', 'sys', 'json', 'csv', 'datetime', 'time', 'random', 'collections', 'itertools', 'functools', 'operator', 'string', 're', 'urllib', 'http', 'pathlib', 'typing', 'decimal', 'fractions', 'statistics', 'cmath' } def analyze_code_structure(self, code: str) -> Dict[str, Any]: """ Analyze the structure of Python code. Args: code: Python code to analyze Returns: Dictionary with code structure information """ try: tree = ast.parse(code) analysis = { 'imports': self._extract_imports(tree), 'functions': self._extract_functions(tree), 'classes': self._extract_classes(tree), 'variables': self._extract_variables(tree), 'constants': self._extract_constants(tree), 'control_flow': self._analyze_control_flow(tree), 'complexity': self._calculate_complexity(tree), 'dependencies': self._analyze_dependencies(tree), 'potential_outputs': self._predict_outputs(tree), 'syntax_valid': True } return analysis except SyntaxError as e: return { 'syntax_valid': False, 'syntax_error': str(e), 'line_number': e.lineno, 'error_text': e.text } except Exception as e: logger.error(f"Code analysis failed: {e}") return { 'syntax_valid': False, 'analysis_error': str(e) } def _extract_imports(self, tree: ast.AST) -> List[Dict[str, Any]]: """Extract import statements from AST.""" imports = [] for node in ast.walk(tree): if isinstance(node, ast.Import): for alias in node.names: imports.append({ 'type': 'import', 'module': alias.name, 'alias': alias.asname, 'is_standard': alias.name.split('.')[0] in self.standard_modules }) elif isinstance(node, ast.ImportFrom): module = node.module or '' for alias in node.names: imports.append({ 'type': 'from_import', 'module': module, 'name': alias.name, 'alias': alias.asname, 'is_standard': module.split('.')[0] in self.standard_modules }) return imports def _extract_functions(self, tree: ast.AST) -> List[Dict[str, Any]]: """Extract function definitions from AST.""" functions = [] for node in ast.walk(tree): if isinstance(node, ast.FunctionDef): functions.append({ 'name': node.name, 'args': [arg.arg for arg in node.args.args], 'defaults': len(node.args.defaults), 'returns': ast.unparse(node.returns) if node.returns else None, 'docstring': ast.get_docstring(node), 'line_number': node.lineno, 'is_async': False }) elif isinstance(node, ast.AsyncFunctionDef): functions.append({ 'name': node.name, 'args': [arg.arg for arg in node.args.args], 'defaults': len(node.args.defaults), 'returns': ast.unparse(node.returns) if node.returns else None, 'docstring': ast.get_docstring(node), 'line_number': node.lineno, 'is_async': True }) return functions def _extract_classes(self, tree: ast.AST) -> List[Dict[str, Any]]: """Extract class definitions from AST.""" classes = [] for node in ast.walk(tree): if 

    def _extract_classes(self, tree: ast.AST) -> List[Dict[str, Any]]:
        """Extract class definitions from AST."""
        classes = []

        for node in ast.walk(tree):
            if isinstance(node, ast.ClassDef):
                methods = []
                for item in node.body:
                    if isinstance(item, ast.FunctionDef):
                        methods.append({
                            'name': item.name,
                            'args': [arg.arg for arg in item.args.args],
                            'is_property': any(
                                isinstance(d, ast.Name) and d.id == 'property'
                                for d in item.decorator_list
                            )
                        })

                classes.append({
                    'name': node.name,
                    'bases': [ast.unparse(base) for base in node.bases],
                    'methods': methods,
                    'docstring': ast.get_docstring(node),
                    'line_number': node.lineno
                })

        return classes

    def _extract_variables(self, tree: ast.AST) -> List[Dict[str, Any]]:
        """Extract variable assignments from AST."""
        variables = []

        for node in ast.walk(tree):
            if isinstance(node, ast.Assign):
                for target in node.targets:
                    if isinstance(target, ast.Name):
                        variables.append({
                            'name': target.id,
                            'type': 'assignment',
                            'value': ast.unparse(node.value),
                            'line_number': node.lineno
                        })
            elif isinstance(node, ast.AnnAssign) and isinstance(node.target, ast.Name):
                variables.append({
                    'name': node.target.id,
                    'type': 'annotated_assignment',
                    'annotation': ast.unparse(node.annotation),
                    'value': ast.unparse(node.value) if node.value else None,
                    'line_number': node.lineno
                })

        return variables

    def _extract_constants(self, tree: ast.AST) -> List[Dict[str, Any]]:
        """Extract constant values from AST."""
        constants = []

        for node in ast.walk(tree):
            if isinstance(node, ast.Constant):
                constants.append({
                    'value': node.value,
                    'type': type(node.value).__name__,
                    'line_number': node.lineno
                })

        return constants

    def _analyze_control_flow(self, tree: ast.AST) -> Dict[str, Any]:
        """Analyze control flow structures."""
        control_flow = {
            'if_statements': 0,
            'for_loops': 0,
            'while_loops': 0,
            'try_except': 0,
            'with_statements': 0,
            'comprehensions': 0,
            'max_nesting_depth': 0
        }

        def calculate_depth(node, current_depth=0):
            max_depth = current_depth
            for child in ast.iter_child_nodes(node):
                if isinstance(child, (ast.If, ast.For, ast.While, ast.Try, ast.With)):
                    child_depth = calculate_depth(child, current_depth + 1)
                else:
                    child_depth = calculate_depth(child, current_depth)
                max_depth = max(max_depth, child_depth)
            return max_depth

        for node in ast.walk(tree):
            if isinstance(node, ast.If):
                control_flow['if_statements'] += 1
            elif isinstance(node, ast.For):
                control_flow['for_loops'] += 1
            elif isinstance(node, ast.While):
                control_flow['while_loops'] += 1
            elif isinstance(node, ast.Try):
                control_flow['try_except'] += 1
            elif isinstance(node, ast.With):
                control_flow['with_statements'] += 1
            elif isinstance(node, (ast.ListComp, ast.DictComp, ast.SetComp, ast.GeneratorExp)):
                control_flow['comprehensions'] += 1

        control_flow['max_nesting_depth'] = calculate_depth(tree)
        return control_flow

    def _calculate_complexity(self, tree: ast.AST) -> Dict[str, int]:
        """Calculate code complexity metrics."""
        complexity = {
            'cyclomatic_complexity': 1,  # Base complexity
            'lines_of_code': len(ast.unparse(tree).split('\n')),
            'number_of_nodes': len(list(ast.walk(tree)))
        }

        # Calculate cyclomatic complexity
        for node in ast.walk(tree):
            if isinstance(node, (ast.If, ast.While, ast.For, ast.ExceptHandler)):
                complexity['cyclomatic_complexity'] += 1
            elif isinstance(node, ast.BoolOp):
                complexity['cyclomatic_complexity'] += len(node.values) - 1

        return complexity

    def _analyze_dependencies(self, tree: ast.AST) -> Dict[str, Any]:
        """Analyze code dependencies."""
        dependencies = {
            'external_modules': set(),
            'standard_modules': set(),
            'builtin_functions': set(),
            'undefined_names': set()
        }

        # Track defined names
        defined_names = set()

        # Extract imports
        for node in ast.walk(tree):
            if isinstance(node, ast.Import):
                for alias in node.names:
                    module_name = alias.name.split('.')[0]
                    if module_name in self.standard_modules:
                        dependencies['standard_modules'].add(alias.name)
                    else:
                        dependencies['external_modules'].add(alias.name)
                    defined_names.add(alias.asname or alias.name)
            elif isinstance(node, ast.ImportFrom):
                module = node.module or ''
                module_name = module.split('.')[0]
                if module_name in self.standard_modules:
                    dependencies['standard_modules'].add(module)
                else:
                    dependencies['external_modules'].add(module)
                for alias in node.names:
                    defined_names.add(alias.asname or alias.name)
            # Track function and class definitions
            elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
                defined_names.add(node.name)
            # Track variable assignments
            elif isinstance(node, ast.Assign):
                for target in node.targets:
                    if isinstance(target, ast.Name):
                        defined_names.add(target.id)

        # Find undefined names
        for node in ast.walk(tree):
            if isinstance(node, ast.Name) and isinstance(node.ctx, ast.Load):
                if (node.id not in defined_names
                        and node.id not in self.builtin_functions
                        and not node.id.startswith('_')):
                    dependencies['undefined_names'].add(node.id)
                elif node.id in self.builtin_functions:
                    dependencies['builtin_functions'].add(node.id)

        # Convert sets to lists for JSON serialization
        for key in dependencies:
            dependencies[key] = list(dependencies[key])

        return dependencies

    def _predict_outputs(self, tree: ast.AST) -> List[Dict[str, Any]]:
        """Predict potential outputs from code."""
        outputs = []

        for node in ast.walk(tree):
            # Look for print statements
            if isinstance(node, ast.Call):
                if isinstance(node.func, ast.Name) and node.func.id == 'print':
                    outputs.append({
                        'type': 'print',
                        'line_number': node.lineno,
                        'args': [ast.unparse(arg) for arg in node.args]
                    })
            # Look for return statements
            elif isinstance(node, ast.Return):
                outputs.append({
                    'type': 'return',
                    'line_number': node.lineno,
                    'value': ast.unparse(node.value) if node.value else None
                })
            # Look for expressions that might produce output
            elif isinstance(node, ast.Expr):
                # Check if it's a standalone expression that would be printed in a REPL
                if not isinstance(node.value, ast.Call):
                    outputs.append({
                        'type': 'expression',
                        'line_number': node.lineno,
                        'expression': ast.unparse(node.value)
                    })

        return outputs
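
# Illustrative usage (sketch, not a captured run): what CodeStructureAnalyzer
# is expected to report for a trivial two-line snippet.
#
#     analyzer = CodeStructureAnalyzer()
#     info = analyzer.analyze_code_structure("x = 1\nprint(x)")
#     info['syntax_valid']                   # True
#     info['variables'][0]['name']           # 'x'
#     info['potential_outputs'][0]['type']   # 'print'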


class ExecutionFlowAnalyzer:
    """Analyze execution flow and predict behavior."""

    def __init__(self):
        """Initialize execution flow analyzer."""
        pass

    def analyze_execution_flow(self, code: str) -> Dict[str, Any]:
        """
        Analyze the execution flow of Python code.

        Args:
            code: Python code to analyze

        Returns:
            Execution flow analysis
        """
        try:
            tree = ast.parse(code)

            analysis = {
                'execution_order': self._determine_execution_order(tree),
                'variable_lifecycle': self._track_variable_lifecycle(tree),
                'function_calls': self._extract_function_calls(tree),
                'potential_errors': self._detect_potential_errors(tree),
                'performance_notes': self._analyze_performance(tree),
                'final_result_prediction': self._predict_final_result(tree, code)
            }

            return analysis

        except Exception as e:
            logger.error(f"Execution flow analysis failed: {e}")
            return {'error': str(e)}

    def _determine_execution_order(self, tree: ast.AST) -> List[Dict[str, Any]]:
        """Determine the order of code execution."""
        execution_order = []

        for i, node in enumerate(tree.body):
            if isinstance(node, ast.FunctionDef):
                execution_order.append({
                    'step': i + 1,
                    'type': 'function_definition',
                    'name': node.name,
                    'line': node.lineno
                })
            elif isinstance(node, ast.ClassDef):
                execution_order.append({
                    'step': i + 1,
                    'type': 'class_definition',
                    'name': node.name,
                    'line': node.lineno
                })
            elif isinstance(node, ast.Import):
                modules = [alias.name for alias in node.names]
                execution_order.append({
                    'step': i + 1,
                    'type': 'import',
                    'modules': modules,
                    'line': node.lineno
                })
            elif isinstance(node, ast.ImportFrom):
                execution_order.append({
                    'step': i + 1,
                    'type': 'from_import',
                    'module': node.module,
                    'names': [alias.name for alias in node.names],
                    'line': node.lineno
                })
            elif isinstance(node, ast.Assign):
                execution_order.append({
                    'step': i + 1,
                    'type': 'assignment',
                    'targets': [ast.unparse(target) for target in node.targets],
                    'value': ast.unparse(node.value),
                    'line': node.lineno
                })
            elif isinstance(node, ast.Expr):
                execution_order.append({
                    'step': i + 1,
                    'type': 'expression',
                    'expression': ast.unparse(node.value),
                    'line': node.lineno
                })
            else:
                execution_order.append({
                    'step': i + 1,
                    'type': type(node).__name__.lower(),
                    'line': node.lineno
                })

        return execution_order

    def _track_variable_lifecycle(self, tree: ast.AST) -> Dict[str, Dict[str, Any]]:
        """Track variable definitions, modifications, and usage."""
        variables = {}

        for node in ast.walk(tree):
            if isinstance(node, ast.Assign):
                for target in node.targets:
                    if isinstance(target, ast.Name):
                        var_name = target.id
                        if var_name not in variables:
                            variables[var_name] = {
                                'first_assignment': node.lineno,
                                'assignments': [],
                                'usages': []
                            }
                        variables[var_name]['assignments'].append({
                            'line': node.lineno,
                            'value': ast.unparse(node.value)
                        })
            elif isinstance(node, ast.Name) and isinstance(node.ctx, ast.Load):
                var_name = node.id
                if var_name in variables:
                    variables[var_name]['usages'].append(node.lineno)

        return variables

    def _extract_function_calls(self, tree: ast.AST) -> List[Dict[str, Any]]:
        """Extract all function calls in execution order."""
        function_calls = []

        for node in ast.walk(tree):
            if isinstance(node, ast.Call):
                call_info = {
                    'line': node.lineno,
                    'args': [ast.unparse(arg) for arg in node.args],
                    'kwargs': {kw.arg: ast.unparse(kw.value) for kw in node.keywords}
                }

                if isinstance(node.func, ast.Name):
                    call_info['function'] = node.func.id
                    call_info['type'] = 'simple_call'
                elif isinstance(node.func, ast.Attribute):
                    call_info['function'] = ast.unparse(node.func)
                    call_info['type'] = 'method_call'
                else:
                    call_info['function'] = ast.unparse(node.func)
                    call_info['type'] = 'complex_call'

                function_calls.append(call_info)

        return function_calls

    def _detect_potential_errors(self, tree: ast.AST) -> List[Dict[str, Any]]:
        """Detect potential runtime errors."""
        potential_errors = []

        for node in ast.walk(tree):
            # Division by zero
            if isinstance(node, ast.BinOp) and isinstance(node.op, ast.Div):
                if isinstance(node.right, ast.Constant) and node.right.value == 0:
                    potential_errors.append({
                        'type': 'division_by_zero',
                        'line': node.lineno,
                        'message': 'Division by zero detected'
                    })
            # Undefined variable usage (basic check)
            elif isinstance(node, ast.Name) and isinstance(node.ctx, ast.Load):
                # This is a simplified check - would need more sophisticated analysis
                pass
            # Index out of bounds (basic patterns)
            elif isinstance(node, ast.Subscript):
                if isinstance(node.slice, ast.Constant):
                    potential_errors.append({
                        'type': 'potential_index_error',
                        'line': node.lineno,
                        'message': 'Potential index out of bounds'
                    })

        return potential_errors

    def _analyze_performance(self, tree: ast.AST) -> List[str]:
        """Analyze potential performance issues."""
        performance_notes = []

        for node in ast.walk(tree):
            # Nested loops
            if isinstance(node, ast.For):
                for child in ast.walk(node):
                    if isinstance(child, ast.For) and child != node:
                        performance_notes.append(
                            f"Nested loops detected at line {node.lineno} - consider optimization"
                        )
                        break
            # List comprehensions vs loops
            elif isinstance(node, ast.ListComp):
                performance_notes.append(
                    f"List comprehension at line {node.lineno} - good for performance"
                )

        return performance_notes

    def _predict_final_result(self, tree: ast.AST, code: str) -> Dict[str, Any]:
        """Predict the final result of code execution."""
        prediction = {
            'has_return_statement': False,
            'has_print_statements': False,
            'last_expression': None,
            'predicted_output_type': 'none'
        }

        # Check for return statements and print calls
        for node in ast.walk(tree):
            if isinstance(node, ast.Return):
                prediction['has_return_statement'] = True
                if node.value:
                    prediction['return_value'] = ast.unparse(node.value)
            elif isinstance(node, ast.Call):
                if isinstance(node.func, ast.Name) and node.func.id == 'print':
                    prediction['has_print_statements'] = True

        # Check last statement
        if tree.body:
            last_stmt = tree.body[-1]
            if isinstance(last_stmt, ast.Expr):
                prediction['last_expression'] = ast.unparse(last_stmt.value)
                prediction['predicted_output_type'] = 'expression_result'
            elif isinstance(last_stmt, ast.Return):
                prediction['predicted_output_type'] = 'return_value'

        if prediction['has_print_statements']:
            prediction['predicted_output_type'] = 'printed_output'

        return prediction
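
# Illustrative usage (sketch, not a captured run): the flow analyzer applied
# to a tiny script. Exact dictionary contents may vary, but the shape is:
#
#     flow = ExecutionFlowAnalyzer().analyze_execution_flow("x = 2 + 2\nprint(x)")
#     flow['execution_order']      # two steps: an 'assignment', then an 'expression'
#     flow['final_result_prediction']['predicted_output_type']   # 'printed_output'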


class CodeAnalyzerTool:
    """AGNO-compatible code analysis tool."""

    def __init__(self):
        """Initialize the code analyzer tool."""
        self.structure_analyzer = CodeStructureAnalyzer()
        self.flow_analyzer = ExecutionFlowAnalyzer()
        self.available = True
        logger.info("CodeAnalyzerTool initialized")

    def analyze_python_code(self, code: str) -> str:
        """
        Analyze Python code structure and execution flow.

        Args:
            code: Python code to analyze

        Returns:
            Formatted analysis report
        """
        try:
            # Analyze code structure
            structure = self.structure_analyzer.analyze_code_structure(code)

            if not structure.get('syntax_valid', False):
                return f"Syntax Error: {structure.get('syntax_error', 'Unknown syntax error')}"

            # Analyze execution flow
            flow = self.flow_analyzer.analyze_execution_flow(code)

            # Format report
            report = "Code Analysis Report\n"
            report += "=" * 50 + "\n\n"

            # Structure analysis
            report += "STRUCTURE ANALYSIS:\n"
            report += f"- Functions: {len(structure['functions'])}\n"
            report += f"- Classes: {len(structure['classes'])}\n"
            report += f"- Variables: {len(structure['variables'])}\n"
            report += f"- Imports: {len(structure['imports'])}\n"
            report += f"- Complexity: {structure['complexity']['cyclomatic_complexity']}\n\n"

            # Dependencies
            if structure['dependencies']['external_modules']:
                report += f"External Dependencies: {', '.join(structure['dependencies']['external_modules'])}\n"

            # Execution flow
            if 'execution_order' in flow:
                report += f"\nEXECUTION STEPS: {len(flow['execution_order'])}\n"

            # Predicted output
            if 'final_result_prediction' in flow:
                pred = flow['final_result_prediction']
                report += f"\nPREDICTED OUTPUT TYPE: {pred['predicted_output_type']}\n"
                if pred.get('last_expression'):
                    report += f"Last Expression: {pred['last_expression']}\n"

            # Potential issues
            if 'potential_errors' in flow and flow['potential_errors']:
                report += "\nPOTENTIAL ISSUES:\n"
                for error in flow['potential_errors']:
                    report += f"- Line {error['line']}: {error['message']}\n"

            return report

        except Exception as e:
            return f"Analysis failed: {e}"

    def predict_code_output(self, code: str) -> str:
        """
        Predict the output of Python code without executing it.

        Args:
            code: Python code to analyze

        Returns:
            Predicted output description
        """
        try:
            structure = self.structure_analyzer.analyze_code_structure(code)
            flow = self.flow_analyzer.analyze_execution_flow(code)

            if not structure.get('syntax_valid', False):
                return f"Cannot predict output - syntax error: {structure.get('syntax_error')}"

            prediction = "Output Prediction:\n"
            prediction += "-" * 30 + "\n"

            if structure['potential_outputs']:
                # Check for print statements
                print_outputs = [out for out in structure['potential_outputs'] if out['type'] == 'print']
                if print_outputs:
                    prediction += f"Print statements: {len(print_outputs)}\n"
                    for out in print_outputs[:3]:  # Show first 3
                        prediction += f"  Line {out['line_number']}: print({', '.join(out['args'])})\n"

                # Check for return statements
                returns = [out for out in structure['potential_outputs'] if out['type'] == 'return']
                if returns:
                    prediction += f"Return statements: {len(returns)}\n"
                    for ret in returns[:3]:
                        prediction += f"  Line {ret['line_number']}: return {ret['value']}\n"

                # Check for expressions
                expressions = [out for out in structure['potential_outputs'] if out['type'] == 'expression']
                if expressions:
                    prediction += f"Final expression: {expressions[-1]['expression']}\n"

            # Final result prediction
            if 'final_result_prediction' in flow:
                pred = flow['final_result_prediction']
                prediction += f"\nFinal result type: {pred['predicted_output_type']}\n"

            return prediction

        except Exception as e:
            return f"Prediction failed: {e}"

    def detect_code_dependencies(self, code: str) -> str:
        """
        Detect dependencies and imports required by code.

        Args:
            code: Python code to analyze

        Returns:
            Dependencies report
        """
        try:
            structure = self.structure_analyzer.analyze_code_structure(code)

            if not structure.get('syntax_valid', False):
                return f"Cannot analyze dependencies - syntax error: {structure.get('syntax_error')}"

            deps = structure['dependencies']

            report = "Dependencies Analysis:\n"
            report += "-" * 30 + "\n"

            if deps['standard_modules']:
                report += f"Standard library modules: {', '.join(deps['standard_modules'])}\n"

            if deps['external_modules']:
                report += f"External modules: {', '.join(deps['external_modules'])}\n"

            if deps['builtin_functions']:
                report += f"Built-in functions used: {', '.join(deps['builtin_functions'])}\n"

            if deps['undefined_names']:
                report += f"Undefined names (potential issues): {', '.join(deps['undefined_names'])}\n"

            return report

        except Exception as e:
            return f"Dependency analysis failed: {e}"

    def suggest_code_optimizations(self, code: str) -> str:
        """
        Suggest optimizations for Python code.

        Args:
            code: Python code to analyze

        Returns:
            Optimization suggestions
        """
        try:
            structure = self.structure_analyzer.analyze_code_structure(code)
            flow = self.flow_analyzer.analyze_execution_flow(code)

            header = "Code Optimization Suggestions:\n" + "-" * 40 + "\n"
            suggestions = header

            # Complexity suggestions
            complexity = structure['complexity']['cyclomatic_complexity']
            if complexity > 10:
                suggestions += f"- High complexity ({complexity}) - consider breaking into smaller functions\n"

            # Control flow suggestions
            control = structure['control_flow']
            if control['max_nesting_depth'] > 3:
                suggestions += f"- Deep nesting ({control['max_nesting_depth']} levels) - consider refactoring\n"

            # Performance notes from flow analysis
            if 'performance_notes' in flow:
                for note in flow['performance_notes']:
                    suggestions += f"- {note}\n"

            # Import suggestions
            deps = structure['dependencies']
            if len(deps['external_modules']) > 5:
                suggestions += "- Many external dependencies - consider reducing for better portability\n"

            # If nothing was added beyond the header, say so explicitly
            if suggestions == header:
                suggestions += "No specific optimizations suggested - code looks good!\n"

            return suggestions

        except Exception as e:
            return f"Optimization analysis failed: {e}"


def get_code_analyzer_tools():
    """Get code analyzer tools for AGNO registration."""
    tool = CodeAnalyzerTool()

    return [
        {
            'name': 'analyze_python_code',
            'function': tool.analyze_python_code,
            'description': 'Analyze Python code structure, complexity, and execution flow'
        },
        {
            'name': 'predict_code_output',
            'function': tool.predict_code_output,
            'description': 'Predict the output of Python code without executing it'
        },
        {
            'name': 'detect_code_dependencies',
            'function': tool.detect_code_dependencies,
            'description': 'Detect dependencies and imports required by Python code'
        },
        {
            'name': 'suggest_code_optimizations',
            'function': tool.suggest_code_optimizations,
            'description': 'Suggest optimizations and improvements for Python code'
        }
    ]


if __name__ == "__main__":
    # Test the code analyzer
    tool = CodeAnalyzerTool()

    test_code = """
import math
import numpy as np

def calculate_result(x, y):
    result = math.sqrt(x**2 + y**2)
    return result * math.pi

data = [1, 2, 3, 4, 5]
mean_value = np.mean(data)
final_result = calculate_result(mean_value, 2.5)
print(f"Final result: {final_result}")
final_result
"""

    print("Testing CodeAnalyzerTool:")
    print("=" * 50)

    analysis = tool.analyze_python_code(test_code)
    print(analysis)

    print("\n" + "=" * 50)
    prediction = tool.predict_code_output(test_code)
    print(prediction)
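
    # Also exercise the remaining two tool methods (illustrative; the reports
    # follow the same plain-text format as the ones printed above).
    print("\n" + "=" * 50)
    print(tool.detect_code_dependencies(test_code))

    print("\n" + "=" * 50)
    print(tool.suggest_code_optimizations(test_code))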