# (Extraction residue from the hosting page header: "Spaces: Running" - not part of the test module.)
"""
Comprehensive Test Suite for Enhanced File Handler
Tests all aspects of file handling including:
- File type detection
- Path resolution
- Base64 decoding
- File validation
- Metadata extraction
- Error handling
"""
import os | |
import tempfile | |
import base64 | |
import json | |
import pytest | |
from pathlib import Path | |
from unittest.mock import patch, mock_open | |
# Import the file handler | |
import sys | |
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
from utils.file_handler import ( | |
EnhancedFileHandler, | |
FileType, | |
FileFormat, | |
FileInfo, | |
ProcessedFile, | |
get_file_handler, | |
process_file, | |
validate_file_exists, | |
get_file_type, | |
cleanup_temp_files | |
) | |
class TestFileTypeDetection:
    """Verify that file names map to the expected file type and format."""

    @staticmethod
    def _check(detector, expectations):
        """Assert ``detector.detect_file_type`` yields each expected pair.

        ``expectations`` holds ``(filename, expected_type, expected_format)``
        tuples; the first mismatch fails the calling test.
        """
        for name, want_type, want_format in expectations:
            got_type, got_format = detector.detect_file_type(name)
            assert got_type == want_type
            assert got_format == want_format

    def test_image_detection(self):
        """Image extensions are reported as ``FileType.IMAGE``."""
        detector = EnhancedFileHandler()
        self._check(detector, [
            ("test.png", FileType.IMAGE, FileFormat.PNG),
            ("test.jpg", FileType.IMAGE, FileFormat.JPG),
            ("test.jpeg", FileType.IMAGE, FileFormat.JPEG),
            ("test.gif", FileType.IMAGE, FileFormat.GIF),
            ("test.bmp", FileType.IMAGE, FileFormat.BMP),
            ("test.webp", FileType.IMAGE, FileFormat.WEBP),
        ])

    def test_audio_detection(self):
        """Audio extensions are reported as ``FileType.AUDIO``."""
        detector = EnhancedFileHandler()
        self._check(detector, [
            ("test.mp3", FileType.AUDIO, FileFormat.MP3),
            ("test.wav", FileType.AUDIO, FileFormat.WAV),
            ("test.m4a", FileType.AUDIO, FileFormat.M4A),
            ("test.flac", FileType.AUDIO, FileFormat.FLAC),
            ("test.ogg", FileType.AUDIO, FileFormat.OGG),
        ])

    def test_document_detection(self):
        """Document extensions are reported as ``FileType.DOCUMENT``."""
        detector = EnhancedFileHandler()
        self._check(detector, [
            ("test.pdf", FileType.DOCUMENT, FileFormat.PDF),
            ("test.docx", FileType.DOCUMENT, FileFormat.DOCX),
            ("test.doc", FileType.DOCUMENT, FileFormat.DOC),
            ("test.txt", FileType.DOCUMENT, FileFormat.TXT),
            ("test.rtf", FileType.DOCUMENT, FileFormat.RTF),
        ])

    def test_data_detection(self):
        """Data extensions are reported as ``FileType.DATA``."""
        detector = EnhancedFileHandler()
        self._check(detector, [
            ("test.csv", FileType.DATA, FileFormat.CSV),
            ("test.xlsx", FileType.DATA, FileFormat.XLSX),
            ("test.xls", FileType.DATA, FileFormat.XLS),
            ("test.json", FileType.DATA, FileFormat.JSON),
            ("test.xml", FileType.DATA, FileFormat.XML),
        ])

    def test_code_detection(self):
        """Source-code extensions are reported as ``FileType.CODE``."""
        detector = EnhancedFileHandler()
        self._check(detector, [
            ("test.py", FileType.CODE, FileFormat.PY),
            ("test.js", FileType.CODE, FileFormat.JS),
            ("test.html", FileType.CODE, FileFormat.HTML),
            ("test.css", FileType.CODE, FileFormat.CSS),
        ])

    def test_unknown_detection(self):
        """An unrecognised extension yields the ``UNKNOWN`` type and format."""
        detector = EnhancedFileHandler()
        self._check(detector, [
            ("test.unknown", FileType.UNKNOWN, FileFormat.UNKNOWN),
        ])
