File size: 3,299 Bytes
4a24dbd 47ce483 4a24dbd 47ce483 4a24dbd 77370a4 4a24dbd 77370a4 4a24dbd 47ce483 4a24dbd 47ce483 4a24dbd 47ce483 4a24dbd 47ce483 4a24dbd 47ce483 4a24dbd 47ce483 4a24dbd 47ce483 4a24dbd 47ce483 4a24dbd 47ce483 4a24dbd 70e30f8 4a24dbd 47ce483 4a24dbd 70e30f8 4a24dbd |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# main.py
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Dict, Optional
import uuid
from datetime import datetime, timedelta
import asyncio
import random
# FastAPI application object; the route decorators in this file register on it.
app = FastAPI(version="1.0", title="Kairos News API")

# CORS: accept requests from any origin, with any method and any header.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_origins=["*"],
)

# In-memory stand-in for a database: job id -> job record dict.
# Nothing here is persisted; contents live only as long as the process.
jobs_db: Dict[str, Dict] = {}
class PostRequest(BaseModel):
    # Request body accepted by POST /index. All three fields are required.
    query: str
    topic: str
    # Date range written as "YYYY/MM to YYYY/MM". It is kept as a plain
    # string; the format is not validated by this model.
    date: str
class JobStatus(BaseModel):
    # Response shape used by both POST /index and GET /loading.

    # Job identifier (a UUID4 string generated when the job is created).
    id: str
    # One of "processing", "completed" or "failed"; the type itself does
    # not restrict the value.
    status: str
    created_at: datetime
    # Stays None until the background task marks the job completed.
    completed_at: Optional[datetime]
    # The original request that created the job.
    request: PostRequest
    # None while processing; afterwards either the generated result or an
    # {"error": ...} dict.
    result: Optional[Dict]
@app.post("/index", response_model=JobStatus)
async def create_job(request: PostRequest, background_tasks: BackgroundTasks):
    """Register a new job and queue its simulated processing.

    Args:
        request: Validated request body (query, topic, date).
        background_tasks: FastAPI task queue used to schedule ``process_job``.

    Returns:
        A dict in the shape of ``JobStatus`` describing the new job; its
        status is always "processing" and it carries no result yet.
    """
    new_id = str(uuid.uuid4())

    # The stored record keeps the request as a plain dict so the background
    # task can read its fields by key.
    record = {
        "status": "processing",
        "created_at": datetime.now(),
        "completed_at": None,
        "request": request.dict(),
        "result": None,
    }
    jobs_db[new_id] = record

    # Queue the simulated processing for this job.
    background_tasks.add_task(process_job, new_id)

    # Same fields as the stored record, plus the id; the response carries the
    # request model itself rather than the stored dict copy.
    return {"id": new_id, **record, "request": request}
@app.get("/loading", response_model=JobStatus)
async def get_job_status(id: str):
    """Report the current state of a job, with a simulated delay and failures.

    Args:
        id: Identifier of the job to look up.

    Returns:
        A dict in the shape of ``JobStatus`` for the requested job.

    Raises:
        HTTPException: 404 when no job with this id is stored.
    """
    try:
        record = jobs_db[id]
    except KeyError:
        raise HTTPException(status_code=404, detail="Job not found") from None

    # Jobs younger than three seconds get an artificial one-second pause
    # before the status is reported.
    age = datetime.now() - record["created_at"]
    if age < timedelta(seconds=3):
        await asyncio.sleep(1)

    # Demonstration only: roughly one poll in ten flips a job that is still
    # "processing" to "failed".
    roll = random.random()
    if roll < 0.1 and record["status"] == "processing":
        record.update(
            status="failed",
            result={"error": "Random processing failure"},
        )

    payload = {"id": id}
    for field in ("status", "created_at", "completed_at", "request", "result"):
        payload[field] = record[field]
    return payload
async def process_job(job_id: str, min_delay: float = 3.0, max_delay: float = 10.0) -> None:
    """Background task that simulates processing a job.

    Sleeps for a random duration, then marks the job as completed and fills
    in a canned result built from the stored request.

    Args:
        job_id: Key of the job record in ``jobs_db``.
        min_delay: Lower bound, in seconds, of the simulated processing time.
        max_delay: Upper bound, in seconds, of the simulated processing time.

    Returns:
        None. The job record in ``jobs_db`` is updated in place.
    """
    await asyncio.sleep(random.uniform(min_delay, max_delay))

    job = jobs_db.get(job_id)
    if job is None:
        # The job is no longer in the store; nothing to update.
        return

    # The status endpoint can flip a job to "failed" while this task is still
    # sleeping. A client may already have seen that state, so only jobs that
    # are still "processing" are allowed to complete.
    if job["status"] != "processing":
        return

    request = job["request"]
    job["status"] = "completed"
    job["completed_at"] = datetime.now()
    job["result"] = {
        "query": request["query"],
        "topic": request["topic"],
        "date_range": request["date"],
        "analysis": f"Processed results for {request['query']}",
        "sources": ["Source A", "Source B", "Source C"],
        "summary": "This is a generated summary based on your query.",
    }
@app.get("/jobs")
async def list_jobs():
    """Return the whole in-memory job store (debugging aid).

    Returns:
        The ``jobs_db`` mapping of job id to job record, unfiltered.
    """
    return jobs_db