""" | |
Enhanced OCR Engine for GAIA Agent - Phase 6 | |
Handles multi-orientation text recognition, rotated/distorted text, and advanced OCR | |
""" | |
import logging | |
import numpy as np | |
from typing import Dict, Any, List, Optional, Tuple | |
from pathlib import Path | |
import tempfile | |
import os | |
# Image processing | |
try: | |
from PIL import Image, ImageEnhance, ImageFilter, ImageOps | |
PIL_AVAILABLE = True | |
except ImportError: | |
PIL_AVAILABLE = False | |
# OCR engine | |
try: | |
import pytesseract | |
PYTESSERACT_AVAILABLE = True | |
except ImportError: | |
PYTESSERACT_AVAILABLE = False | |
# Computer vision for advanced processing | |
try: | |
import cv2 | |
CV2_AVAILABLE = True | |
except ImportError: | |
CV2_AVAILABLE = False | |
logger = logging.getLogger(__name__) | |


class EnhancedOCREngine:
    """
    Enhanced OCR engine for complex text recognition scenarios.

    Features:
    - Multi-orientation text recognition (0°, 90°, 180°, 270°)
    - Rotated and distorted text handling
    - Multi-language OCR support
    - Text quality enhancement and preprocessing
    - Confidence scoring for OCR results
    - Advanced text extraction from complex layouts
    """

    def __init__(self):
        """Initialize the enhanced OCR engine."""
        self.name = "enhanced_ocr_engine"
        self.description = "Enhanced OCR for multi-orientation text, rotated/distorted text, and complex layouts"

        # Check dependencies
        self.available = PIL_AVAILABLE and PYTESSERACT_AVAILABLE
        if not self.available:
            missing = []
            if not PIL_AVAILABLE:
                missing.append("PIL/Pillow")
            if not PYTESSERACT_AVAILABLE:
                missing.append("pytesseract")
            logger.warning(f"⚠️ Enhanced OCR Engine not available - missing: {', '.join(missing)}")
            return

        # Test tesseract installation
        try:
            pytesseract.get_tesseract_version()
            logger.info("✅ Tesseract OCR engine detected")
        except Exception as e:
            logger.warning(f"⚠️ Tesseract not properly installed: {e}")
            self.available = False
            return

        # OCR configurations for different scenarios
        self.ocr_configs = {
            'default': '--oem 3 --psm 6',
            'single_line': '--oem 3 --psm 7',
            'single_word': '--oem 3 --psm 8',
            'sparse_text': '--oem 3 --psm 11',
            'single_char': '--oem 3 --psm 10',
            'vertical_text': '--oem 3 --psm 5',
            'uniform_block': '--oem 3 --psm 6'
        }

        # Supported orientations
        self.orientations = [0, 90, 180, 270]

        # Language codes for multi-language support
        self.supported_languages = [
            'eng', 'ara', 'chi_sim', 'chi_tra', 'fra', 'deu', 'spa', 'rus',
            'jpn', 'kor', 'hin', 'tha', 'vie', 'heb', 'tur', 'pol', 'nld',
            'ita', 'por', 'swe', 'dan', 'nor', 'fin', 'ces', 'hun', 'ron'
        ]

        logger.info("✅ Enhanced OCR Engine initialized")

    def preprocess_image(self, image: Image.Image, enhancement_level: str = 'medium') -> Image.Image:
        """
        Preprocess image for better OCR results.

        Args:
            image: PIL Image object
            enhancement_level: 'light', 'medium', or 'heavy'

        Returns:
            Preprocessed PIL Image
        """
        if not isinstance(image, Image.Image):
            return image

        try:
            # Convert to RGB if necessary
            if image.mode != 'RGB':
                image = image.convert('RGB')

            # Apply enhancements based on level
            if enhancement_level in ['medium', 'heavy']:
                # Enhance contrast
                enhancer = ImageEnhance.Contrast(image)
                image = enhancer.enhance(1.2)

                # Enhance sharpness
                enhancer = ImageEnhance.Sharpness(image)
                image = enhancer.enhance(1.1)

            if enhancement_level == 'heavy':
                # Additional heavy processing
                # Reduce noise
                image = image.filter(ImageFilter.MedianFilter(size=3))

                # Enhance brightness slightly
                enhancer = ImageEnhance.Brightness(image)
                image = enhancer.enhance(1.05)

            # Convert to grayscale for better OCR
            image = ImageOps.grayscale(image)

            # Increase contrast for text
            enhancer = ImageEnhance.Contrast(image)
            image = enhancer.enhance(1.3)

            return image
        except Exception as e:
            logger.warning(f"Image preprocessing failed: {e}")
            return image

    def rotate_image(self, image: Image.Image, angle: int) -> Image.Image:
        """
        Rotate image by specified angle.

        Args:
            image: PIL Image object
            angle: Rotation angle in degrees

        Returns:
            Rotated PIL Image
        """
        try:
            if angle == 0:
                return image

            # Rotate image
            rotated = image.rotate(-angle, expand=True, fillcolor='white')
            return rotated
        except Exception as e:
            logger.warning(f"Image rotation failed: {e}")
            return image

    def detect_text_orientation(self, image: Image.Image) -> Dict[str, Any]:
        """
        Detect the orientation of text in the image.

        Args:
            image: PIL Image object

        Returns:
            Dictionary with orientation detection results
        """
        result = {
            'best_orientation': 0,
            'confidence': 0.0,
            'orientations_tested': [],
            'method': 'ocr_confidence'
        }

        if not self.available:
            return result

        try:
            best_confidence = 0
            best_orientation = 0
            orientation_results = []

            # Test each orientation
            for angle in self.orientations:
                rotated_image = self.rotate_image(image, angle)
                preprocessed = self.preprocess_image(rotated_image, 'light')

                # Get OCR data with confidence
                try:
                    data = pytesseract.image_to_data(
                        preprocessed,
                        config=self.ocr_configs['default'],
                        output_type=pytesseract.Output.DICT
                    )

                    # Calculate average confidence for detected text
                    confidences = [int(conf) for conf in data['conf'] if int(conf) > 0]
                    avg_confidence = sum(confidences) / len(confidences) if confidences else 0

                    orientation_results.append({
                        'angle': angle,
                        'confidence': avg_confidence,
                        'text_blocks': len(confidences)
                    })

                    if avg_confidence > best_confidence:
                        best_confidence = avg_confidence
                        best_orientation = angle
                except Exception as e:
                    logger.warning(f"OCR failed for orientation {angle}: {e}")
                    orientation_results.append({
                        'angle': angle,
                        'confidence': 0,
                        'text_blocks': 0
                    })

            result['best_orientation'] = best_orientation
            result['confidence'] = best_confidence
            result['orientations_tested'] = orientation_results
        except Exception as e:
            logger.warning(f"Orientation detection failed: {e}")

        return result

    def extract_text_with_confidence(self, image: Image.Image, config: str = 'default',
                                     languages: Optional[List[str]] = None) -> Dict[str, Any]:
        """
        Extract text from image with confidence scores.

        Args:
            image: PIL Image object
            config: OCR configuration key
            languages: List of language codes to use

        Returns:
            Dictionary with text extraction results
        """
        result = {
            'text': '',
            'confidence': 0.0,
            'word_confidences': [],
            'bounding_boxes': [],
            'languages_used': languages or ['eng']
        }

        if not self.available:
            return result

        try:
            # Prepare language string
            lang_string = '+'.join(languages) if languages else 'eng'

            # Get OCR configuration
            ocr_config = self.ocr_configs.get(config, self.ocr_configs['default'])
            ocr_config += f' -l {lang_string}'

            # Extract text with detailed data
            data = pytesseract.image_to_data(
                image,
                config=ocr_config,
                output_type=pytesseract.Output.DICT
            )

            # Process results
            words = []
            confidences = []
            boxes = []

            for i in range(len(data['text'])):
                text = data['text'][i].strip()
                conf = int(data['conf'][i])

                if text and conf > 0:
                    words.append(text)
                    confidences.append(conf)
                    boxes.append({
                        'x': data['left'][i],
                        'y': data['top'][i],
                        'width': data['width'][i],
                        'height': data['height'][i],
                        'text': text,
                        'confidence': conf
                    })

            # Combine results
            result['text'] = ' '.join(words)
            result['confidence'] = sum(confidences) / len(confidences) if confidences else 0
            result['word_confidences'] = confidences
            result['bounding_boxes'] = boxes
        except Exception as e:
            logger.warning(f"Text extraction failed: {e}")

        return result

    def process_multi_orientation_ocr(self, image: Image.Image,
                                      auto_detect_orientation: bool = True) -> Dict[str, Any]:
        """
        Process OCR with multiple orientations and return the best result.

        Args:
            image: PIL Image object
            auto_detect_orientation: Whether to auto-detect the best orientation

        Returns:
            Dictionary with best OCR results
        """
        result = {
            'text': '',
            'confidence': 0.0,
            'best_orientation': 0,
            'orientation_results': [],
            'preprocessing_applied': True
        }

        if not self.available:
            return result

        try:
            # Preprocess image
            preprocessed = self.preprocess_image(image, 'medium')

            if auto_detect_orientation:
                # Detect best orientation first
                orientation_info = self.detect_text_orientation(preprocessed)
                best_angle = orientation_info['best_orientation']

                # Process with best orientation
                rotated = self.rotate_image(preprocessed, best_angle)
                ocr_result = self.extract_text_with_confidence(rotated)

                result.update(ocr_result)
                result['best_orientation'] = best_angle
                result['orientation_results'] = orientation_info['orientations_tested']
            else:
                # Try all orientations and pick the best
                best_confidence = 0
                best_result = None
                best_angle = 0
                orientation_results = []

                for angle in self.orientations:
                    rotated = self.rotate_image(preprocessed, angle)
                    ocr_result = self.extract_text_with_confidence(rotated)

                    orientation_results.append({
                        'angle': angle,
                        'confidence': ocr_result['confidence'],
                        'text_length': len(ocr_result['text']),
                        'word_count': len(ocr_result['text'].split())
                    })

                    if ocr_result['confidence'] > best_confidence:
                        best_confidence = ocr_result['confidence']
                        best_result = ocr_result
                        best_angle = angle

                if best_result:
                    result.update(best_result)
                    result['best_orientation'] = best_angle

                result['orientation_results'] = orientation_results
        except Exception as e:
            logger.error(f"Multi-orientation OCR failed: {e}")

        return result

    def process_image_file(self, image_path: str, **kwargs) -> Dict[str, Any]:
        """
        Process an image file with enhanced OCR.

        Args:
            image_path: Path to image file
            **kwargs: Additional arguments for OCR processing

        Returns:
            Dictionary with OCR results
        """
        result = {
            'success': False,
            'error': '',
            'text': '',
            'confidence': 0.0
        }

        if not self.available:
            result['error'] = 'OCR engine not available'
            return result

        try:
            # Load image
            image = Image.open(image_path)

            # Process with multi-orientation OCR
            ocr_result = self.process_multi_orientation_ocr(image, **kwargs)

            result['success'] = True
            result.update(ocr_result)
        except Exception as e:
            result['error'] = str(e)
            logger.error(f"Image file processing failed: {e}")

        return result

    def enhance_text_quality(self, text: str) -> str:
        """
        Enhance OCR text quality by fixing common errors.

        Args:
            text: Raw OCR text

        Returns:
            Enhanced text
        """
        if not text:
            return text

        # Context-dependent confusions (0/O, 1/l, 5/S, 8/B, rn/m, cl/d, vv/w) are
        # deliberately not replaced globally, since blind substitution would corrupt
        # valid numbers and words. Only unambiguous punctuation fixes are applied.
        corrections = {
            ' ,': ',',
            ' .': '.',
            ' !': '!',
            ' ?': '?',
        }

        enhanced = text
        for wrong, right in corrections.items():
            if wrong in enhanced:
                enhanced = enhanced.replace(wrong, right)

        # Clean up extra spaces
        enhanced = ' '.join(enhanced.split())
        return enhanced


def get_enhanced_ocr_tools() -> List[EnhancedOCREngine]:
    """Get list of enhanced OCR tools."""
    try:
        ocr_engine = EnhancedOCREngine()
        if ocr_engine.available:
            return [ocr_engine]
        else:
            logger.warning("⚠️ Enhanced OCR engine not available")
            return []
    except Exception as e:
        logger.error(f"❌ Failed to create enhanced OCR engine: {e}")
        return []
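

# Minimal usage sketch (not part of the engine itself): shows how the factory and
# the main entry points are expected to be called. The image path below is a
# hypothetical placeholder - substitute a real file when trying this out.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    tools = get_enhanced_ocr_tools()
    if tools:
        engine = tools[0]
        ocr = engine.process_image_file("sample_rotated_text.png")  # hypothetical path
        if ocr['success']:
            cleaned = engine.enhance_text_quality(ocr['text'])
            print(f"Best orientation: {ocr['best_orientation']}°")
            print(f"Average confidence: {ocr['confidence']:.1f}")
            print(f"Text: {cleaned}")
        else:
            print(f"OCR failed: {ocr['error']}")
    else:
        print("Enhanced OCR engine unavailable (install Pillow, pytesseract, and Tesseract).")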