class TestPathResolution:
    """Test file path resolution."""

    def setup_method(self):
        """Create a scratch directory containing a single ``test.txt`` file."""
        self.temp_dir = tempfile.mkdtemp()
        self.test_file = os.path.join(self.temp_dir, "test.txt")
        # Create test file
        with open(self.test_file, 'w') as f:
            f.write("Test content")

    def teardown_method(self):
        """Remove the scratch file and its (then empty) directory."""
        if os.path.exists(self.test_file):
            os.unlink(self.test_file)
        os.rmdir(self.temp_dir)

    def test_absolute_path_resolution(self):
        """An existing absolute path resolves to itself; a missing one to ``None``."""
        handler = EnhancedFileHandler()
        # Test existing absolute path
        resolved = handler.resolve_file_path(self.test_file)
        assert resolved == os.path.abspath(self.test_file)
        # Test non-existing absolute path
        non_existing = "/non/existing/path.txt"
        resolved = handler.resolve_file_path(non_existing)
        assert resolved is None

    def test_relative_path_resolution(self):
        """A relative name is looked up under the handler's ``base_paths``."""
        handler = EnhancedFileHandler(base_paths=[self.temp_dir])
        # Test existing relative path
        relative_path = "test.txt"
        resolved = handler.resolve_file_path(relative_path)
        assert resolved == os.path.abspath(self.test_file)
        # Test non-existing relative path
        non_existing = "non_existing.txt"
        resolved = handler.resolve_file_path(non_existing)
        assert resolved is None

    def test_current_directory_variations(self):
        """Bare and ``./``-prefixed names resolve against the current directory."""
        current_test_file = "current_test.txt"
        original_cwd = os.getcwd()
        # Work inside the scratch directory so the test never writes into (or
        # collides with files in) the directory pytest was launched from.
        os.chdir(self.temp_dir)
        try:
            # Built after chdir in case the handler records the working
            # directory at construction time (not visible from this file).
            handler = EnhancedFileHandler()
            with open(current_test_file, 'w') as f:
                f.write("Test")
            # Test various current directory formats
            variations = [
                current_test_file,
                f"./{current_test_file}",
            ]
            for variation in variations:
                resolved = handler.resolve_file_path(variation)
                assert resolved is not None
                assert os.path.exists(resolved)
        finally:
            # Delete the file first so teardown's rmdir finds the directory
            # empty, then restore cwd for the tests that follow.
            if os.path.exists(current_test_file):
                os.unlink(current_test_file)
            os.chdir(original_cwd)
class TestBase64Handling:
    """Test base64 content handling."""

    def test_base64_detection(self):
        """Data URLs and plain base64 are detected; ordinary text is not."""
        handler = EnhancedFileHandler()
        # Test data URL format. The literal here was an empty string, which
        # never exercised the data-URL branch; use a real data URL instead.
        data_url = "data:text/plain;base64,SGVsbG8gV29ybGQ="
        assert handler.is_base64_encoded(data_url)
        # Test plain base64
        plain_b64 = "SGVsbG8gV29ybGQ="  # "Hello World" in base64
        assert handler.is_base64_encoded(plain_b64)
        # Test non-base64
        regular_text = "This is not base64"
        assert not handler.is_base64_encoded(regular_text)

    def test_base64_decoding(self):
        """Decoding returns the raw bytes plus the MIME type when one is given."""
        handler = EnhancedFileHandler()
        # Test data URL decoding: the MIME type comes from the URL header.
        data_url = "data:text/plain;base64,SGVsbG8gV29ybGQ="
        decoded_bytes, mime_type = handler.decode_base64_file(data_url)
        assert decoded_bytes == b"Hello World"
        assert mime_type == "text/plain"
        # Test plain base64 decoding: no header, so no MIME type is reported.
        plain_b64 = "SGVsbG8gV29ybGQ="
        decoded_bytes, mime_type = handler.decode_base64_file(plain_b64)
        assert decoded_bytes == b"Hello World"
        assert mime_type is None

    def test_invalid_base64_handling(self):
        """Text that is not base64 is treated as a path and fails without raising."""
        handler = EnhancedFileHandler()
        invalid_b64 = "This is not valid base64!"
        # Invalid base64 should be processed as a file path and fail gracefully
        processed = handler.process_file_input(invalid_b64)
        # Should fail to find the file but not raise an exception
        assert not processed.info.exists
        assert processed.info.error is not None
        assert "Could not resolve file path" in processed.info.error
