"""FastAPI service that translates Thai/Japanese/Chinese/Vietnamese text to English.

The source language is auto-detected (or supplied by the caller), a fixed list
of protected terms is shielded from translation with placeholders, and the
Helsinki-NLP OPUS-MT model for that language is lazily loaded and cached.
"""
import asyncio
import logging
import logging.handlers  # submodule is not loaded by a bare ``import logging``
import os
import re
from functools import lru_cache
from typing import Optional

# Set environment variables for Hugging Face cache.
# This has to happen BEFORE transformers is imported: the Hugging Face
# libraries resolve their cache directories when they are first imported.
os.environ["HF_HOME"] = "/app/cache"
os.environ["TRANSFORMERS_CACHE"] = "/app/cache"

import langdetect  # noqa: E402
from fastapi import FastAPI, HTTPException  # noqa: E402
from transformers import pipeline  # noqa: E402

# Environment configuration
USE_8BIT = False
try:
    import bitsandbytes  # noqa: F401  # only to make sure the module is available
    USE_8BIT = os.getenv("USE_QUANTIZATION", "0") == "1"
except ImportError:
    USE_8BIT = False

DEVICE = int(os.getenv("DEVICE", "-1"))  # -1 for CPU, 0+ for GPU
MAX_TEXT_LENGTH = int(os.getenv("MAX_TEXT_LENGTH", "5000"))

app = FastAPI()

# Configure logging with timestamp and level
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
    handlers=[
        logging.handlers.RotatingFileHandler(
            "/app/logs/app.log", maxBytes=1000000, backupCount=1
        ),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)

# Map of supported language models
MODEL_MAP = {
    "th": "Helsinki-NLP/opus-mt-th-en",
    "ja": "Helsinki-NLP/opus-mt-ja-en",
    "zh": "Helsinki-NLP/opus-mt-zh-en",
    "vi": "Helsinki-NLP/opus-mt-vi-en",
}

# List of terms to protect from translation
PROTECTED_TERMS = ["2030 Aspirations", "Griffith"]

# Cache for translators to avoid reloading models unnecessarily
translators = {}


def get_translator(lang: str):
    """Load or retrieve the cached translation pipeline for ``lang``.

    Args:
        lang: A key of ``MODEL_MAP``.

    Returns:
        The ``transformers`` translation pipeline for that language.

    Raises:
        KeyError: If ``lang`` is not a key of ``MODEL_MAP``.
        Exception: Whatever ``pipeline`` raises while loading is logged and
            re-raised unchanged.
    """
    if lang not in translators:
        logger.info(f"Loading model for {lang} from {MODEL_MAP[lang]}...")
        try:
            translators[lang] = pipeline(
                "translation",
                model=MODEL_MAP[lang],
                device=DEVICE,
                model_kwargs={"load_in_8bit": USE_8BIT},
            )
            logger.info(f"Model for {lang} loaded successfully.")
        except Exception as e:
            logger.error(f"Failed to load model for {lang}: {str(e)}")
            raise
    return translators[lang]


@lru_cache(maxsize=100)
def detect_language(text: str) -> str:
    """Detect the language of ``text``, cached to reduce overhead for repeats.

    Returns:
        ``"zh"`` for any detected code starting with ``zh``, the detected code
        when it is a key of ``MODEL_MAP``, otherwise ``"en"``. Detection
        failures are logged and also yield ``"en"``.
    """
    try:
        detected_lang = langdetect.detect(text)
        logger.debug(f"langdetect raw result: '{detected_lang}' for text: '{text[:50]}...'")

        # langdetect reports regional variants (e.g. zh-cn); MODEL_MAP only has "zh".
        if detected_lang.startswith('zh'):
            logger.debug(f"Normalizing '{detected_lang}' to 'zh' for Mandarin.")
            return 'zh'

        final_lang = detected_lang if detected_lang in MODEL_MAP else "en"
        logger.debug(f"Final determined language: '{final_lang}'. (Based on raw detected: '{detected_lang}')")
        return final_lang
    except Exception as e:
        logger.warning(f"Language detection FAILED for text: '{text[:50]}...'. Error: {str(e)}. Defaulting to English.")
        return "en"


