rohit
Create self-contained app.py with dynamic imports to fix module loading issues
cfa1426
raw
history blame
4.13 kB
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import os
import logging
import sys
import json
# Configure root logging: INFO and above, written to stdout, with each record
# prefixed by timestamp, logger name and level.
logging.basicConfig(
    handlers=[logging.StreamHandler(sys.stdout)],
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-level logger used by every handler in this file.
logger = logging.getLogger(__name__)
# The ASGI application object that all routes below are registered on.
app = FastAPI(
    title="RAG Pipeline API",
    description="Multi-dataset RAG API",
    version="1.0.0",
)

# Registry of loaded pipelines, keyed by dataset name. It starts empty and is
# filled in later by the background loader.
pipelines = {}

# API key read from the environment; None when the variable is not set.
google_api_key = os.environ.get("GOOGLE_API_KEY")

logger.info("Starting RAG Pipeline API")
logger.info("Google API Key present: " + ("Yes" if google_api_key else "No"))

# Datasets are deliberately NOT loaded at import time; loading is kicked off
# asynchronously once the server has started.
logger.info("RAG Pipeline API is ready to serve requests - datasets will load in background")
# Request body for POST /answer.
class Question(BaseModel):
    # The question text; passed to the selected pipeline's answer_question().
    text: str

    # Name of the dataset whose pipeline should answer; optional in the
    # request, defaulting to the developer-portfolio dataset.
    dataset: str = "developer-portfolio"
@app.post("/answer")
async def get_answer(question: Question):
    """Answer a question with the pipeline registered for the requested dataset.

    Args:
        question: Request body carrying the question text and dataset name.

    Returns:
        ``{"answer": ..., "dataset": ...}`` on success, or a payload with
        ``"status": "datasets_loading"`` while no pipeline is registered yet.

    Raises:
        HTTPException: 400 when the requested dataset is not loaded;
            500 when the pipeline fails while answering.
    """
    # Check if any pipelines are loaded
    if not pipelines:
        return {
            "answer": "RAG Pipeline is running but datasets are still loading in the background. Please try again in a moment, or check /health for loading status.",
            "dataset": question.dataset,
            "status": "datasets_loading"
        }

    # Validate the dataset OUTSIDE the try block. Previously this 400 was
    # raised inside ``try`` and swallowed by ``except Exception``, so clients
    # received a 500 for what is really a bad request.
    if question.dataset not in pipelines:
        raise HTTPException(
            status_code=400,
            detail=f"Dataset '{question.dataset}' not available. Available datasets: {list(pipelines.keys())}",
        )

    selected_pipeline = pipelines[question.dataset]
    try:
        # NOTE(review): answer_question is called synchronously here; if it
        # is slow it will hold the event loop - confirm whether it should be
        # moved to a worker thread.
        answer = selected_pipeline.answer_question(question.text)
    except Exception as e:
        # Only genuine pipeline failures are reported as 500; keep the
        # traceback in the server log and chain the cause.
        logger.exception("Failed to answer question for dataset '%s'", question.dataset)
        raise HTTPException(status_code=500, detail=str(e)) from e

    return {"answer": answer, "dataset": question.dataset}
@app.get("/datasets")
async def list_datasets():
    """Return the names of all datasets that currently have a pipeline registered."""
    loaded_names = [dataset_name for dataset_name in pipelines]
    return {"datasets": loaded_names}
async def load_datasets_background():
    """Load datasets in background after server starts.

    Builds the ``developer-portfolio`` pipeline and registers it in the
    module-level ``pipelines`` dict. The build itself is a synchronous call,
    so it is run in the event loop's default executor; calling it directly
    inside this coroutine would block the loop (and every request) until the
    load finished.

    Failures are logged and swallowed so that a broken dataset never takes
    the server down. Without a Google API key nothing is loaded.
    """
    if not google_api_key:
        logger.warning("No Google API key provided - running in demo mode without datasets")
        return

    import asyncio

    # Only load developer-portfolio to save memory
    dataset_name = "developer-portfolio"

    def _build_pipeline():
        """Import the project modules and build the pipeline (blocking)."""
        # Import modules only when needed
        import sys
        if '/app' not in sys.path:  # avoid stacking duplicate entries on repeated calls
            sys.path.append('/app')
        from app.pipeline import RAGPipeline
        # Not referenced below; kept from the original so an import failure
        # in the config module still surfaces as a load failure.
        # NOTE(review): confirm whether this import is still needed.
        from app.config import DATASET_CONFIGS  # noqa: F401
        return RAGPipeline.from_preset(
            google_api_key=google_api_key,
            preset_name=dataset_name
        )

    try:
        logger.info(f"Loading dataset: {dataset_name}")
        loop = asyncio.get_running_loop()
        # Run the blocking build off the event loop thread; the dict is only
        # mutated back here in the coroutine once the build has finished.
        pipelines[dataset_name] = await loop.run_in_executor(None, _build_pipeline)
        logger.info(f"Successfully loaded {dataset_name}")
    except Exception as e:
        # Best-effort load: record the failure (with traceback) and carry on.
        logger.error(f"Failed to load dataset: {e}", exc_info=True)
    logger.info(f"Background loading complete - {len(pipelines)} datasets loaded")
@app.on_event("startup")
async def startup_event():
    """Log startup and schedule dataset loading as a background task.

    The task is scheduled, not awaited, so startup completes immediately and
    the server can accept requests while datasets load.
    """
    import asyncio

    logger.info("FastAPI application startup complete")
    logger.info("Server should be running on port: 7860")

    # Start loading datasets in background (non-blocking).
    # Keep a strong reference to the task: the event loop only holds weak
    # references, so a task whose result is discarded may be garbage-collected
    # before it finishes.
    app.state.dataset_loading_task = asyncio.create_task(load_datasets_background())
@app.on_event("shutdown")
async def shutdown_event():
    """Log that the application is shutting down; performs no other cleanup."""
    shutdown_message = "FastAPI application shutting down"
    logger.info(shutdown_message)
@app.get("/")
async def root():
    """Root endpoint: static service info plus the currently loaded dataset names."""
    payload = {
        "status": "ok",
        "message": "RAG Pipeline API",
        "version": "1.0.0",
    }
    payload["datasets"] = [*pipelines]
    return payload
@app.get("/health")
async def health_check():
    """Health check endpoint reporting how many datasets are loaded.

    ``loading_status`` is "complete" once the developer-portfolio pipeline is
    registered and "loading" otherwise.
    """
    logger.info("Health check called")

    if "developer-portfolio" in pipelines:
        loading_status = "complete"
    else:
        loading_status = "loading"

    report = {"status": "healthy"}
    report["datasets_loaded"] = len(pipelines)
    report["total_datasets"] = 1  # Only loading developer-portfolio
    report["loading_status"] = loading_status
    report["port"] = "7860"
    return report