class TestFileValidation:
    """Exercise ``validate_file`` against valid, missing, directory and empty inputs."""

    def setup_method(self):
        """Build a scratch directory holding one small text file."""
        self.temp_dir = tempfile.mkdtemp()
        self.test_file = os.path.join(self.temp_dir, "test.txt")
        Path(self.test_file).write_text("Test content")

    def teardown_method(self):
        """Delete the scratch file if it is still present, then its directory."""
        scratch_file = self.test_file
        if os.path.exists(scratch_file):
            os.remove(scratch_file)
        os.rmdir(self.temp_dir)

    def test_valid_file_validation(self):
        """A readable, non-empty file validates with no error."""
        ok, message = EnhancedFileHandler().validate_file(self.test_file)
        assert ok
        assert message is None

    def test_non_existing_file_validation(self):
        """A missing path is rejected with a "does not exist" error."""
        checker = EnhancedFileHandler()
        ok, message = checker.validate_file("/non/existing/file.txt")
        assert not ok
        assert "does not exist" in message

    def test_directory_validation(self):
        """A directory is rejected with a "not a file" error."""
        checker = EnhancedFileHandler()
        ok, message = checker.validate_file(self.temp_dir)
        assert not ok
        assert "not a file" in message

    def test_empty_file_validation(self):
        """A zero-byte file is rejected with an error mentioning "empty"."""
        checker = EnhancedFileHandler()
        blank_path = os.path.join(self.temp_dir, "empty.txt")
        # Opening for write and closing immediately leaves a zero-byte file.
        open(blank_path, 'w').close()
        try:
            ok, message = checker.validate_file(blank_path)
            assert not ok
            assert "empty" in message
        finally:
            # Always remove it so teardown can rmdir the scratch directory.
            os.remove(blank_path)
class TestFileProcessing:
    """End-to-end checks of ``process_file_input`` for paths, base64, bytes and bad input."""

    def setup_method(self):
        """Build a scratch directory holding one text file to process."""
        self.temp_dir = tempfile.mkdtemp()
        self.test_file = os.path.join(self.temp_dir, "test.txt")
        Path(self.test_file).write_text("Test content for processing")

    def teardown_method(self):
        """Remove the scratch file and directory, then purge handler temp files."""
        scratch_file = self.test_file
        if os.path.exists(scratch_file):
            os.remove(scratch_file)
        os.rmdir(self.temp_dir)
        # Clean up any temp files
        cleanup_temp_files()

    def test_file_path_processing(self):
        """A file is loaded by absolute path and by name relative to ``base_paths``."""
        expected = b"Test content for processing"
        loader = EnhancedFileHandler(base_paths=[self.temp_dir])

        # Absolute path: full metadata is checked and no temp copy is made.
        by_abs = loader.process_file_input(self.test_file)
        assert by_abs.info.exists
        assert by_abs.info.error is None
        assert by_abs.info.file_type == FileType.DOCUMENT
        assert by_abs.info.file_format == FileFormat.TXT
        assert by_abs.content == expected
        assert not by_abs.cleanup_required

        # Relative path: resolved through the configured base path.
        by_rel = loader.process_file_input("test.txt")
        assert by_rel.info.exists
        assert by_rel.info.error is None
        assert by_rel.content == expected

    def test_base64_processing(self):
        """A base64 data URL is decoded and flagged as needing cleanup."""
        loader = EnhancedFileHandler()
        test_content = "Hello World from base64"
        payload = test_content.encode()
        encoded = base64.b64encode(payload).decode()
        result = loader.process_file_input(f"data:text/plain;base64,{encoded}")
        assert result.info.exists
        assert result.info.is_base64
        assert result.info.error is None
        assert result.info.mime_type == "text/plain"
        assert result.content == payload
        assert result.cleanup_required
        assert result.temp_path is not None

    def test_bytes_processing(self):
        """Raw bytes are returned unchanged and flagged as needing cleanup."""
        loader = EnhancedFileHandler()
        raw = b"Raw bytes content"
        result = loader.process_file_input(raw)
        assert result.info.exists
        assert result.info.error is None
        assert result.content == raw
        assert result.cleanup_required
        assert result.temp_path is not None

    def test_invalid_input_processing(self):
        """Missing paths and unsupported input types are reported via ``info.error``."""
        loader = EnhancedFileHandler()

        # A path that does not exist.
        missing = loader.process_file_input("/non/existing/file.txt")
        assert not missing.info.exists
        assert missing.info.error is not None
        assert "Could not resolve" in missing.info.error

        # An input that is neither a string nor bytes.
        wrong_type = loader.process_file_input(123)
        assert not wrong_type.info.exists
        assert wrong_type.info.error is not None
        assert "Unsupported file input type" in wrong_type.info.error