def protect_terms(text: str, protected_terms: list) -> tuple[str, dict]:
    """Replace protected terms with ``__PROTECTED_<i>__`` placeholders.

    Args:
        text: The text about to be translated.
        protected_terms: Terms that must survive translation verbatim; the
            placeholder index is the term's position in this list.

    Returns:
        A tuple ``(modified_text, replacements)`` where ``replacements`` maps
        each placeholder that was actually inserted to its original term.
    """
    modified_text = text
    replacements = {}
    for i, term in enumerate(protected_terms):
        placeholder = f"__PROTECTED_{i}__"
        # re.ASCII makes \b an ASCII word boundary. With the default Unicode
        # semantics Thai/Japanese/Chinese characters are word characters, so a
        # term written directly next to them would never match.
        modified_text, hits = re.subn(
            r'\b' + re.escape(term) + r'\b',
            placeholder,
            modified_text,
            flags=re.ASCII,
        )
        if hits:
            replacements[placeholder] = term
    if replacements:
        logger.debug(f"Protected terms replaced: {replacements}")
    return modified_text, replacements


def restore_terms(text: str, replacements: dict) -> str:
    """Restore protected terms in the translated text.

    Args:
        text: Translated text that may contain placeholders.
        replacements: Placeholder-to-term mapping from ``protect_terms``.

    Returns:
        ``text`` with every placeholder substituted by its original term.
    """
    restored_text = text
    for placeholder, term in replacements.items():
        restored_text = restored_text.replace(placeholder, term)
    return restored_text


@app.post("/translate")
async def translate(text: str, source_lang_override: Optional[str] = None):
    """
    Translate text to English, preserving protected terms like '2030 Aspirations'.
    Automatically detects source language or uses override.

    Args:
        text: Text to translate; must be non-empty and at most
            ``MAX_TEXT_LENGTH`` characters.
        source_lang_override: Optional language code. It is honoured only when
            it is a key of ``MODEL_MAP``; otherwise detection is used.

    Returns:
        ``{"translated_text": ...}``. The input is returned unchanged when the
        source language resolves to English.

    Raises:
        HTTPException: 400 for empty text, 413 for over-long text, 500 for any
            unexpected failure during detection or translation.
    """
    if not text:
        raise HTTPException(status_code=400, detail="Text input is required.")

    if len(text) > MAX_TEXT_LENGTH:
        raise HTTPException(
            status_code=413,
            detail=f"Text too long. Max allowed length: {MAX_TEXT_LENGTH}."
        )

    try:
        # Determine source language
        if source_lang_override and source_lang_override in MODEL_MAP:
            source_lang = source_lang_override
            logger.debug(f"Source language overridden by user to: '{source_lang_override}'.")
        else:
            source_lang = await asyncio.to_thread(detect_language, text)
            logger.debug(f"Determined source language for translation: '{source_lang}'.")

        # If source language is English, return original text
        if source_lang == "en":
            logger.debug("Source language is English or unrecognized, returning original text.")
            return {"translated_text": text}

        # Get translator (lazy-loaded)
        translator = get_translator(source_lang)
        if not translator:
            logger.error(f"No translator found for language: '{source_lang}'.")
            raise HTTPException(
                status_code=400,
                detail=f"Translation not supported for language: {source_lang}."
            )

        # Protect terms before translation
        modified_text, replacements = protect_terms(text, PROTECTED_TERMS)
        logger.debug(f"Text after protecting terms: '{modified_text[:50]}...'")

        # Perform translation in a thread to avoid blocking the event loop
        logger.debug(f"Translating text from '{source_lang}' to English...")
        result = await asyncio.to_thread(translator, modified_text, max_length=512, num_beams=4)
        translated_text = result[0]["translation_text"]
        logger.debug(f"Translation successful. Original: '{modified_text[:50]}...', Translated: '{translated_text[:50]}...'")

        # Restore protected terms
        final_text = restore_terms(translated_text, replacements)
        logger.debug(f"Final translated text with restored terms: '{final_text[:50]}...'")

        return {"translated_text": final_text}

    except HTTPException:
        # Deliberate HTTP errors pass through untouched.
        raise
    except Exception as e:
        logger.error(f"An unexpected error occurred during processing: {str(e)}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Processing failed: {str(e)}")