Spaces:
Sleeping
Sleeping
from fastapi import FastAPI, File, UploadFile, Form, HTTPException, Request | |
from fastapi.staticfiles import StaticFiles | |
from fastapi.templating import Jinja2Templates | |
from fastapi.responses import HTMLResponse, JSONResponse | |
from fastapi.middleware.cors import CORSMiddleware | |
from typing import Optional | |
import requests | |
import os | |
import io | |
import tempfile | |
import logging | |
import subprocess | |
# Configuration du logging | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
app = FastAPI() | |
# Configuration CORS | |
app.add_middleware( | |
CORSMiddleware, | |
allow_origins=["*"], | |
allow_credentials=True, | |
allow_methods=["*"], | |
allow_headers=["*"], | |
) | |
# Configuration des fichiers statiques | |
app.mount("/static", StaticFiles(directory="frontend"), name="static") | |
templates = Jinja2Templates(directory="frontend") | |
async def serve_frontend(request: Request): | |
return templates.TemplateResponse("index.html", {"request": request}) | |
# Configuration Hugging Face | |
HF_API_KEY = os.getenv("HF_API_KEY", "") | |
HEADERS = {"Authorization": f"Bearer {HF_API_KEY}"} if HF_API_KEY else {} | |
# Configuration Hugging Face | |
HF_MODELS = { | |
"summary": "facebook/bart-large-cnn", | |
# "qa": "deepset/roberta-base-squad2" # <- ancien modèle commenté | |
"qa": "HPAI-BSC/Llama3-Aloe-8B-Alpha" # <- nouveau modèle | |
} | |
HF_API_URL = "https://api-inference.huggingface.co/models/" | |
def query_huggingface(model: str, payload: dict): | |
try: | |
api_url = f"{HF_API_URL}{model}" | |
logger.info(f"Requête à {api_url}") | |
response = requests.post( | |
api_url, | |
headers=HEADERS, | |
json=payload, | |
timeout=60 | |
) | |
if response.status_code != 200: | |
logger.error(f"Erreur API Hugging Face: {response.status_code}, {response.text}") | |
return {"error": f"Erreur API: {response.status_code}"} | |
return response.json() | |
except Exception as e: | |
logger.error(f"Erreur API: {str(e)}") | |
return {"error": str(e)} | |
async def convert_to_text(file: UploadFile): | |
"""Convertit différents formats de fichiers en texte avec gestion robuste des erreurs""" | |
try: | |
# Vérification du type de fichier | |
if not file.filename: | |
return "Aucun fichier fourni" | |
ext = os.path.splitext(file.filename)[1].lower() | |
# Lecture du contenu | |
content = await file.read() | |
# Traitement des fichiers texte | |
if ext == '.txt': | |
return content.decode('utf-8', errors='replace') | |
# Traitement des PDF avec pdftotext | |
elif ext == '.pdf': | |
try: | |
with tempfile.NamedTemporaryFile(suffix='.pdf', delete=False) as tmp_pdf: | |
tmp_pdf.write(content) | |
tmp_pdf.flush() | |
tmp_pdf_path = tmp_pdf.name | |
try: | |
result = subprocess.run( | |
["pdftotext", tmp_pdf_path, "-"], | |
capture_output=True, | |
text=True, | |
timeout=30 | |
) | |
os.unlink(tmp_pdf_path) # Supprimer le fichier temporaire | |
if result.returncode == 0: | |
return result.stdout | |
else: | |
error_msg = result.stderr or "Erreur inconnue lors de la conversion PDF" | |
logger.error(f"PDF conversion failed: {error_msg}") | |
return f"Erreur de conversion PDF: {error_msg}" | |
except: | |
# S'assurer que le fichier temporaire est supprimé en cas d'erreur | |
if os.path.exists(tmp_pdf_path): | |
os.unlink(tmp_pdf_path) | |
raise | |
except FileNotFoundError: | |
logger.warning("pdftotext non installé") | |
return "Conversion PDF non disponible (pdftotext requis)" | |
except subprocess.TimeoutExpired: | |
return "Timeout lors de la conversion PDF" | |
# Traitement des fichiers Word avec pandoc | |
elif ext in ('.docx', '.doc'): | |
try: | |
with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as tmp_doc: | |
tmp_doc.write(content) | |
tmp_doc.flush() | |
tmp_doc_path = tmp_doc.name | |
try: | |
result = subprocess.run( | |
["pandoc", "-t", "plain", tmp_doc_path], | |
capture_output=True, | |
text=True, | |
timeout=30 | |
) | |
os.unlink(tmp_doc_path) # Supprimer le fichier temporaire | |
if result.returncode == 0: | |
return result.stdout | |
else: | |
error_msg = result.stderr or "Erreur inconnue lors de la conversion DOCX" | |
logger.error(f"DOCX conversion failed: {error_msg}") | |
return f"Erreur de conversion DOCX: {error_msg}" | |
except: | |
# S'assurer que le fichier temporaire est supprimé en cas d'erreur | |
if os.path.exists(tmp_doc_path): | |
os.unlink(tmp_doc_path) | |
raise | |
except FileNotFoundError: | |
logger.warning("pandoc non installé") | |
return "Conversion DOCX non disponible (pandoc requis)" | |
except subprocess.TimeoutExpired: | |
return "Timeout lors de la conversion DOCX" | |
else: | |
return f"Format de fichier non supporté: {ext}" | |
except Exception as e: | |
logger.error(f"Erreur de conversion: {str(e)}") | |
return f"Erreur lors de la conversion du fichier: {str(e)}" | |
async def summarize_document(file: UploadFile = File(...)): | |
"""Endpoint pour résumer des documents avec gestion améliorée des PDF""" | |
try: | |
logger.info(f"Traitement du fichier: {file.filename}") | |
text = await convert_to_text(file) | |
if not text or not text.strip(): | |
raise HTTPException(400, "Fichier vide ou problème de conversion") | |
# Si le texte est un message d'erreur | |
if text.startswith(("Erreur de conversion", "Conversion", "Format non supporté")): | |
return { | |
"filename": file.filename, | |
"summary": text, # Retourne le message d'erreur comme "résumé" | |
"text_length": len(text), | |
"warning": True | |
} | |
# Limite la taille pour l'API | |
text_to_process = text[:3000] # Réduire pour plus de fiabilité | |
response = query_huggingface(HF_MODELS["summary"], { | |
"inputs": text_to_process, | |
"parameters": {"max_length": 150, "min_length": 30} | |
}) | |
if "error" in response: | |
logger.error(f"Erreur de l'API HF: {response['error']}") | |
return { | |
"filename": file.filename, | |
"summary": f"Erreur lors de la génération du résumé: {response['error']}", | |
"text_length": len(text), | |
"warning": True | |
} | |
# Gérer différents formats de réponse possibles | |
summary_text = "" | |
if isinstance(response, list) and len(response) > 0: | |
if isinstance(response[0], dict) and "summary_text" in response[0]: | |
summary_text = response[0]["summary_text"] | |
elif isinstance(response[0], str): | |
summary_text = response[0] | |
elif isinstance(response, dict) and "summary_text" in response: | |
summary_text = response["summary_text"] | |
if not summary_text: | |
summary_text = "Le modèle n'a pas pu générer de résumé. Essayez avec un texte plus court ou plus clair." | |
return { | |
"filename": file.filename, | |
"summary": summary_text, | |
"text_length": len(text), | |
"warning": False | |
} | |
except HTTPException: | |
raise | |
except Exception as e: | |
logger.error(f"Erreur dans summarize: {str(e)}") | |
raise HTTPException(500, f"Erreur interne: {str(e)}") | |
async def answer_question( | |
question: str = Form(...), | |
file: Optional[UploadFile] = File(None) | |
): | |
"""Endpoint pour répondre à des questions basées sur un document""" | |
try: | |
logger.info(f"Question reçue: {question}") | |
context = "" | |
if file: | |
logger.info(f"Traitement du fichier: {file.filename}") | |
context = await convert_to_text(file) | |
# Si le contexte est un message d'erreur | |
if context.startswith(("Erreur de conversion", "Conversion", "Format non supporté")): | |
return { | |
"question": question, | |
"answer": f"Problème avec le document: {context}", | |
"warning": True | |
} | |
# Si aucun fichier fourni, on répond juste à la question | |
if not context or not context.strip(): | |
context = "Pas de contexte disponible." | |
# Limite la taille du contexte pour l'API | |
context_to_process = context[:3000] # Réduire pour plus de fiabilité | |
response = query_huggingface(HF_MODELS["qa"], { | |
"inputs": { | |
"question": question, | |
"context": context_to_process | |
} | |
}) | |
if "error" in response: | |
logger.error(f"Erreur de l'API HF: {response['error']}") | |
return { | |
"question": question, | |
"answer": f"Erreur lors de l'analyse: {response['error']}", | |
"warning": True | |
} | |
# Gérer différents formats de réponse possibles | |
answer = "" | |
if isinstance(response, dict): | |
if "answer" in response: | |
answer = response["answer"] | |
elif "answer_text" in response: | |
answer = response["answer_text"] | |
elif "answers" in response and len(response["answers"]) > 0: | |
answer = response["answers"][0]["text"] | |
if not answer: | |
answer = "Je n'ai pas trouvé de réponse précise à cette question dans le document fourni." | |
return { | |
"question": question, | |
"answer": answer, | |
"warning": False | |
} | |
except Exception as e: | |
logger.error(f"Erreur dans answer-question: {str(e)}") | |
raise HTTPException(500, f"Erreur interne: {str(e)}") | |
if __name__ == "__main__": | |
import uvicorn | |
uvicorn.run(app, host="0.0.0.0", port=7860) |