class TestMetadataExtraction:
    """Checks on the dictionary returned by ``get_file_metadata``."""

    def setup_method(self):
        """Build a scratch directory holding one text file to inspect."""
        self.temp_dir = tempfile.mkdtemp()
        self.test_file = os.path.join(self.temp_dir, "test.txt")
        Path(self.test_file).write_text("Test content for metadata")

    def teardown_method(self):
        """Delete the scratch file if it is still present, then its directory."""
        scratch_file = self.test_file
        if os.path.exists(scratch_file):
            os.remove(scratch_file)
        os.rmdir(self.temp_dir)

    def test_basic_metadata_extraction(self):
        """An existing file yields size, timestamps, permissions and a content hash."""
        info = EnhancedFileHandler().get_file_metadata(self.test_file)
        required_keys = (
            'size_bytes',
            'created_time',
            'modified_time',
            'permissions',
            'content_hash',
        )
        for key in required_keys:
            assert key in info
        assert info['size_bytes'] > 0
        assert len(info['content_hash']) == 32  # MD5 hash length

    def test_non_existing_file_metadata(self):
        """A missing file yields an empty metadata dictionary."""
        info = EnhancedFileHandler().get_file_metadata("/non/existing/file.txt")
        assert info == {}
class TestConvenienceFunctions:
    """Checks on the module-level helper functions of the file handler."""

    def setup_method(self):
        """Build a scratch directory holding one small text file."""
        self.temp_dir = tempfile.mkdtemp()
        self.test_file = os.path.join(self.temp_dir, "test.txt")
        Path(self.test_file).write_text("Test content")

    def teardown_method(self):
        """Remove the scratch file and directory, then purge handler temp files."""
        scratch_file = self.test_file
        if os.path.exists(scratch_file):
            os.remove(scratch_file)
        os.rmdir(self.temp_dir)
        cleanup_temp_files()

    def test_process_file_function(self):
        """``process_file`` loads an existing file's bytes without error."""
        result = process_file(self.test_file)
        assert result.info.exists
        assert result.info.error is None
        assert result.content == b"Test content"

    def test_validate_file_exists_function(self):
        """``validate_file_exists`` is truthy for a real file, falsy for a missing one."""
        present = validate_file_exists(self.test_file)
        assert present
        absent = validate_file_exists("/non/existing/file.txt")
        assert not absent

    def test_get_file_type_function(self):
        """``get_file_type`` classifies a ``.png`` name as a PNG image."""
        kind, fmt = get_file_type("test.png")
        assert kind == FileType.IMAGE
        assert fmt == FileFormat.PNG

    def test_cleanup_temp_files_function(self):
        """``cleanup_temp_files`` deletes temp files created while processing bytes."""
        # Processing raw bytes leaves a temp file on disk.
        result = process_file(b"Temporary content")
        leftover = result.temp_path
        assert leftover is not None
        assert os.path.exists(leftover)
        # After cleanup that file must be gone.
        cleanup_temp_files()
        assert not os.path.exists(leftover)
