# NOTE(review): the lines "Spaces:" / "Running" / "Running" that appeared here
# were web-page scrape residue, not Python, and broke the module at import time.
#!/usr/bin/env python3 | |
""" | |
File Handling Debug Test - Phase 2 Emergency Recovery | |
This test reproduces the "Error file not found" issues from GAIA evaluation | |
and debugs the file handling integration with the main agent. | |
Test Cases: | |
1. Excel file processing (sales data analysis) | |
2. Audio file processing (transcription) | |
3. Document processing (PDF/text files) | |
4. Python code file execution | |
5. Base64 file handling | |
6. File path resolution issues | |
""" | |
import base64
import json
import logging
import os
import shutil
import sys
import tempfile
from pathlib import Path

# Add the current directory to Python path (must run before local imports)
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

# Import the file handler and agent
from utils.file_handler import (
    EnhancedFileHandler,
    FileType,
    FileFormat,
    ProcessedFile,
    process_file,
    validate_file_exists
)
from agents.fixed_enhanced_unified_agno_agent import FixedGAIAAgent
# Configure module-wide logging with timestamped records.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)
class FileHandlingDebugger:
    """Debug file handling issues in GAIA evaluation context.

    Creates representative fixtures (CSV, text, Python, JSON, base64 data URI)
    in a fresh temp directory, exercises the EnhancedFileHandler — and, when
    available, the GAIA agent — against them, and writes a JSON summary report.
    """

    def __init__(self):
        """Initialize the debugger.

        The GAIA agent is optional: if its construction fails, agent-dependent
        tests are skipped gracefully instead of raising.
        """
        self.file_handler = EnhancedFileHandler()
        self.agent = None
        self.test_files = {}  # label ('excel', 'document', ...) -> path or data URI
        self.results = {}     # test name -> per-case result dicts
        # Try to initialize the agent
        try:
            self.agent = FixedGAIAAgent()
            logger.info("β GAIA Agent initialized for testing")
        except Exception as e:
            logger.warning(f"β οΈ Could not initialize GAIA Agent: {e}")

    def create_test_files(self):
        """Create test fixtures for debugging.

        Returns:
            dict: label -> file path; the 'base64' entry is a data URI string,
            not a path on disk.
        """
        logger.info("π Creating test files...")
        # Create temporary directory
        self.temp_dir = tempfile.mkdtemp(prefix="gaia_file_test_")
        logger.info(f"π Test directory: {self.temp_dir}")
        # 1. Create Excel-like CSV file (simulating Excel data)
        excel_data = """Item,Category,Sales,Price
Burger,Food,150,8.99
Fries,Food,200,3.49
Coke,Drink,180,2.99
Sprite,Drink,120,2.99
Chicken,Food,90,12.99
Water,Drink,75,1.99"""
        excel_file = os.path.join(self.temp_dir, "sales_data.csv")
        with open(excel_file, 'w') as f:
            f.write(excel_data)
        self.test_files['excel'] = excel_file
        logger.info(f"π Created Excel test file: {excel_file}")
        # 2. Create text document
        doc_content = """This is a test document for GAIA evaluation.
It contains multiple lines of text that should be processed correctly.
The document discusses various topics including:
- File handling capabilities
- Text processing
- Document analysis
- Content extraction"""
        doc_file = os.path.join(self.temp_dir, "test_document.txt")
        with open(doc_file, 'w') as f:
            f.write(doc_content)
        self.test_files['document'] = doc_file
        logger.info(f"π Created document test file: {doc_file}")
        # 3. Create Python code file
        python_code = """# Test Python code for GAIA evaluation
def calculate_factorial(n):
    if n <= 1:
        return 1
    return n * calculate_factorial(n - 1)
def calculate_power(base, exponent):
    return base ** exponent
# Main calculations
result1 = calculate_factorial(5)
result2 = calculate_power(2, 8)
final_result = result1 + result2
print(f"Factorial of 5: {result1}")
print(f"2 to the power of 8: {result2}")
print(f"Final result: {final_result}")
"""
        python_file = os.path.join(self.temp_dir, "test_code.py")
        with open(python_file, 'w') as f:
            f.write(python_code)
        self.test_files['python'] = python_file
        logger.info(f"π Created Python test file: {python_file}")
        # 4. Create JSON data file
        json_data = {
            "users": [
                {"id": 1, "name": "Alice", "age": 30, "city": "New York"},
                {"id": 2, "name": "Bob", "age": 25, "city": "San Francisco"},
                {"id": 3, "name": "Charlie", "age": 35, "city": "Chicago"}
            ],
            "metadata": {
                "total_users": 3,
                "created_date": "2024-01-01",
                "version": "1.0"
            }
        }
        json_file = os.path.join(self.temp_dir, "test_data.json")
        with open(json_file, 'w') as f:
            json.dump(json_data, f, indent=2)
        self.test_files['json'] = json_file
        logger.info(f"π Created JSON test file: {json_file}")
        # 5. Create base64 encoded content (data URI, not an on-disk file)
        base64_content = base64.b64encode(b"Hello World from base64 encoding!").decode()
        self.test_files['base64'] = f"data:text/plain;base64,{base64_content}"
        logger.info(f"π Created base64 test content")
        return self.test_files

    def test_file_handler_basic(self):
        """Exercise the handler's core API against every on-disk fixture.

        Covers existence validation, type/format detection, path resolution,
        file validation, and full processing. The base64 fixture is skipped
        because it is not a path on disk.
        """
        logger.info("\nπ§ͺ Testing File Handler Basic Functionality")
        results = {}
        for file_type, file_path in self.test_files.items():
            if file_type == 'base64':
                continue  # Skip base64 for basic tests
            logger.info(f"\nπ Testing {file_type}: {file_path}")
            try:
                # Test file existence validation
                exists = validate_file_exists(file_path)
                logger.info(f" β File exists: {exists}")
                # Test file type detection
                detected_type, detected_format = self.file_handler.detect_file_type(file_path)
                logger.info(f" π Detected type: {detected_type.value}, format: {detected_format.value}")
                # Test path resolution
                resolved_path = self.file_handler.resolve_file_path(file_path)
                logger.info(f" ποΈ Resolved path: {resolved_path}")
                # Test file validation
                is_valid, error = self.file_handler.validate_file(file_path)
                logger.info(f" β Valid: {is_valid}, Error: {error}")
                # Test file processing
                processed = process_file(file_path)
                logger.info(f" π Processed successfully: {processed.info.exists}")
                logger.info(f" π Content length: {len(processed.content) if processed.content else 0}")
                results[file_type] = {
                    'exists': exists,
                    'detected_type': detected_type.value,
                    'detected_format': detected_format.value,
                    'resolved_path': resolved_path,
                    'is_valid': is_valid,
                    'validation_error': error,
                    'processed_successfully': processed.info.exists,
                    'content_length': len(processed.content) if processed.content else 0,
                    'processing_error': processed.info.error
                }
            except Exception as e:
                logger.error(f" β Error testing {file_type}: {e}")
                results[file_type] = {'error': str(e)}
        self.results['basic_file_handler'] = results
        return results

    def test_base64_handling(self):
        """Test base64 detection, decoding, and processing of a data URI input."""
        logger.info("\nπ§ͺ Testing Base64 File Handling")
        base64_content = self.test_files['base64']
        logger.info(f"π Testing base64 content: {base64_content[:50]}...")
        try:
            # Test base64 detection
            is_base64 = self.file_handler.is_base64_encoded(base64_content)
            logger.info(f" β Is base64: {is_base64}")
            # Test base64 decoding
            decoded_bytes, mime_type = self.file_handler.decode_base64_file(base64_content)
            logger.info(f" π Decoded length: {len(decoded_bytes)}")
            logger.info(f" π MIME type: {mime_type}")
            logger.info(f" π Decoded content: {decoded_bytes.decode()}")
            # Test processing base64 as file input
            processed = process_file(base64_content)
            logger.info(f" π Processed successfully: {processed.info.exists}")
            logger.info(f" π Is base64: {processed.info.is_base64}")
            logger.info(f" π Temp path: {processed.temp_path}")
            result = {
                'is_base64': is_base64,
                'decoded_length': len(decoded_bytes),
                'mime_type': mime_type,
                'decoded_content': decoded_bytes.decode(),
                'processed_successfully': processed.info.exists,
                'is_base64_processed': processed.info.is_base64,
                'temp_path_created': processed.temp_path is not None,
                'processing_error': processed.info.error
            }
        except Exception as e:
            logger.error(f" β Error testing base64: {e}")
            result = {'error': str(e)}
        self.results['base64_handling'] = result
        return result

    def test_path_resolution_edge_cases(self):
        """Probe path resolution with relative, absolute, missing, and degenerate paths."""
        logger.info("\nπ§ͺ Testing Path Resolution Edge Cases")
        test_cases = [
            # Existing files with different path formats
            self.test_files['document'],
            os.path.basename(self.test_files['document']),  # Just filename
            f"./{os.path.basename(self.test_files['document'])}",  # Relative with ./
            # Non-existing files
            "/non/existing/file.txt",
            "non_existing_file.txt",
            "./non_existing_file.txt",
            # Edge cases
            "",
            ".",
            "..",
            "/",
        ]
        results = {}
        for i, test_path in enumerate(test_cases):
            logger.info(f"\nπ Test case {i+1}: '{test_path}'")
            try:
                # Test with a handler whose search base includes the temp dir,
                # so bare filenames can resolve against created fixtures.
                handler_with_temp = EnhancedFileHandler(base_paths=[self.temp_dir])
                resolved = handler_with_temp.resolve_file_path(test_path)
                exists = os.path.exists(test_path) if test_path else False
                logger.info(f" ποΈ Resolved: {resolved}")
                logger.info(f" β Original exists: {exists}")
                results[f'case_{i+1}'] = {
                    'input_path': test_path,
                    'resolved_path': resolved,
                    'original_exists': exists,
                    'resolution_successful': resolved is not None
                }
            except Exception as e:
                logger.error(f" β Error: {e}")
                results[f'case_{i+1}'] = {
                    'input_path': test_path,
                    'error': str(e)
                }
        self.results['path_resolution_edge_cases'] = results
        return results

    def test_agent_integration(self):
        """Run GAIA-style question/file scenarios through the agent, if available."""
        logger.info("\nπ§ͺ Testing Agent Integration")
        if not self.agent or not self.agent.available:
            logger.warning("β οΈ GAIA Agent not available for integration testing")
            return {'error': 'Agent not available'}
        # Test questions that simulate GAIA evaluation scenarios
        test_scenarios = [
            {
                'question': 'What is the total sales from food items in the attached file?',
                'files': [self.test_files['excel']],
                'expected_type': 'numerical'
            },
            {
                'question': 'What is the content of the attached document?',
                'files': [self.test_files['document']],
                'expected_type': 'text'
            },
            {
                'question': 'What is the final numeric output from the attached Python code?',
                'files': [self.test_files['python']],
                'expected_type': 'numerical'
            },
            {
                'question': 'How many users are in the attached JSON file?',
                'files': [self.test_files['json']],
                'expected_type': 'numerical'
            },
            {
                'question': 'What is the content of the base64 encoded data?',
                'files': [self.test_files['base64']],
                'expected_type': 'text'
            }
        ]
        results = {}
        for i, scenario in enumerate(test_scenarios):
            logger.info(f"\nπ― Scenario {i+1}: {scenario['question']}")
            logger.info(f"π Files: {scenario['files']}")
            try:
                # Test file processing by agent
                response = self.agent(scenario['question'], scenario['files'])
                logger.info(f" π€ Agent response: {response}")
                results[f'scenario_{i+1}'] = {
                    'question': scenario['question'],
                    'files': scenario['files'],
                    'expected_type': scenario['expected_type'],
                    'response': response,
                    'success': response != 'unknown' and response != ''
                }
            except Exception as e:
                logger.error(f" β Error in scenario {i+1}: {e}")
                results[f'scenario_{i+1}'] = {
                    'question': scenario['question'],
                    'files': scenario['files'],
                    'error': str(e)
                }
        self.results['agent_integration'] = results
        return results

    def test_file_not_found_reproduction(self):
        """Reproduce the specific 'Error file not found' issue with problematic inputs."""
        logger.info("\nπ§ͺ Reproducing 'Error file not found' Issue")
        # Test various file input formats that might cause issues
        problematic_inputs = [
            # Different path formats
            "sales_data.csv",  # Just filename
            "./sales_data.csv",  # Relative path
            "data/sales_data.csv",  # Subdirectory
            "/tmp/sales_data.csv",  # Absolute path (non-existing)
            # File info dictionaries (simulating GAIA format)
            {"path": "sales_data.csv", "type": "excel"},
            {"filename": "sales_data.csv", "content_type": "application/vnd.ms-excel"},
            # Base64 without proper format
            "SGVsbG8gV29ybGQ=",  # Plain base64
            # Empty/None inputs
            "",
            None,
        ]
        results = {}
        for i, file_input in enumerate(problematic_inputs):
            if file_input is None:
                continue  # enumerate index still advances, so case numbering stays stable
            logger.info(f"\nπ Testing problematic input {i+1}: {file_input}")
            try:
                # Test with file handler
                processed = self.file_handler.process_file_input(file_input)
                logger.info(f" π Exists: {processed.info.exists}")
                logger.info(f" π Error: {processed.info.error}")
                logger.info(f" π Path: {processed.info.path}")
                # Test with agent if available
                agent_response = None
                if self.agent and self.agent.available:
                    try:
                        agent_response = self.agent("What is in this file?", [file_input])
                        logger.info(f" π€ Agent response: {agent_response}")
                    except Exception as e:
                        logger.info(f" π€ Agent error: {e}")
                        agent_response = f"Error: {e}"
                results[f'input_{i+1}'] = {
                    'input': str(file_input),
                    'file_exists': processed.info.exists,
                    'file_error': processed.info.error,
                    'file_path': processed.info.path,
                    'agent_response': agent_response
                }
            except Exception as e:
                logger.error(f" β Error with input {i+1}: {e}")
                results[f'input_{i+1}'] = {
                    'input': str(file_input),
                    'error': str(e)
                }
        self.results['file_not_found_reproduction'] = results
        return results

    def run_all_tests(self):
        """Run all debugging tests, then report and clean up.

        Fix: cleanup now runs in a finally block so the temp directory is
        removed even when a test or the report generation raises.
        """
        logger.info("π Starting File Handling Debug Tests")
        # Create test files
        self.create_test_files()
        try:
            # Run all tests
            self.test_file_handler_basic()
            self.test_base64_handling()
            self.test_path_resolution_edge_cases()
            self.test_file_not_found_reproduction()
            self.test_agent_integration()
            # Generate summary report
            self.generate_summary_report()
        finally:
            # Cleanup always runs, even on failure
            self.cleanup()

    def generate_summary_report(self):
        """Write a JSON report of all results into the temp dir and log a summary.

        Returns:
            dict: the report structure that was serialized.
        """
        logger.info("\nπ Generating Summary Report")
        report = {
            'test_summary': {
                'total_tests': len(self.results),
                'test_directory': self.temp_dir,
                'test_files_created': len(self.test_files)
            },
            'results': self.results
        }
        # Save report to file (default=str stringifies non-JSON values like enums)
        report_file = os.path.join(self.temp_dir, "file_handling_debug_report.json")
        with open(report_file, 'w') as f:
            json.dump(report, f, indent=2, default=str)
        logger.info(f"π Report saved to: {report_file}")
        # Print summary
        logger.info("\nπ Test Summary:")
        for test_name, test_results in self.results.items():
            logger.info(f" {test_name}: {len(test_results)} test cases")
            # Count successes and failures
            if isinstance(test_results, dict):
                successes = sum(1 for result in test_results.values()
                                if isinstance(result, dict) and not result.get('error'))
                failures = sum(1 for result in test_results.values()
                               if isinstance(result, dict) and result.get('error'))
                logger.info(f" β Successes: {successes}")
                logger.info(f" β Failures: {failures}")
        return report

    def cleanup(self):
        """Clean up test files and temporary resources.

        Fix: the original removed fixtures one by one and then called
        os.rmdir(), which always failed because the report JSON written by
        generate_summary_report() was still inside the directory (the failure
        was swallowed as a warning, leaking the temp dir). shutil.rmtree()
        removes the whole tree regardless of remaining contents.
        """
        logger.info("\nπ§Ή Cleaning up test files...")
        try:
            # Clean up temp files from file handler
            if hasattr(self.file_handler, 'cleanup_temp_files'):
                self.file_handler.cleanup_temp_files()
            # Remove the whole test directory (fixtures, report, anything else)
            if hasattr(self, 'temp_dir') and os.path.exists(self.temp_dir):
                shutil.rmtree(self.temp_dir)
            logger.info("β Cleanup completed")
        except Exception as e:
            logger.warning(f"β οΈ Cleanup warning: {e}")
def main():
    """Command-line entry point: run the complete file-handling debug suite."""
    print("π§ File Handling Debug Test - Phase 2 Emergency Recovery")
    print("=" * 60)
    FileHandlingDebugger().run_all_tests()
    print("\nβ File handling debug tests completed!")
    print("Check the logs above for detailed results.")


if __name__ == "__main__":
    main()