llm_fastapi / main.py
sreejith8100's picture
ch
e28e6dd
raw
history blame
2.33 kB
from fastapi import FastAPI
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel
import types
import json
from pydantic import validator
from endpoint_handler import EndpointHandler # your handler file
import base64
app = FastAPI()
handler = None
@app.on_event("startup")
async def load_handler():
global handler
handler = EndpointHandler()
class PredictInput(BaseModel):
image: str # base64-encoded image string
question: str
stream: bool = False
@validator("question")
def question_not_empty(cls, v):
if not v.strip():
raise ValueError("Question must not be empty")
return v
@validator("image")
def valid_base64_and_size(cls, v):
try:
decoded = base64.b64decode(v, validate=True)
except Exception:
raise ValueError("`image` must be valid base64")
if len(decoded) > 10 * 1024 * 1024: # 10 MB limit
raise ValueError("Image exceeds 10 MB after decoding")
return v
class PredictRequest(BaseModel):
inputs: PredictInput
@app.get("/")
async def root():
return {"message": "FastAPI app is running on Hugging Face"}
@app.post("/predict")
async def predict_endpoint(payload: PredictRequest):
print(f"[Request] Received question: {payload.inputs.question}")
data = {
"inputs": {
"image": payload.inputs.image,
"question": payload.inputs.question,
"stream": payload.inputs.stream
}
}
try:
result = handler.predict(data)
except ValueError as ve:
return JSONResponse({"error": str(ve)}, status_code=400)
except Exception as e:
return JSONResponse({"error": "Internal server error"}, status_code=500)
if isinstance(result, types.GeneratorType):
def event_stream():
try:
for chunk in result:
yield f"data: {json.dumps(chunk)}\n\n"
# Add [END] marker after generator ends
yield 'data: "[END]"\n\n'
except Exception as e:
# Send error in stream
yield f'data: "[ERROR] {str(e)}"\n\n'
return StreamingResponse(event_stream(), media_type="text/event-stream")
return JSONResponse(content=result)