class TestErrorHandling:
    """Test error handling scenarios."""

    def test_permission_denied_handling(self):
        """A file that exists but fails the access check is reported as not readable."""
        handler = EnhancedFileHandler()
        # Real permission bits are not portable across systems, so the
        # existence, is-file and access checks are all patched instead.
        with patch('os.access', return_value=False), \
                patch('os.path.exists', return_value=True), \
                patch('os.path.isfile', return_value=True):
            is_valid, error = handler.validate_file("/some/file.txt")
            assert not is_valid
            assert "not readable" in error

    def test_corrupted_file_handling(self):
        """A text file named ``.png`` is validated without raising and with a coherent result."""
        handler = EnhancedFileHandler()
        # Create a file that looks like an image but isn't
        temp_dir = tempfile.mkdtemp()
        fake_image = os.path.join(temp_dir, "fake.png")
        try:
            with open(fake_image, 'w') as f:
                f.write("This is not a real PNG file")
            is_valid, error = handler.validate_file(fake_image)
            # The outcome is not fixed: validation might pass basic checks but
            # fail on image verification depending on PIL availability.
            # Either way the pair must be self-consistent, as in the other
            # validation tests: no error on success, a message on failure.
            if is_valid:
                assert error is None
            else:
                assert error
        finally:
            if os.path.exists(fake_image):
                os.unlink(fake_image)
            os.rmdir(temp_dir)

    def test_exception_handling_in_processing(self):
        """With ``open`` patched to raise, processing reports failure via ``info``."""
        handler = EnhancedFileHandler()
        # NOTE(review): "some_file.txt" presumably does not exist, so the
        # failure may come from path resolution before open() is ever
        # reached - confirm this exercises the intended branch.
        with patch('builtins.open', side_effect=IOError("Mocked IO error")):
            processed = handler.process_file_input("some_file.txt")
            assert not processed.info.exists
            assert processed.info.error is not None
class TestIntegration:
    """Integration tests for complete workflows."""

    def test_complete_image_workflow(self):
        """Raw PNG bytes are processed, returned unchanged and flagged for cleanup."""
        handler = EnhancedFileHandler()
        # Create a simple test image (1x1 pixel PNG)
        image_data = base64.b64decode(
            "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8/5+hHgAHggJ/PchI7wAAAABJRU5ErkJggg=="
        )
        try:
            processed = handler.process_file_input(image_data)
            # The file type detection from bytes content may not work perfectly
            # Just check that it processes without error
            assert processed.info.exists
            assert processed.content == image_data
            assert processed.cleanup_required
        finally:
            # Clean up even when an assertion above fails, so a failing run
            # does not leave the handler's temp files behind.
            handler.cleanup_temp_files()

    def test_complete_text_workflow(self):
        """A text file is processed by absolute path and by name under ``base_paths``."""
        # Create temporary text file
        temp_dir = tempfile.mkdtemp()
        text_file = os.path.join(temp_dir, "sample.txt")
        try:
            with open(text_file, 'w') as f:
                f.write("Sample text content for testing")
            handler = EnhancedFileHandler(base_paths=[temp_dir])
            # Test by absolute path
            processed = handler.process_file_input(text_file)
            assert processed.info.exists
            assert processed.info.file_type == FileType.DOCUMENT
            assert processed.info.file_format == FileFormat.TXT
            assert b"Sample text content" in processed.content
            # A file read in place is not flagged as needing temp cleanup.
            assert not processed.cleanup_required
            # Test by relative path
            processed = handler.process_file_input("sample.txt")
            assert processed.info.exists
            assert processed.content == b"Sample text content for testing"
        finally:
            if os.path.exists(text_file):
                os.unlink(text_file)
            os.rmdir(temp_dir)
if __name__ == "__main__":
    # Run this module's tests verbosely and pass pytest's exit status back to
    # the shell, so a failing run is visible to the caller (e.g. CI) instead
    # of always exiting 0.
    sys.exit(pytest.main([__file__, "-v"]))