# Page banner captured together with this file: "Spaces: Runtime error".
import logging
import os
from typing import Optional

# Point the Hugging Face cache at /app/cache.
# These assignments deliberately come BEFORE the `transformers` import below:
# the Hugging Face libraries resolve their cache directory from the environment
# when they are first imported, so setting the variables afterwards is too late
# to influence where models are downloaded.
# NOTE(review): newer transformers releases prefer HF_HOME over
# TRANSFORMERS_CACHE; both are kept so existing deployments behave the same.
os.environ["HF_HOME"] = "/app/cache"
os.environ["TRANSFORMERS_CACHE"] = "/app/cache"

import langdetect  # noqa: E402  (must follow the cache environment setup)
from fastapi import FastAPI, HTTPException  # noqa: E402
from transformers import pipeline  # noqa: E402
# Logging setup: INFO-level root configuration plus a logger named after this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# FastAPI application instance for this service.
app = FastAPI()
# Supported source languages: language code -> identifier of the model that
# translates that language into English.
MODEL_MAP: dict[str, str] = {
    "id": "Helsinki-NLP/opus-mt-id-en",
    "th": "Helsinki-NLP/opus-mt-th-en",
    "fr": "Helsinki-NLP/opus-mt-fr-en",
    "es": "Helsinki-NLP/opus-mt-es-en",
    "ja": "Helsinki-NLP/opus-mt-ja-en",
    "zh": "Helsinki-NLP/opus-mt-zh-en",
    "vi": "Helsinki-NLP/opus-mt-vi-en",
}

# Phrases that must come out of translation unchanged.
PROTECTED_TERMS: list[str] = ["2030 Aspirations"]
# Eagerly build one translation pipeline per entry of MODEL_MAP at import time,
# keyed by language code. Any failure aborts start-up.
translators = {}
for lang, model_name in MODEL_MAP.items():
    logger.info(f"Loading model for {lang} from {model_name}...")
    try:
        # Only the pipeline construction is guarded, so the error can name
        # exactly which model failed.
        translators[lang] = pipeline("translation", model=model_name)
    except Exception as e:
        # logger.exception records the traceback alongside the message.
        logger.exception(f"Model initialization failed: {str(e)}")
        # RuntimeError is still caught by `except Exception`; `from e` keeps
        # the original cause attached instead of discarding it.
        raise RuntimeError(
            f"Model initialization failed for '{lang}' ({model_name}): {str(e)}"
        ) from e
    logger.info(f"Model for {lang} loaded successfully.")
# langdetect's detector is randomized; pinning the seed makes repeated calls on
# the same text return the same language (important for short inputs).
langdetect.DetectorFactory.seed = 0


def detect_language(text: str) -> str:
    """Detect the language of *text* and map it onto a supported language code.

    Args:
        text: The text whose language should be detected.

    Returns:
        ``"zh"`` for any detected code starting with ``zh``; the detected code
        itself when it is a key of ``MODEL_MAP``; otherwise ``"en"``.
        ``"en"`` is also returned when detection raises, so callers treat
        undetectable text as "do not translate".
    """
    try:
        detected_lang = langdetect.detect(text)
        logger.info(f"langdetect raw result: '{detected_lang}' for text: '{text[:50]}...'")

        # Regional variants (any code beginning with 'zh') share the single 'zh' model.
        if detected_lang.startswith('zh'):
            logger.info(f"Normalizing '{detected_lang}' to 'zh' for Mandarin.")
            return 'zh'

        # Anything without a model is reported as English (i.e. passed through).
        final_lang = detected_lang if detected_lang in MODEL_MAP else "en"
        logger.info(f"Final determined language: '{final_lang}'. (Based on raw detected: '{detected_lang}')")
        return final_lang
    except Exception as e:
        # Deliberate best-effort: a detection failure must not fail the request.
        logger.warning(f"Language detection FAILED for text: '{text[:50]}...'. Error: {str(e)}. Defaulting to English.")
        return "en"
