anes / backend_fastapi.py
benyelles0mehdi0dris
bdlt model
955b9ed
from fastapi import FastAPI, File, UploadFile, Form, HTTPException, Request
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from typing import Optional
import requests
import os
import io
import tempfile
import logging
import subprocess
# Configuration du logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI()
# Configuration CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Configuration des fichiers statiques
app.mount("/static", StaticFiles(directory="frontend"), name="static")
templates = Jinja2Templates(directory="frontend")
@app.get("/", response_class=HTMLResponse)
async def serve_frontend(request: Request):
return templates.TemplateResponse("index.html", {"request": request})
# Configuration Hugging Face
HF_API_KEY = os.getenv("HF_API_KEY", "")
HEADERS = {"Authorization": f"Bearer {HF_API_KEY}"} if HF_API_KEY else {}
# Configuration Hugging Face
HF_MODELS = {
"summary": "facebook/bart-large-cnn",
# "qa": "deepset/roberta-base-squad2" # <- ancien modèle commenté
"qa": "HPAI-BSC/Llama3-Aloe-8B-Alpha" # <- nouveau modèle
}
HF_API_URL = "https://api-inference.huggingface.co/models/"
def query_huggingface(model: str, payload: dict):
try:
api_url = f"{HF_API_URL}{model}"
logger.info(f"Requête à {api_url}")
response = requests.post(
api_url,
headers=HEADERS,
json=payload,
timeout=60
)
if response.status_code != 200:
logger.error(f"Erreur API Hugging Face: {response.status_code}, {response.text}")
return {"error": f"Erreur API: {response.status_code}"}
return response.json()
except Exception as e:
logger.error(f"Erreur API: {str(e)}")
return {"error": str(e)}
async def convert_to_text(file: UploadFile):
"""Convertit différents formats de fichiers en texte avec gestion robuste des erreurs"""
try:
# Vérification du type de fichier
if not file.filename:
return "Aucun fichier fourni"
ext = os.path.splitext(file.filename)[1].lower()
# Lecture du contenu
content = await file.read()
# Traitement des fichiers texte
if ext == '.txt':
return content.decode('utf-8', errors='replace')
# Traitement des PDF avec pdftotext
elif ext == '.pdf':
try:
with tempfile.NamedTemporaryFile(suffix='.pdf', delete=False) as tmp_pdf:
tmp_pdf.write(content)
tmp_pdf.flush()
tmp_pdf_path = tmp_pdf.name
try:
result = subprocess.run(
["pdftotext", tmp_pdf_path, "-"],
capture_output=True,
text=True,
timeout=30
)
os.unlink(tmp_pdf_path) # Supprimer le fichier temporaire
if result.returncode == 0:
return result.stdout
else:
error_msg = result.stderr or "Erreur inconnue lors de la conversion PDF"
logger.error(f"PDF conversion failed: {error_msg}")
return f"Erreur de conversion PDF: {error_msg}"
except:
# S'assurer que le fichier temporaire est supprimé en cas d'erreur
if os.path.exists(tmp_pdf_path):
os.unlink(tmp_pdf_path)
raise
except FileNotFoundError:
logger.warning("pdftotext non installé")
return "Conversion PDF non disponible (pdftotext requis)"
except subprocess.TimeoutExpired:
return "Timeout lors de la conversion PDF"
# Traitement des fichiers Word avec pandoc
elif ext in ('.docx', '.doc'):
try:
with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as tmp_doc:
tmp_doc.write(content)
tmp_doc.flush()
tmp_doc_path = tmp_doc.name
try:
result = subprocess.run(
["pandoc", "-t", "plain", tmp_doc_path],
capture_output=True,
text=True,
timeout=30
)
os.unlink(tmp_doc_path) # Supprimer le fichier temporaire
if result.returncode == 0:
return result.stdout
else:
error_msg = result.stderr or "Erreur inconnue lors de la conversion DOCX"
logger.error(f"DOCX conversion failed: {error_msg}")
return f"Erreur de conversion DOCX: {error_msg}"
except:
# S'assurer que le fichier temporaire est supprimé en cas d'erreur
if os.path.exists(tmp_doc_path):
os.unlink(tmp_doc_path)
raise
except FileNotFoundError:
logger.warning("pandoc non installé")
return "Conversion DOCX non disponible (pandoc requis)"
except subprocess.TimeoutExpired:
return "Timeout lors de la conversion DOCX"
else:
return f"Format de fichier non supporté: {ext}"
except Exception as e:
logger.error(f"Erreur de conversion: {str(e)}")
return f"Erreur lors de la conversion du fichier: {str(e)}"
@app.post("/summarize", response_class=JSONResponse)
async def summarize_document(file: UploadFile = File(...)):
"""Endpoint pour résumer des documents avec gestion améliorée des PDF"""
try:
logger.info(f"Traitement du fichier: {file.filename}")
text = await convert_to_text(file)
if not text or not text.strip():
raise HTTPException(400, "Fichier vide ou problème de conversion")
# Si le texte est un message d'erreur
if text.startswith(("Erreur de conversion", "Conversion", "Format non supporté")):
return {
"filename": file.filename,
"summary": text, # Retourne le message d'erreur comme "résumé"
"text_length": len(text),
"warning": True
}
# Limite la taille pour l'API
text_to_process = text[:3000] # Réduire pour plus de fiabilité
response = query_huggingface(HF_MODELS["summary"], {
"inputs": text_to_process,
"parameters": {"max_length": 150, "min_length": 30}
})
if "error" in response:
logger.error(f"Erreur de l'API HF: {response['error']}")
return {
"filename": file.filename,
"summary": f"Erreur lors de la génération du résumé: {response['error']}",
"text_length": len(text),
"warning": True
}
# Gérer différents formats de réponse possibles
summary_text = ""
if isinstance(response, list) and len(response) > 0:
if isinstance(response[0], dict) and "summary_text" in response[0]:
summary_text = response[0]["summary_text"]
elif isinstance(response[0], str):
summary_text = response[0]
elif isinstance(response, dict) and "summary_text" in response:
summary_text = response["summary_text"]
if not summary_text:
summary_text = "Le modèle n'a pas pu générer de résumé. Essayez avec un texte plus court ou plus clair."
return {
"filename": file.filename,
"summary": summary_text,
"text_length": len(text),
"warning": False
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Erreur dans summarize: {str(e)}")
raise HTTPException(500, f"Erreur interne: {str(e)}")
@app.post("/answer-question", response_class=JSONResponse)
async def answer_question(
question: str = Form(...),
file: Optional[UploadFile] = File(None)
):
"""Endpoint pour répondre à des questions basées sur un document"""
try:
logger.info(f"Question reçue: {question}")
context = ""
if file:
logger.info(f"Traitement du fichier: {file.filename}")
context = await convert_to_text(file)
# Si le contexte est un message d'erreur
if context.startswith(("Erreur de conversion", "Conversion", "Format non supporté")):
return {
"question": question,
"answer": f"Problème avec le document: {context}",
"warning": True
}
# Si aucun fichier fourni, on répond juste à la question
if not context or not context.strip():
context = "Pas de contexte disponible."
# Limite la taille du contexte pour l'API
context_to_process = context[:3000] # Réduire pour plus de fiabilité
response = query_huggingface(HF_MODELS["qa"], {
"inputs": {
"question": question,
"context": context_to_process
}
})
if "error" in response:
logger.error(f"Erreur de l'API HF: {response['error']}")
return {
"question": question,
"answer": f"Erreur lors de l'analyse: {response['error']}",
"warning": True
}
# Gérer différents formats de réponse possibles
answer = ""
if isinstance(response, dict):
if "answer" in response:
answer = response["answer"]
elif "answer_text" in response:
answer = response["answer_text"]
elif "answers" in response and len(response["answers"]) > 0:
answer = response["answers"][0]["text"]
if not answer:
answer = "Je n'ai pas trouvé de réponse précise à cette question dans le document fourni."
return {
"question": question,
"answer": answer,
"warning": False
}
except Exception as e:
logger.error(f"Erreur dans answer-question: {str(e)}")
raise HTTPException(500, f"Erreur interne: {str(e)}")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=7860)