gaia-enhanced-agent / tests /test_file_handler.py
GAIA Agent Deployment
Deploy Complete Enhanced GAIA Agent with Phase 1-6 Improvements
9a6a4dc
"""
Comprehensive Test Suite for Enhanced File Handler
Tests all aspects of file handling including:
- File type detection
- Path resolution
- Base64 decoding
- File validation
- Metadata extraction
- Error handling
"""
import os
import tempfile
import base64
import json
import pytest
from pathlib import Path
from unittest.mock import patch, mock_open
# Import the file handler
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from utils.file_handler import (
EnhancedFileHandler,
FileType,
FileFormat,
FileInfo,
ProcessedFile,
get_file_handler,
process_file,
validate_file_exists,
get_file_type,
cleanup_temp_files
)
class TestFileTypeDetection:
"""Test file type and format detection."""
def test_image_detection(self):
"""Test image file type detection."""
handler = EnhancedFileHandler()
test_cases = [
("test.png", FileType.IMAGE, FileFormat.PNG),
("test.jpg", FileType.IMAGE, FileFormat.JPG),
("test.jpeg", FileType.IMAGE, FileFormat.JPEG),
("test.gif", FileType.IMAGE, FileFormat.GIF),
("test.bmp", FileType.IMAGE, FileFormat.BMP),
("test.webp", FileType.IMAGE, FileFormat.WEBP),
]
for filename, expected_type, expected_format in test_cases:
file_type, file_format = handler.detect_file_type(filename)
assert file_type == expected_type
assert file_format == expected_format
def test_audio_detection(self):
"""Test audio file type detection."""
handler = EnhancedFileHandler()
test_cases = [
("test.mp3", FileType.AUDIO, FileFormat.MP3),
("test.wav", FileType.AUDIO, FileFormat.WAV),
("test.m4a", FileType.AUDIO, FileFormat.M4A),
("test.flac", FileType.AUDIO, FileFormat.FLAC),
("test.ogg", FileType.AUDIO, FileFormat.OGG),
]
for filename, expected_type, expected_format in test_cases:
file_type, file_format = handler.detect_file_type(filename)
assert file_type == expected_type
assert file_format == expected_format
def test_document_detection(self):
"""Test document file type detection."""
handler = EnhancedFileHandler()
test_cases = [
("test.pdf", FileType.DOCUMENT, FileFormat.PDF),
("test.docx", FileType.DOCUMENT, FileFormat.DOCX),
("test.doc", FileType.DOCUMENT, FileFormat.DOC),
("test.txt", FileType.DOCUMENT, FileFormat.TXT),
("test.rtf", FileType.DOCUMENT, FileFormat.RTF),
]
for filename, expected_type, expected_format in test_cases:
file_type, file_format = handler.detect_file_type(filename)
assert file_type == expected_type
assert file_format == expected_format
def test_data_detection(self):
"""Test data file type detection."""
handler = EnhancedFileHandler()
test_cases = [
("test.csv", FileType.DATA, FileFormat.CSV),
("test.xlsx", FileType.DATA, FileFormat.XLSX),
("test.xls", FileType.DATA, FileFormat.XLS),
("test.json", FileType.DATA, FileFormat.JSON),
("test.xml", FileType.DATA, FileFormat.XML),
]
for filename, expected_type, expected_format in test_cases:
file_type, file_format = handler.detect_file_type(filename)
assert file_type == expected_type
assert file_format == expected_format
def test_code_detection(self):
"""Test code file type detection."""
handler = EnhancedFileHandler()
test_cases = [
("test.py", FileType.CODE, FileFormat.PY),
("test.js", FileType.CODE, FileFormat.JS),
("test.html", FileType.CODE, FileFormat.HTML),
("test.css", FileType.CODE, FileFormat.CSS),
]
for filename, expected_type, expected_format in test_cases:
file_type, file_format = handler.detect_file_type(filename)
assert file_type == expected_type
assert file_format == expected_format
def test_unknown_detection(self):
"""Test unknown file type detection."""
handler = EnhancedFileHandler()
file_type, file_format = handler.detect_file_type("test.unknown")
assert file_type == FileType.UNKNOWN
assert file_format == FileFormat.UNKNOWN
class TestPathResolution:
"""Test file path resolution."""
def setup_method(self):
"""Set up test environment."""
self.temp_dir = tempfile.mkdtemp()
self.test_file = os.path.join(self.temp_dir, "test.txt")
# Create test file
with open(self.test_file, 'w') as f:
f.write("Test content")
def teardown_method(self):
"""Clean up test environment."""
if os.path.exists(self.test_file):
os.unlink(self.test_file)
os.rmdir(self.temp_dir)
def test_absolute_path_resolution(self):
"""Test absolute path resolution."""
handler = EnhancedFileHandler()
# Test existing absolute path
resolved = handler.resolve_file_path(self.test_file)
assert resolved == os.path.abspath(self.test_file)
# Test non-existing absolute path
non_existing = "/non/existing/path.txt"
resolved = handler.resolve_file_path(non_existing)
assert resolved is None
def test_relative_path_resolution(self):
"""Test relative path resolution."""
handler = EnhancedFileHandler(base_paths=[self.temp_dir])
# Test existing relative path
relative_path = "test.txt"
resolved = handler.resolve_file_path(relative_path)
assert resolved == os.path.abspath(self.test_file)
# Test non-existing relative path
non_existing = "non_existing.txt"
resolved = handler.resolve_file_path(non_existing)
assert resolved is None
def test_current_directory_variations(self):
"""Test current directory path variations."""
handler = EnhancedFileHandler()
# Create test file in current directory
current_test_file = "current_test.txt"
with open(current_test_file, 'w') as f:
f.write("Test")
try:
# Test various current directory formats
variations = [
current_test_file,
f"./{current_test_file}",
]
for variation in variations:
resolved = handler.resolve_file_path(variation)
assert resolved is not None
assert os.path.exists(resolved)
finally:
if os.path.exists(current_test_file):
os.unlink(current_test_file)
class TestBase64Handling:
"""Test base64 content handling."""
def test_base64_detection(self):
"""Test base64 content detection."""
handler = EnhancedFileHandler()
# Test data URL format
data_url = "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8/5+hHgAHggJ/PchI7wAAAABJRU5ErkJggg=="
assert handler.is_base64_encoded(data_url)
# Test plain base64
plain_b64 = "SGVsbG8gV29ybGQ=" # "Hello World" in base64
assert handler.is_base64_encoded(plain_b64)
# Test non-base64
regular_text = "This is not base64"
assert not handler.is_base64_encoded(regular_text)
def test_base64_decoding(self):
"""Test base64 content decoding."""
handler = EnhancedFileHandler()
# Test data URL decoding
data_url = "data:text/plain;base64,SGVsbG8gV29ybGQ="
decoded_bytes, mime_type = handler.decode_base64_file(data_url)
assert decoded_bytes == b"Hello World"
assert mime_type == "text/plain"
# Test plain base64 decoding
plain_b64 = "SGVsbG8gV29ybGQ="
decoded_bytes, mime_type = handler.decode_base64_file(plain_b64)
assert decoded_bytes == b"Hello World"
assert mime_type is None
def test_invalid_base64_handling(self):
"""Test handling of invalid base64 content."""
handler = EnhancedFileHandler()
invalid_b64 = "This is not valid base64!"
# Invalid base64 should be processed as a file path and fail gracefully
processed = handler.process_file_input(invalid_b64)
# Should fail to find the file but not raise an exception
assert not processed.info.exists
assert processed.info.error is not None
assert "Could not resolve file path" in processed.info.error
class TestFileValidation:
"""Test file validation functionality."""
def setup_method(self):
"""Set up test environment."""
self.temp_dir = tempfile.mkdtemp()
self.test_file = os.path.join(self.temp_dir, "test.txt")
# Create test file
with open(self.test_file, 'w') as f:
f.write("Test content")
def teardown_method(self):
"""Clean up test environment."""
if os.path.exists(self.test_file):
os.unlink(self.test_file)
os.rmdir(self.temp_dir)
def test_valid_file_validation(self):
"""Test validation of valid files."""
handler = EnhancedFileHandler()
is_valid, error = handler.validate_file(self.test_file)
assert is_valid
assert error is None
def test_non_existing_file_validation(self):
"""Test validation of non-existing files."""
handler = EnhancedFileHandler()
non_existing = "/non/existing/file.txt"
is_valid, error = handler.validate_file(non_existing)
assert not is_valid
assert "does not exist" in error
def test_directory_validation(self):
"""Test validation of directories (should fail)."""
handler = EnhancedFileHandler()
is_valid, error = handler.validate_file(self.temp_dir)
assert not is_valid
assert "not a file" in error
def test_empty_file_validation(self):
"""Test validation of empty files."""
handler = EnhancedFileHandler()
empty_file = os.path.join(self.temp_dir, "empty.txt")
with open(empty_file, 'w') as f:
pass # Create empty file
try:
is_valid, error = handler.validate_file(empty_file)
assert not is_valid
assert "empty" in error
finally:
os.unlink(empty_file)
class TestFileProcessing:
"""Test complete file processing workflow."""
def setup_method(self):
"""Set up test environment."""
self.temp_dir = tempfile.mkdtemp()
self.test_file = os.path.join(self.temp_dir, "test.txt")
# Create test file
with open(self.test_file, 'w') as f:
f.write("Test content for processing")
def teardown_method(self):
"""Clean up test environment."""
if os.path.exists(self.test_file):
os.unlink(self.test_file)
os.rmdir(self.temp_dir)
# Clean up any temp files
cleanup_temp_files()
def test_file_path_processing(self):
"""Test processing file by path."""
handler = EnhancedFileHandler(base_paths=[self.temp_dir])
# Test absolute path
processed = handler.process_file_input(self.test_file)
assert processed.info.exists
assert processed.info.error is None
assert processed.info.file_type == FileType.DOCUMENT
assert processed.info.file_format == FileFormat.TXT
assert processed.content == b"Test content for processing"
assert not processed.cleanup_required
# Test relative path
processed = handler.process_file_input("test.txt")
assert processed.info.exists
assert processed.info.error is None
assert processed.content == b"Test content for processing"
def test_base64_processing(self):
"""Test processing base64 content."""
handler = EnhancedFileHandler()
# Create base64 content
test_content = "Hello World from base64"
b64_content = base64.b64encode(test_content.encode()).decode()
data_url = f"data:text/plain;base64,{b64_content}"
processed = handler.process_file_input(data_url)
assert processed.info.exists
assert processed.info.is_base64
assert processed.info.error is None
assert processed.info.mime_type == "text/plain"
assert processed.content == test_content.encode()
assert processed.cleanup_required
assert processed.temp_path is not None
def test_bytes_processing(self):
"""Test processing raw bytes content."""
handler = EnhancedFileHandler()
test_bytes = b"Raw bytes content"
processed = handler.process_file_input(test_bytes)
assert processed.info.exists
assert processed.info.error is None
assert processed.content == test_bytes
assert processed.cleanup_required
assert processed.temp_path is not None
def test_invalid_input_processing(self):
"""Test processing invalid inputs."""
handler = EnhancedFileHandler()
# Test non-existing file
processed = handler.process_file_input("/non/existing/file.txt")
assert not processed.info.exists
assert processed.info.error is not None
assert "Could not resolve" in processed.info.error
# Test invalid type
processed = handler.process_file_input(123)
assert not processed.info.exists
assert processed.info.error is not None
assert "Unsupported file input type" in processed.info.error
class TestMetadataExtraction:
"""Test file metadata extraction."""
def setup_method(self):
"""Set up test environment."""
self.temp_dir = tempfile.mkdtemp()
self.test_file = os.path.join(self.temp_dir, "test.txt")
# Create test file
with open(self.test_file, 'w') as f:
f.write("Test content for metadata")
def teardown_method(self):
"""Clean up test environment."""
if os.path.exists(self.test_file):
os.unlink(self.test_file)
os.rmdir(self.temp_dir)
def test_basic_metadata_extraction(self):
"""Test basic file metadata extraction."""
handler = EnhancedFileHandler()
metadata = handler.get_file_metadata(self.test_file)
assert 'size_bytes' in metadata
assert 'created_time' in metadata
assert 'modified_time' in metadata
assert 'permissions' in metadata
assert 'content_hash' in metadata
assert metadata['size_bytes'] > 0
assert len(metadata['content_hash']) == 32 # MD5 hash length
def test_non_existing_file_metadata(self):
"""Test metadata extraction for non-existing file."""
handler = EnhancedFileHandler()
metadata = handler.get_file_metadata("/non/existing/file.txt")
assert metadata == {}
class TestConvenienceFunctions:
"""Test convenience functions."""
def setup_method(self):
"""Set up test environment."""
self.temp_dir = tempfile.mkdtemp()
self.test_file = os.path.join(self.temp_dir, "test.txt")
# Create test file
with open(self.test_file, 'w') as f:
f.write("Test content")
def teardown_method(self):
"""Clean up test environment."""
if os.path.exists(self.test_file):
os.unlink(self.test_file)
os.rmdir(self.temp_dir)
cleanup_temp_files()
def test_process_file_function(self):
"""Test process_file convenience function."""
processed = process_file(self.test_file)
assert processed.info.exists
assert processed.info.error is None
assert processed.content == b"Test content"
def test_validate_file_exists_function(self):
"""Test validate_file_exists convenience function."""
# Test existing file
assert validate_file_exists(self.test_file)
# Test non-existing file
assert not validate_file_exists("/non/existing/file.txt")
def test_get_file_type_function(self):
"""Test get_file_type convenience function."""
file_type, file_format = get_file_type("test.png")
assert file_type == FileType.IMAGE
assert file_format == FileFormat.PNG
def test_cleanup_temp_files_function(self):
"""Test cleanup_temp_files convenience function."""
# Create some temp files through processing
test_bytes = b"Temporary content"
processed = process_file(test_bytes)
assert processed.temp_path is not None
assert os.path.exists(processed.temp_path)
# Clean up
cleanup_temp_files()
# Verify cleanup
assert not os.path.exists(processed.temp_path)
class TestErrorHandling:
"""Test error handling scenarios."""
def test_permission_denied_handling(self):
"""Test handling of permission denied errors."""
handler = EnhancedFileHandler()
# This test might not work on all systems
# We'll mock the permission check
with patch('os.access', return_value=False):
with patch('os.path.exists', return_value=True):
with patch('os.path.isfile', return_value=True):
is_valid, error = handler.validate_file("/some/file.txt")
assert not is_valid
assert "not readable" in error
def test_corrupted_file_handling(self):
"""Test handling of corrupted files."""
handler = EnhancedFileHandler()
# Create a file that looks like an image but isn't
temp_dir = tempfile.mkdtemp()
fake_image = os.path.join(temp_dir, "fake.png")
try:
with open(fake_image, 'w') as f:
f.write("This is not a real PNG file")
# This should detect the corruption during validation
is_valid, error = handler.validate_file(fake_image)
# The validation might pass basic checks but fail on image verification
# depending on PIL availability
finally:
if os.path.exists(fake_image):
os.unlink(fake_image)
os.rmdir(temp_dir)
def test_exception_handling_in_processing(self):
"""Test exception handling during file processing."""
handler = EnhancedFileHandler()
# Test with malformed input that should trigger exceptions
with patch('builtins.open', side_effect=IOError("Mocked IO error")):
processed = handler.process_file_input("some_file.txt")
assert not processed.info.exists
assert processed.info.error is not None
class TestIntegration:
"""Integration tests for complete workflows."""
def test_complete_image_workflow(self):
"""Test complete image processing workflow."""
handler = EnhancedFileHandler()
# Create a simple test image (1x1 pixel PNG)
image_data = base64.b64decode(
"iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8/5+hHgAHggJ/PchI7wAAAABJRU5ErkJggg=="
)
processed = handler.process_file_input(image_data)
assert processed.info.exists
# The file type detection from bytes content may not work perfectly
# Just check that it processes without error
assert processed.info.exists
assert processed.content == image_data
assert processed.cleanup_required
# Clean up
handler.cleanup_temp_files()
def test_complete_text_workflow(self):
"""Test complete text file processing workflow."""
# Create temporary text file
temp_dir = tempfile.mkdtemp()
text_file = os.path.join(temp_dir, "sample.txt")
try:
with open(text_file, 'w') as f:
f.write("Sample text content for testing")
handler = EnhancedFileHandler(base_paths=[temp_dir])
# Test by absolute path
processed = handler.process_file_input(text_file)
assert processed.info.exists
assert processed.info.file_type == FileType.DOCUMENT
assert processed.info.file_format == FileFormat.TXT
assert b"Sample text content" in processed.content
assert not processed.cleanup_required
# Test by relative path
processed = handler.process_file_input("sample.txt")
assert processed.info.exists
assert processed.content == b"Sample text content for testing"
finally:
if os.path.exists(text_file):
os.unlink(text_file)
os.rmdir(temp_dir)
if __name__ == "__main__":
# Run tests
pytest.main([__file__, "-v"])