def protect_terms(text: str, protected_terms: list) -> tuple[str, dict]:
    """Swap protected terms for placeholders so they are not translated.

    Each non-empty term at index ``i`` of *protected_terms* is assigned the
    placeholder ``__PROTECTED_{i}__``. All occurrences are substituted in a
    single pass over *text*, trying longer terms first, so that:

    * a short term that is part of a longer one does not pre-empt it, and
    * a term can never match inside a placeholder inserted for another term.

    Args:
        text: The text about to be translated.
        protected_terms: Terms to shield from translation. Empty terms are
            ignored (replacing ``""`` would insert a placeholder between every
            character).

    Returns:
        A ``(modified_text, replacements)`` tuple where ``replacements`` maps
        every placeholder to its original term, including terms that do not
        occur in *text*.
    """
    import re  # local import: `re` is not part of the file's import block

    replacements = {}
    placeholder_for = {}
    for i, term in enumerate(protected_terms):
        if not term:
            continue
        placeholder = f"__PROTECTED_{i}__"
        replacements[placeholder] = term
        # A duplicated term keeps the placeholder of its first occurrence.
        placeholder_for.setdefault(term, placeholder)

    if not placeholder_for:
        return text, replacements

    # Longest alternative first so overlapping terms resolve to the longest match.
    ordered_terms = sorted(placeholder_for, key=len, reverse=True)
    pattern = re.compile("|".join(re.escape(term) for term in ordered_terms))
    modified_text = pattern.sub(lambda match: placeholder_for[match.group(0)], text)
    return modified_text, replacements
def restore_terms(text: str, replacements: dict) -> str:
    """Put protected terms back into translated text.

    Args:
        text: Translated text that may still contain placeholders.
        replacements: Mapping of placeholder -> original term.

    Returns:
        *text* with every occurrence of each placeholder replaced by its term.
    """
    output = text
    # Apply the substitutions one placeholder at a time, in mapping order.
    for token in replacements:
        output = output.replace(token, replacements[token])
    return output
async def translate(text: str, source_lang_override: Optional[str] = None):
    """Translate *text* to English while keeping protected terms intact.

    The source language is *source_lang_override* when it is a key of
    ``MODEL_MAP``; otherwise it is auto-detected. Text determined to be
    English (or of an unsupported/undetectable language) is returned as-is.

    Args:
        text: The text to translate.
        source_lang_override: Optional language code to use instead of
            auto-detection. Ignored (with a warning) when unsupported.

    Returns:
        ``{"translated_text": <str>}``.

    Raises:
        HTTPException: 400 when *text* is empty or no translator is loaded for
            the resolved language; 500 when translation fails unexpectedly.
    """
    # NOTE(review): no route decorator is visible on this function in this
    # view - confirm it is registered with `app` elsewhere.
    if not text:
        raise HTTPException(status_code=400, detail="Text input is required.")

    try:
        # Determine source language
        if source_lang_override and source_lang_override in MODEL_MAP:
            source_lang = source_lang_override
            logger.info(f"Source language overridden by user to: '{source_lang_override}'.")
        else:
            if source_lang_override:
                # Make the fallback visible instead of silently dropping the override.
                logger.warning(
                    f"Unsupported source language override '{source_lang_override}'; "
                    "falling back to automatic detection."
                )
            source_lang = detect_language(text)
            logger.info(f"Determined source language for translation: '{source_lang}'.")

        # If source language is English, return original text
        if source_lang == "en":
            logger.info("Source language is English or unrecognized, returning original text.")
            return {"translated_text": text}

        # Get translator
        translator = translators.get(source_lang)
        if translator is None:
            logger.error(f"No translator found for language: '{source_lang}'.")
            raise HTTPException(
                status_code=400,
                detail=f"Translation not supported for language: {source_lang}."
            )

        # Protect terms before translation
        modified_text, replacements = protect_terms(text, PROTECTED_TERMS)
        logger.info(f"Text after protecting terms: '{modified_text[:50]}...'")

        # Perform translation
        # NOTE(review): this is a synchronous call inside an async function, so
        # it runs on the event-loop thread - confirm that is acceptable.
        logger.info(f"Translating text from '{source_lang}' to English...")
        result = translator(modified_text)
        translated_text = result[0]["translation_text"]
        logger.info(f"Translation successful. Original: '{modified_text[:50]}...', Translated: '{translated_text[:50]}...'")

        # A placeholder that went in but did not come out means the protected
        # term cannot be restored; surface that instead of dropping it silently.
        lost_terms = [
            term
            for placeholder, term in replacements.items()
            if placeholder in modified_text and placeholder not in translated_text
        ]
        if lost_terms:
            logger.warning(f"Protected term placeholder(s) not preserved by translation: {lost_terms}")

        # Restore protected terms
        final_text = restore_terms(translated_text, replacements)
        logger.info(f"Final translated text with restored terms: '{final_text[:50]}...'")
        return {"translated_text": final_text}
    except HTTPException:
        # Deliberate HTTP errors pass through untouched (bare raise keeps the traceback).
        raise
    except Exception as e:
        logger.error(f"An unexpected error occurred during processing: {str(e)}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Processing failed: {str(e)}") from e