from fastapi import FastAPI, File, UploadFile, Form, HTTPException, Request from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from fastapi.responses import HTMLResponse, JSONResponse from fastapi.middleware.cors import CORSMiddleware from typing import Optional import requests import os import io import tempfile import logging import subprocess # Configuration du logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) app = FastAPI() # Configuration CORS app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Configuration des fichiers statiques app.mount("/static", StaticFiles(directory="frontend"), name="static") templates = Jinja2Templates(directory="frontend") @app.get("/", response_class=HTMLResponse) async def serve_frontend(request: Request): return templates.TemplateResponse("index.html", {"request": request}) # Configuration Hugging Face HF_API_KEY = os.getenv("HF_API_KEY", "") HEADERS = {"Authorization": f"Bearer {HF_API_KEY}"} if HF_API_KEY else {} # Configuration Hugging Face HF_MODELS = { "summary": "facebook/bart-large-cnn", # "qa": "deepset/roberta-base-squad2" # <- ancien modèle commenté "qa": "HPAI-BSC/Llama3-Aloe-8B-Alpha" # <- nouveau modèle } HF_API_URL = "https://api-inference.huggingface.co/models/" def query_huggingface(model: str, payload: dict): try: api_url = f"{HF_API_URL}{model}" logger.info(f"Requête à {api_url}") response = requests.post( api_url, headers=HEADERS, json=payload, timeout=60 ) if response.status_code != 200: logger.error(f"Erreur API Hugging Face: {response.status_code}, {response.text}") return {"error": f"Erreur API: {response.status_code}"} return response.json() except Exception as e: logger.error(f"Erreur API: {str(e)}") return {"error": str(e)} async def convert_to_text(file: UploadFile): """Convertit différents formats de fichiers en texte avec gestion robuste des erreurs""" try: # Vérification du type de fichier if not file.filename: return "Aucun fichier fourni" ext = os.path.splitext(file.filename)[1].lower() # Lecture du contenu content = await file.read() # Traitement des fichiers texte if ext == '.txt': return content.decode('utf-8', errors='replace') # Traitement des PDF avec pdftotext elif ext == '.pdf': try: with tempfile.NamedTemporaryFile(suffix='.pdf', delete=False) as tmp_pdf: tmp_pdf.write(content) tmp_pdf.flush() tmp_pdf_path = tmp_pdf.name try: result = subprocess.run( ["pdftotext", tmp_pdf_path, "-"], capture_output=True, text=True, timeout=30 ) os.unlink(tmp_pdf_path) # Supprimer le fichier temporaire if result.returncode == 0: return result.stdout else: error_msg = result.stderr or "Erreur inconnue lors de la conversion PDF" logger.error(f"PDF conversion failed: {error_msg}") return f"Erreur de conversion PDF: {error_msg}" except: # S'assurer que le fichier temporaire est supprimé en cas d'erreur if os.path.exists(tmp_pdf_path): os.unlink(tmp_pdf_path) raise except FileNotFoundError: logger.warning("pdftotext non installé") return "Conversion PDF non disponible (pdftotext requis)" except subprocess.TimeoutExpired: return "Timeout lors de la conversion PDF" # Traitement des fichiers Word avec pandoc elif ext in ('.docx', '.doc'): try: with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as tmp_doc: tmp_doc.write(content) tmp_doc.flush() tmp_doc_path = tmp_doc.name try: result = subprocess.run( ["pandoc", "-t", "plain", tmp_doc_path], capture_output=True, text=True, timeout=30 ) os.unlink(tmp_doc_path) # Supprimer le fichier temporaire if result.returncode == 0: return result.stdout else: error_msg = result.stderr or "Erreur inconnue lors de la conversion DOCX" logger.error(f"DOCX conversion failed: {error_msg}") return f"Erreur de conversion DOCX: {error_msg}" except: # S'assurer que le fichier temporaire est supprimé en cas d'erreur if os.path.exists(tmp_doc_path): os.unlink(tmp_doc_path) raise except FileNotFoundError: logger.warning("pandoc non installé") return "Conversion DOCX non disponible (pandoc requis)" except subprocess.TimeoutExpired: return "Timeout lors de la conversion DOCX" else: return f"Format de fichier non supporté: {ext}" except Exception as e: logger.error(f"Erreur de conversion: {str(e)}") return f"Erreur lors de la conversion du fichier: {str(e)}" @app.post("/summarize", response_class=JSONResponse) async def summarize_document(file: UploadFile = File(...)): """Endpoint pour résumer des documents avec gestion améliorée des PDF""" try: logger.info(f"Traitement du fichier: {file.filename}") text = await convert_to_text(file) if not text or not text.strip(): raise HTTPException(400, "Fichier vide ou problème de conversion") # Si le texte est un message d'erreur if text.startswith(("Erreur de conversion", "Conversion", "Format non supporté")): return { "filename": file.filename, "summary": text, # Retourne le message d'erreur comme "résumé" "text_length": len(text), "warning": True } # Limite la taille pour l'API text_to_process = text[:3000] # Réduire pour plus de fiabilité response = query_huggingface(HF_MODELS["summary"], { "inputs": text_to_process, "parameters": {"max_length": 150, "min_length": 30} }) if "error" in response: logger.error(f"Erreur de l'API HF: {response['error']}") return { "filename": file.filename, "summary": f"Erreur lors de la génération du résumé: {response['error']}", "text_length": len(text), "warning": True } # Gérer différents formats de réponse possibles summary_text = "" if isinstance(response, list) and len(response) > 0: if isinstance(response[0], dict) and "summary_text" in response[0]: summary_text = response[0]["summary_text"] elif isinstance(response[0], str): summary_text = response[0] elif isinstance(response, dict) and "summary_text" in response: summary_text = response["summary_text"] if not summary_text: summary_text = "Le modèle n'a pas pu générer de résumé. Essayez avec un texte plus court ou plus clair." return { "filename": file.filename, "summary": summary_text, "text_length": len(text), "warning": False } except HTTPException: raise except Exception as e: logger.error(f"Erreur dans summarize: {str(e)}") raise HTTPException(500, f"Erreur interne: {str(e)}") @app.post("/answer-question", response_class=JSONResponse) async def answer_question( question: str = Form(...), file: Optional[UploadFile] = File(None) ): """Endpoint pour répondre à des questions basées sur un document""" try: logger.info(f"Question reçue: {question}") context = "" if file: logger.info(f"Traitement du fichier: {file.filename}") context = await convert_to_text(file) # Si le contexte est un message d'erreur if context.startswith(("Erreur de conversion", "Conversion", "Format non supporté")): return { "question": question, "answer": f"Problème avec le document: {context}", "warning": True } # Si aucun fichier fourni, on répond juste à la question if not context or not context.strip(): context = "Pas de contexte disponible." # Limite la taille du contexte pour l'API context_to_process = context[:3000] # Réduire pour plus de fiabilité response = query_huggingface(HF_MODELS["qa"], { "inputs": { "question": question, "context": context_to_process } }) if "error" in response: logger.error(f"Erreur de l'API HF: {response['error']}") return { "question": question, "answer": f"Erreur lors de l'analyse: {response['error']}", "warning": True } # Gérer différents formats de réponse possibles answer = "" if isinstance(response, dict): if "answer" in response: answer = response["answer"] elif "answer_text" in response: answer = response["answer_text"] elif "answers" in response and len(response["answers"]) > 0: answer = response["answers"][0]["text"] if not answer: answer = "Je n'ai pas trouvé de réponse précise à cette question dans le document fourni." return { "question": question, "answer": answer, "warning": False } except Exception as e: logger.error(f"Erreur dans answer-question: {str(e)}") raise HTTPException(500, f"Erreur interne: {str(e)}") if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=7860)