""" Data Analysis Engine for GAIA Agent - Phase 4 Advanced data analysis capabilities for Excel and structured data Features: - Statistical analysis of Excel data - Data aggregation and summarization - Financial calculations and reporting - Category-based filtering (food vs drinks) - Currency formatting and precision handling - Data validation and quality checks """ import logging import pandas as pd import numpy as np from typing import Dict, Any, List, Optional, Union, Tuple from decimal import Decimal, ROUND_HALF_UP import re from datetime import datetime, date logger = logging.getLogger(__name__) class DataAnalysisEngine: """Advanced data analysis engine for GAIA evaluation tasks.""" def __init__(self): """Initialize the data analysis engine.""" self.available = True self.analysis_cache = {} def analyze_financial_data(self, data: Union[pd.DataFrame, List[Dict]], sales_columns: List[str] = None, category_columns: List[str] = None, filters: Dict[str, Any] = None) -> Dict[str, Any]: """ Perform comprehensive financial data analysis. Args: data: DataFrame or list of dictionaries containing the data sales_columns: Columns containing sales/financial data category_columns: Columns containing category information filters: Dictionary of filters to apply Returns: Comprehensive financial analysis results """ try: # Convert to DataFrame if needed if isinstance(data, list): df = pd.DataFrame(data) else: df = data.copy() if df.empty: return {"error": "No data provided for analysis"} # Auto-detect columns if not provided if sales_columns is None: sales_columns = self._detect_sales_columns(df) if category_columns is None: category_columns = self._detect_category_columns(df) # Apply filters filtered_df = self._apply_filters(df, filters) if filters else df # Perform analysis analysis_results = { "total_records": len(df), "filtered_records": len(filtered_df), "sales_analysis": self._analyze_sales_data(filtered_df, sales_columns), "category_analysis": self._analyze_categories(filtered_df, category_columns, sales_columns), "statistical_summary": self._generate_statistical_summary(filtered_df, sales_columns), "data_quality": self._assess_data_quality(filtered_df), "filters_applied": filters or {}, "columns_analyzed": { "sales_columns": sales_columns, "category_columns": category_columns } } return analysis_results except Exception as e: logger.error(f"❌ Financial data analysis failed: {e}") return {"error": f"Analysis failed: {str(e)}"} def calculate_category_totals(self, data: Union[pd.DataFrame, List[Dict]], category_column: str, sales_column: str, include_categories: List[str] = None, exclude_categories: List[str] = None) -> Dict[str, Any]: """ Calculate totals by category with inclusion/exclusion filters. Args: data: DataFrame or list of dictionaries category_column: Column containing categories sales_column: Column containing sales amounts include_categories: Categories to include exclude_categories: Categories to exclude Returns: Category totals and analysis """ try: # Convert to DataFrame if needed if isinstance(data, list): df = pd.DataFrame(data) else: df = data.copy() if df.empty or category_column not in df.columns or sales_column not in df.columns: return {"error": "Required columns not found in data"} # Clean and prepare data df[category_column] = df[category_column].astype(str).str.strip() df[sales_column] = pd.to_numeric(df[sales_column], errors='coerce') # Remove rows with invalid sales data df = df.dropna(subset=[sales_column]) # Apply category filters if include_categories: mask = df[category_column].str.lower().isin([cat.lower() for cat in include_categories]) df = df[mask] if exclude_categories: mask = ~df[category_column].str.lower().isin([cat.lower() for cat in exclude_categories]) df = df[mask] # Calculate totals by category category_totals = df.groupby(category_column)[sales_column].agg([ 'sum', 'count', 'mean', 'min', 'max' ]).round(2) # Calculate overall total overall_total = df[sales_column].sum() # Prepare results results = { "overall_total": float(overall_total), "formatted_total": self._format_currency(overall_total), "category_breakdown": {}, "summary": { "total_categories": len(category_totals), "total_items": len(df), "average_per_item": float(df[sales_column].mean()) if len(df) > 0 else 0 }, "filters_applied": { "include_categories": include_categories, "exclude_categories": exclude_categories } } # Add category breakdown for category, stats in category_totals.iterrows(): results["category_breakdown"][category] = { "total": float(stats['sum']), "formatted_total": self._format_currency(stats['sum']), "count": int(stats['count']), "average": float(stats['mean']), "min": float(stats['min']), "max": float(stats['max']), "percentage_of_total": float((stats['sum'] / overall_total * 100)) if overall_total > 0 else 0 } return results except Exception as e: logger.error(f"❌ Category totals calculation failed: {e}") return {"error": f"Calculation failed: {str(e)}"} def detect_food_vs_drinks(self, data: Union[pd.DataFrame, List[Dict]], category_columns: List[str] = None) -> Dict[str, Any]: """ Detect and categorize items as food vs drinks. Args: data: DataFrame or list of dictionaries category_columns: Columns to analyze for food/drink classification Returns: Classification results with food and drink items """ try: # Convert to DataFrame if needed if isinstance(data, list): df = pd.DataFrame(data) else: df = data.copy() if df.empty: return {"error": "No data provided"} # Auto-detect category columns if not provided if category_columns is None: category_columns = self._detect_category_columns(df) # Food and drink keywords food_keywords = [ 'burger', 'sandwich', 'pizza', 'salad', 'fries', 'chicken', 'beef', 'pork', 'fish', 'pasta', 'rice', 'bread', 'soup', 'steak', 'wings', 'nuggets', 'taco', 'burrito', 'wrap', 'hot dog', 'sub', 'panini', 'quesadilla', 'breakfast', 'lunch', 'dinner', 'appetizer', 'dessert', 'cake', 'pie', 'food', 'meal', 'dish', 'entree', 'side' ] drink_keywords = [ 'drink', 'beverage', 'soda', 'cola', 'pepsi', 'coke', 'sprite', 'fanta', 'coffee', 'tea', 'latte', 'cappuccino', 'espresso', 'mocha', 'juice', 'water', 'milk', 'shake', 'smoothie', 'beer', 'wine', 'cocktail', 'martini', 'whiskey', 'vodka', 'rum', 'gin', 'lemonade', 'iced tea', 'hot chocolate', 'energy drink' ] classification_results = { "food_items": [], "drink_items": [], "unclassified_items": [], "classification_summary": {} } # Analyze each category column for col in category_columns: if col not in df.columns: continue unique_items = df[col].dropna().unique() for item in unique_items: item_str = str(item).lower() # Check for food keywords is_food = any(keyword in item_str for keyword in food_keywords) # Check for drink keywords is_drink = any(keyword in item_str for keyword in drink_keywords) if is_food and not is_drink: classification_results["food_items"].append(str(item)) elif is_drink and not is_food: classification_results["drink_items"].append(str(item)) else: classification_results["unclassified_items"].append(str(item)) # Remove duplicates classification_results["food_items"] = list(set(classification_results["food_items"])) classification_results["drink_items"] = list(set(classification_results["drink_items"])) classification_results["unclassified_items"] = list(set(classification_results["unclassified_items"])) # Generate summary classification_results["classification_summary"] = { "total_items": len(classification_results["food_items"]) + len(classification_results["drink_items"]) + len(classification_results["unclassified_items"]), "food_count": len(classification_results["food_items"]), "drink_count": len(classification_results["drink_items"]), "unclassified_count": len(classification_results["unclassified_items"]), "classification_confidence": ( (len(classification_results["food_items"]) + len(classification_results["drink_items"])) / max(1, len(classification_results["food_items"]) + len(classification_results["drink_items"]) + len(classification_results["unclassified_items"])) ) * 100 } return classification_results except Exception as e: logger.error(f"❌ Food vs drinks detection failed: {e}") return {"error": f"Detection failed: {str(e)}"} def _detect_sales_columns(self, df: pd.DataFrame) -> List[str]: """Detect columns that likely contain sales/financial data.""" sales_keywords = [ 'sales', 'amount', 'total', 'price', 'cost', 'revenue', 'value', 'sum', 'subtotal', 'grand total', 'net', 'gross' ] sales_columns = [] for col in df.columns: col_lower = str(col).lower() # Check for sales keywords in column name if any(keyword in col_lower for keyword in sales_keywords): if pd.api.types.is_numeric_dtype(df[col]): sales_columns.append(col) continue # Check if column contains numeric data that looks like currency if pd.api.types.is_numeric_dtype(df[col]): values = df[col].dropna() if len(values) > 0: # Check if values are positive and in reasonable range for currency if values.min() >= 0 and values.max() < 1000000: # Check if values have decimal places (common for currency) decimal_count = sum(1 for v in values if v != int(v)) if decimal_count > len(values) * 0.1: # 10% have decimals sales_columns.append(col) return sales_columns def _detect_category_columns(self, df: pd.DataFrame) -> List[str]: """Detect columns that likely contain category/classification data.""" category_keywords = [ 'category', 'type', 'item', 'product', 'name', 'description', 'class', 'group', 'kind', 'menu', 'food', 'drink' ] category_columns = [] for col in df.columns: col_lower = str(col).lower() # Check for category keywords if any(keyword in col_lower for keyword in category_keywords): if df[col].dtype == 'object': # Text column category_columns.append(col) continue # Check if column contains text with reasonable variety if df[col].dtype == 'object': unique_count = df[col].nunique() total_count = len(df[col].dropna()) # Good category column has some variety but not too much if total_count > 0 and 2 <= unique_count <= total_count * 0.5: category_columns.append(col) return category_columns def _apply_filters(self, df: pd.DataFrame, filters: Dict[str, Any]) -> pd.DataFrame: """Apply filters to the dataframe.""" filtered_df = df.copy() try: for column, filter_value in filters.items(): if column not in df.columns: continue if isinstance(filter_value, dict): # Range filter if 'min' in filter_value: filtered_df = filtered_df[filtered_df[column] >= filter_value['min']] if 'max' in filter_value: filtered_df = filtered_df[filtered_df[column] <= filter_value['max']] elif isinstance(filter_value, list): # Include filter filtered_df = filtered_df[filtered_df[column].isin(filter_value)] else: # Exact match filter filtered_df = filtered_df[filtered_df[column] == filter_value] return filtered_df except Exception as e: logger.error(f"❌ Failed to apply filters: {e}") return df def _analyze_sales_data(self, df: pd.DataFrame, sales_columns: List[str]) -> Dict[str, Any]: """Analyze sales data columns.""" sales_analysis = {} for col in sales_columns: if col not in df.columns: continue values = df[col].dropna() if len(values) == 0: continue sales_analysis[col] = { "total": float(values.sum()), "formatted_total": self._format_currency(values.sum()), "count": len(values), "average": float(values.mean()), "median": float(values.median()), "min": float(values.min()), "max": float(values.max()), "std_dev": float(values.std()) if len(values) > 1 else 0 } # Calculate overall totals if multiple sales columns if len(sales_analysis) > 1: overall_total = sum(analysis["total"] for analysis in sales_analysis.values()) sales_analysis["overall"] = { "total": overall_total, "formatted_total": self._format_currency(overall_total) } return sales_analysis def _analyze_categories(self, df: pd.DataFrame, category_columns: List[str], sales_columns: List[str]) -> Dict[str, Any]: """Analyze category distributions and their sales performance.""" category_analysis = {} for cat_col in category_columns: if cat_col not in df.columns: continue category_stats = { "unique_categories": df[cat_col].nunique(), "category_distribution": df[cat_col].value_counts().to_dict(), "sales_by_category": {} } # Analyze sales by category for sales_col in sales_columns: if sales_col not in df.columns: continue sales_by_cat = df.groupby(cat_col)[sales_col].agg([ 'sum', 'count', 'mean' ]).round(2) category_stats["sales_by_category"][sales_col] = {} for category, stats in sales_by_cat.iterrows(): category_stats["sales_by_category"][sales_col][category] = { "total": float(stats['sum']), "formatted_total": self._format_currency(stats['sum']), "count": int(stats['count']), "average": float(stats['mean']) } category_analysis[cat_col] = category_stats return category_analysis def _generate_statistical_summary(self, df: pd.DataFrame, sales_columns: List[str]) -> Dict[str, Any]: """Generate comprehensive statistical summary.""" summary = { "data_shape": df.shape, "missing_values": df.isnull().sum().to_dict(), "data_types": df.dtypes.astype(str).to_dict(), "numeric_summary": {} } # Detailed analysis for sales columns for col in sales_columns: if col in df.columns and pd.api.types.is_numeric_dtype(df[col]): values = df[col].dropna() if len(values) > 0: summary["numeric_summary"][col] = { "count": len(values), "mean": float(values.mean()), "std": float(values.std()) if len(values) > 1 else 0, "min": float(values.min()), "25%": float(values.quantile(0.25)), "50%": float(values.quantile(0.50)), "75%": float(values.quantile(0.75)), "max": float(values.max()), "sum": float(values.sum()) } return summary def _assess_data_quality(self, df: pd.DataFrame) -> Dict[str, Any]: """Assess data quality and identify potential issues.""" quality_assessment = { "completeness": {}, "consistency": {}, "validity": {}, "overall_score": 0 } # Completeness check total_cells = df.shape[0] * df.shape[1] missing_cells = df.isnull().sum().sum() completeness_score = ((total_cells - missing_cells) / total_cells) * 100 if total_cells > 0 else 0 quality_assessment["completeness"] = { "score": completeness_score, "missing_percentage": (missing_cells / total_cells) * 100 if total_cells > 0 else 0, "columns_with_missing": df.columns[df.isnull().any()].tolist() } # Consistency check (for numeric columns) numeric_columns = df.select_dtypes(include=[np.number]).columns consistency_issues = [] for col in numeric_columns: values = df[col].dropna() if len(values) > 0: # Check for negative values in sales data if 'sales' in col.lower() or 'amount' in col.lower(): if (values < 0).any(): consistency_issues.append(f"{col}: Contains negative values") # Check for extreme outliers q1, q3 = values.quantile([0.25, 0.75]) iqr = q3 - q1 outliers = values[(values < q1 - 3*iqr) | (values > q3 + 3*iqr)] if len(outliers) > 0: consistency_issues.append(f"{col}: Contains {len(outliers)} extreme outliers") quality_assessment["consistency"] = { "issues": consistency_issues, "score": max(0, 100 - len(consistency_issues) * 10) } # Overall quality score quality_assessment["overall_score"] = ( completeness_score * 0.6 + quality_assessment["consistency"]["score"] * 0.4 ) return quality_assessment def _format_currency(self, amount: float, currency: str = "USD", decimal_places: int = 2) -> str: """Format amount as currency with specified decimal places.""" try: # Round to specified decimal places rounded_amount = Decimal(str(amount)).quantize( Decimal('0.' + '0' * decimal_places), rounding=ROUND_HALF_UP ) if currency.upper() == "USD": return f"${rounded_amount:.{decimal_places}f}" else: return f"{rounded_amount:.{decimal_places}f} {currency}" except Exception as e: logger.error(f"❌ Failed to format currency: {e}") return f"{amount:.{decimal_places}f}" def get_data_analysis_engine_tools() -> List[Any]: """Get data analysis engine tools for AGNO integration.""" from .base_tool import BaseTool class DataAnalysisEngineTool(BaseTool): """Data analysis engine tool for GAIA agent.""" def __init__(self): super().__init__( name="data_analysis_engine", description="Advanced data analysis for financial and categorical data" ) self.engine = DataAnalysisEngine() def execute(self, data: Union[pd.DataFrame, List[Dict]], analysis_type: str = "financial", **kwargs) -> Dict[str, Any]: """Execute data analysis.""" try: if analysis_type == "financial": return self.engine.analyze_financial_data(data, **kwargs) elif analysis_type == "category_totals": return self.engine.calculate_category_totals(data, **kwargs) elif analysis_type == "food_vs_drinks": return self.engine.detect_food_vs_drinks(data, **kwargs) else: return {"error": f"Unknown analysis type: {analysis_type}"} except Exception as e: return {"error": f"Data analysis failed: {str(e)}"} return [DataAnalysisEngineTool()]