import os import re from dotenv import load_dotenv from fastapi import FastAPI, HTTPException, Request, Depends, Security, Query from fastapi.responses import StreamingResponse, HTMLResponse, JSONResponse, FileResponse, PlainTextResponse from fastapi.security import APIKeyHeader from pydantic import BaseModel import httpx from functools import lru_cache from pathlib import Path import json import datetime import time import threading from typing import Optional, Dict, List, Any, Generator import asyncio from starlette.status import HTTP_403_FORBIDDEN import cloudscraper from concurrent.futures import ThreadPoolExecutor import uvloop from fastapi.middleware.gzip import GZipMiddleware from starlette.middleware.cors import CORSMiddleware import contextlib import requests asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) executor = ThreadPoolExecutor(max_workers=16) load_dotenv() api_key_header = APIKeyHeader(name="Authorization", auto_error=False) from usage_tracker import UsageTracker usage_tracker = UsageTracker() app = FastAPI() app.add_middleware(GZipMiddleware, minimum_size=1000) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) @lru_cache(maxsize=1) def get_env_vars(): """ Loads and caches environment variables. This function is memoized to avoid re-reading .env file on every call, improving performance. """ return { 'api_keys': os.getenv('API_KEYS', '').split(','), 'secret_api_endpoint': os.getenv('SECRET_API_ENDPOINT'), 'secret_api_endpoint_2': os.getenv('SECRET_API_ENDPOINT_2'), 'secret_api_endpoint_3': os.getenv('SECRET_API_ENDPOINT_3'), 'secret_api_endpoint_4': os.getenv('SECRET_API_ENDPOINT_4', "https://text.pollinations.ai/openai"), 'secret_api_endpoint_5': os.getenv('SECRET_API_ENDPOINT_5'), 'secret_api_endpoint_6': os.getenv('SECRET_API_ENDPOINT_6'), # New endpoint for Gemini 'mistral_api': os.getenv('MISTRAL_API', "https://api.mistral.ai"), 'mistral_key': os.getenv('MISTRAL_KEY'), 'gemini_key': os.getenv('GEMINI_KEY'), # Gemini API Key 'endpoint_origin': os.getenv('ENDPOINT_ORIGIN'), 'new_img': os.getenv('NEW_IMG') # For image generation API } # Define sets of models for different API endpoints for easier routing mistral_models = { "mistral-large-latest", "pixtral-large-latest", "mistral-moderation-latest", "ministral-3b-latest", "ministral-8b-latest", "open-mistral-nemo", "mistral-small-latest", "mistral-saba-latest", "codestral-latest" } pollinations_models = { "openai", "openai-large", "openai-fast", "openai-xlarge", "openai-reasoning", "qwen-coder", "llama", "mistral", "searchgpt", "deepseek", "claude-hybridspace", "deepseek-r1", "deepseek-reasoner", "llamalight", "gemini", "gemini-thinking", "hormoz", "phi", "phi-mini", "openai-audio", "llama-scaleway" } alternate_models = { "o1", "llama-4-scout", "o4-mini", "sonar", "sonar-pro", "sonar-reasoning", "sonar-reasoning-pro", "grok-3", "grok-3-fast", "r1-1776", "o3" } claude_3_models = { "claude-3-7-sonnet", "claude-3-7-sonnet-thinking", "claude 3.5 haiku", "claude 3.5 sonnet", "claude 3.5 haiku", "o3-mini-medium", "o3-mini-high", "grok-3", "grok-3-thinking", "grok 2" } gemini_models = { "gemini-1.5-pro", "gemini-1.5-flash", "gemini-2.0-flash-lite-preview", "gemini-2.0-flash", "gemini-2.0-flash-thinking", # aka Reasoning "gemini-2.0-flash-preview-image-generation", "gemini-2.5-flash", "gemini-2.5-pro-exp", "gemini-exp-1206" } supported_image_models = { "Flux Pro Ultra", "grok-2-aurora", "Flux Pro", "Flux Pro Ultra Raw", "Flux Dev", "Flux Schnell", "stable-diffusion-3-large-turbo", "Flux Realism", "stable-diffusion-ultra", "dall-e-3", "sdxl-lightning-4step" } class Payload(BaseModel): """Pydantic model for chat completion requests.""" model: str messages: list stream: bool = False class ImageGenerationPayload(BaseModel): """Pydantic model for image generation requests.""" model: str prompt: str size: str = "1024x1024" # Default size, assuming models support it number: int = 1 server_status = True # Global flag for server maintenance status available_model_ids: List[str] = [] # List of all available model IDs @lru_cache(maxsize=1) def get_async_client(): """Returns a memoized httpx.AsyncClient instance for making async HTTP requests.""" return httpx.AsyncClient( timeout=60.0, limits=httpx.Limits(max_keepalive_connections=50, max_connections=200) ) scraper_pool = [] MAX_SCRAPERS = 20 def get_scraper(): """Retrieves a cloudscraper instance from a pool for web scraping.""" if not scraper_pool: # Initialize the pool if it's empty (should be done at startup) for _ in range(MAX_SCRAPERS): scraper_pool.append(cloudscraper.create_scraper()) # Simple round-robin selection from the pool return scraper_pool[int(time.time() * 1000) % MAX_SCRAPERS] async def verify_api_key( request: Request, api_key: str = Security(api_key_header) ) -> bool: """ Verifies the API key provided in the Authorization header. Allows access without API key if the request comes from specific Hugging Face spaces. """ referer = request.headers.get("referer", "") if referer.startswith(("https://parthsadaria-lokiai.hf.space/playground", "https://parthsadaria-lokiai.hf.space/image-playground")): return True if not api_key: raise HTTPException( status_code=HTTP_403_FORBIDDEN, detail="No API key provided" ) if api_key.startswith('Bearer '): api_key = api_key[7:] valid_api_keys = get_env_vars().get('api_keys', []) if not valid_api_keys or valid_api_keys == ['']: raise HTTPException( status_code=HTTP_403_FORBIDDEN, detail="API keys not configured on server" ) if api_key not in set(valid_api_keys): raise HTTPException( status_code=HTTP_403_FORBIDDEN, detail="Invalid API key" ) return True @lru_cache(maxsize=1) def load_models_data(): """Loads model data from 'models.json' and caches it.""" try: file_path = Path(__file__).parent / 'models.json' with open(file_path, 'r') as f: return json.load(f) except (FileNotFoundError, json.JSONDecodeError) as e: print(f"Error loading models.json: {str(e)}") return [] @app.get("/api/v1/models") @app.get("/models") async def get_models(): """Returns the list of available models.""" models_data = load_models_data() if not models_data: raise HTTPException(status_code=500, detail="Error loading available models") return models_data async def generate_search_async(query: str, systemprompt: Optional[str] = None, stream: bool = True): """ Asynchronously generates a response using a search-based model. Streams results if `stream` is True. """ queue = asyncio.Queue() async def _fetch_search_data(): """Internal helper to fetch data from the search API and put into queue.""" try: headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"} system_message = systemprompt or "Be Helpful and Friendly" prompt = [{"role": "user", "content": query}] prompt.insert(0, {"content": system_message, "role": "system"}) payload = { "is_vscode_extension": True, "message_history": prompt, "requested_model": "searchgpt", "user_input": prompt[-1]["content"], } secret_api_endpoint_3 = get_env_vars()['secret_api_endpoint_3'] if not secret_api_endpoint_3: await queue.put({"error": "Search API endpoint not configured"}) return async with httpx.AsyncClient(timeout=30.0) as client: async with client.stream("POST", secret_api_endpoint_3, json=payload, headers=headers) as response: if response.status_code != 200: error_detail = await response.text() await queue.put({"error": f"Search API returned status code {response.status_code}: {error_detail}"}) return buffer = "" async for line in response.aiter_lines(): if line.startswith("data: "): try: json_data = json.loads(line[6:]) content = json_data.get("choices", [{}])[0].get("delta", {}).get("content", "") if content.strip(): cleaned_response = { "created": json_data.get("created"), "id": json_data.get("id"), "model": "searchgpt", "object": "chat.completion", "choices": [ { "message": { "content": content } } ] } await queue.put({"data": f"data: {json.dumps(cleaned_response)}\n\n", "text": content}) except json.JSONDecodeError: # If line is not valid JSON, treat it as raw text and pass through if it's the end of stream if line.strip() == "[DONE]": continue # This is usually handled by the aiter_lines loop finishing print(f"Warning: Could not decode JSON from search API stream: {line}") await queue.put({"error": f"Invalid JSON from search API: {line}"}) break # Stop processing on bad JSON await queue.put(None) # Signal end of stream except Exception as e: print(f"Error in _fetch_search_data: {e}") await queue.put({"error": str(e)}) await queue.put(None) asyncio.create_task(_fetch_search_data()) return queue @lru_cache(maxsize=10) def read_html_file(file_path): """Reads content of an HTML file and caches it.""" try: with open(file_path, "r") as file: return file.read() except FileNotFoundError: return None # Static file routes for basic web assets @app.get("/favicon.ico") async def favicon(): favicon_path = Path(__file__).parent / "favicon.ico" return FileResponse(favicon_path, media_type="image/x-icon") @app.get("/banner.jpg") async def banner(): banner_path = Path(__file__).parent / "banner.jpg" return FileResponse(banner_path, media_type="image/jpeg") @app.get("/ping") async def ping(): """Simple health check endpoint.""" return {"message": "pong", "response_time": "0.000000 seconds"} @app.get("/", response_class=HTMLResponse) async def root(): """Serves the main index.html file.""" html_content = read_html_file("index.html") if html_content is None: raise HTTPException(status_code=404, detail="index.html not found") return HTMLResponse(content=html_content) @app.get("/script.js", response_class=HTMLResponse) async def script(): """Serves script.js.""" html_content = read_html_file("script.js") if html_content is None: raise HTTPException(status_code=404, detail="script.js not found") return HTMLResponse(content=html_content) @app.get("/style.css", response_class=HTMLResponse) async def style(): """Serves style.css.""" html_content = read_html_file("style.css") if html_content is None: raise HTTPException(status_code=404, detail="style.css not found") return HTMLResponse(content=html_content) @app.get("/dynamo", response_class=HTMLResponse) async def dynamic_ai_page(request: Request): """ Generates a dynamic HTML page using an AI model based on user-agent and IP. Note: The hardcoded API endpoint and bearer token should ideally be managed more securely, perhaps via environment variables and proper authentication. """ user_agent = request.headers.get('user-agent', 'Unknown User') client_ip = request.client.host if request.client else "Unknown IP" location = f"IP: {client_ip}" prompt = f""" Generate a dynamic HTML page for a user with the following details: with name "LOKI.AI" - User-Agent: {user_agent} - Location: {location} - Style: Cyberpunk, minimalist, or retro Make sure the HTML is clean and includes a heading, also have cool animations a motivational message, and a cool background. Wrap the generated HTML in triple backticks (```). """ payload = { "model": "mistral-small-latest", "messages": [{"role": "user", "content": prompt}] } # Using the local /chat/completions endpoint for internal model call # This assumes the current server can proxy to Mistral. # For production, consider direct calls if not proxying is needed. headers = { "Authorization": "Bearer playground" # Use a dedicated internal token if available } try: # Use httpx.AsyncClient for making an async request async with httpx.AsyncClient() as client: response = await client.post( f"http://localhost:7860/chat/completions", # Call self or internal API json=payload, headers=headers, timeout=30.0 ) response.raise_for_status() # Raise an exception for bad status codes data = response.json() html_content = None if data and 'choices' in data and len(data['choices']) > 0: message_content = data['choices'][0].get('message', {}).get('content', '') # Extract content within triple backticks match = re.search(r"```(?:html)?(.*?)```", message_content, re.DOTALL) if match: html_content = match.group(1).strip() else: # Fallback: if no backticks, assume the whole content is HTML html_content = message_content.strip() if not html_content: raise HTTPException(status_code=500, detail="Failed to generate HTML content from AI.") return HTMLResponse(content=html_content) except httpx.RequestError as e: print(f"HTTPX Request Error in /dynamo: {e}") raise HTTPException(status_code=500, detail=f"Failed to connect to internal AI service: {e}") except httpx.HTTPStatusError as e: print(f"HTTPX Status Error in /dynamo: {e.response.status_code} - {e.response.text}") raise HTTPException(status_code=e.response.status_code, detail=f"Internal AI service responded with error: {e.response.text}") except Exception as e: print(f"An unexpected error occurred in /dynamo: {e}") raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {e}") @app.get("/scraper", response_class=PlainTextResponse) async def scrape_site(url: str = Query(..., description="URL to scrape")): """ Scrapes the content of a given URL using cloudscraper. Uses await in front of get_scraper().get() for async execution. """ try: # get_scraper() returns a synchronous scraper object, but we are running # it in an async endpoint. For CPU-bound tasks like this, it's better # to offload to a thread pool to not block the event loop. # However, cloudscraper's get method is typically synchronous. # If cloudscraper were truly async, we'd use await. # For now, running in executor to prevent blocking. loop = asyncio.get_running_loop() response_text = await loop.run_in_executor( executor, lambda: get_scraper().get(url).text ) if response_text and len(response_text.strip()) > 0: return PlainTextResponse(response_text) else: raise HTTPException(status_code=500, detail="Scraping returned empty content.") except Exception as e: print(f"Cloudscraper failed: {e}") raise HTTPException(status_code=500, detail=f"Cloudscraper failed: {e}") @app.get("/playground", response_class=HTMLResponse) async def playground(): """Serves the playground.html file.""" html_content = read_html_file("playground.html") if html_content is None: raise HTTPException(status_code=404, detail="playground.html not found") return HTMLResponse(content=html_content) @app.get("/image-playground", response_class=HTMLResponse) async def image_playground(): """Serves the image-playground.html file.""" html_content = read_html_file("image-playground.html") if html_content is None: raise HTTPException(status_code=404, detail="image-playground.html not found") return HTMLResponse(content=html_content) GITHUB_BASE = "[https://raw.githubusercontent.com/Parthsadaria/Vetra/main](https://raw.githubusercontent.com/Parthsadaria/Vetra/main)" FILES = { "html": "index.html", "css": "style.css", "js": "script.js" } async def get_github_file(filename: str) -> Optional[str]: """Fetches a file from a specified GitHub raw URL.""" url = f"{GITHUB_BASE}/{filename}" async with httpx.AsyncClient() as client: try: res = await client.get(url, follow_redirects=True) res.raise_for_status() # Raise an exception for HTTP errors (4xx or 5xx) return res.text except httpx.HTTPStatusError as e: print(f"Error fetching {filename} from GitHub: {e.response.status_code} - {e.response.text}") return None except httpx.RequestError as e: print(f"Request error fetching {filename} from GitHub: {e}") return None @app.get("/vetra", response_class=HTMLResponse) async def serve_vetra(): """ Serves a dynamic HTML page by fetching HTML, CSS, and JS from GitHub and embedding them into a single HTML response. """ html = await get_github_file(FILES["html"]) css = await get_github_file(FILES["css"]) js = await get_github_file(FILES["js"]) if not html: raise HTTPException(status_code=404, detail="index.html not found on GitHub") final_html = html.replace( "", f"" ).replace( "
{usage_data['total_requests']}
{usage_data['unique_ips_total_count']}
{len(usage_data['model_usage_period'])}
{len(usage_data['endpoint_usage_period'])}
Date | Requests | Unique IPs |
---|
Model | Total Requests | First Used | Last Used |
---|
Endpoint | Total Requests | First Used | Last Used |
---|
Timestamp | Model | Endpoint | IP Address | User Agent |
---|
", f"" ) return HTMLResponse(content=final_html) @app.get("/searchgpt") async def search_gpt(q: str, request: Request, stream: Optional[bool] = False, systemprompt: Optional[str] = None): """ Endpoint for search-based AI completion. Records usage and streams results. """ if not q: raise HTTPException(status_code=400, detail="Query parameter 'q' is required") # Record usage for searchgpt endpoint usage_tracker.record_request(request=request, model="searchgpt", endpoint="/searchgpt") queue = await generate_search_async(q, systemprompt=systemprompt, stream=True) if stream: async def stream_generator(): """Generator for streaming search results.""" collected_text = "" while True: item = await queue.get() if item is None: break if "error" in item: # Yield error as a data event so client can handle it gracefully yield f"data: {json.dumps({'error': item['error']})}\n\n" break if "data" in item: yield item["data"] collected_text += item.get("text", "") return StreamingResponse( stream_generator(), media_type="text/event-stream" ) else: # Non-streaming response: collect all chunks and return as JSON collected_text = "" while True: item = await queue.get() if item is None: break if "error" in item: raise HTTPException(status_code=500, detail=item["error"]) collected_text += item.get("text", "") return JSONResponse(content={"response": collected_text}) header_url = os.getenv('HEADER_URL') # This variable should be configured in .env @app.post("/chat/completions") @app.post("/api/v1/chat/completions") async def get_completion(payload: Payload, request: Request, authenticated: bool = Depends(verify_api_key)): """ Proxies chat completion requests to various AI model endpoints based on the model specified in the payload. Records usage and handles streaming responses. """ if not server_status: raise HTTPException( status_code=503, detail="Server is under maintenance. Please try again later." ) model_to_use = payload.model or "gpt-4o-mini" # Default model # Validate if the requested model is available if available_model_ids and model_to_use not in set(available_model_ids): raise HTTPException( status_code=400, detail=f"Model '{model_to_use}' is not available. Check /models for the available model list." ) # Record usage before making the external API call usage_tracker.record_request(request=request, model=model_to_use, endpoint="/chat/completions") payload_dict = payload.dict() payload_dict["model"] = model_to_use # Ensure the payload has the resolved model name stream_enabled = payload_dict.get("stream", True) # Default to streaming if not specified env_vars = get_env_vars() endpoint = None custom_headers = {} target_url_path = "/v1/chat/completions" # Default path for OpenAI-like APIs # Determine the correct endpoint and headers based on the model if model_to_use in mistral_models: endpoint = env_vars['mistral_api'] custom_headers = { "Authorization": f"Bearer {env_vars['mistral_key']}" } elif model_to_use in pollinations_models: endpoint = env_vars['secret_api_endpoint_4'] custom_headers = {} # Pollinations.ai might not require auth elif model_to_use in alternate_models: endpoint = env_vars['secret_api_endpoint_2'] custom_headers = {} elif model_to_use in claude_3_models: endpoint = env_vars['secret_api_endpoint_5'] custom_headers = {} # Assuming no specific auth needed for this proxy elif model_to_use in gemini_models: endpoint = env_vars['secret_api_endpoint_6'] if not endpoint: raise HTTPException(status_code=500, detail="Gemini API endpoint (SECRET_API_ENDPOINT_6) not configured.") if not env_vars['gemini_key']: raise HTTPException(status_code=500, detail="GEMINI_KEY not configured for Gemini models.") custom_headers = { "Authorization": f"Bearer {env_vars['gemini_key']}" } target_url_path = "/chat/completions" # Gemini's specific path else: # Default fallback for other models (e.g., OpenAI compatible APIs) endpoint = env_vars['secret_api_endpoint'] custom_headers = { "Origin": header_url, "Priority": "u=1, i", "Referer": header_url } if not endpoint: raise HTTPException(status_code=500, detail=f"No API endpoint configured for model: {model_to_use}") print(f"Proxying request for model '{model_to_use}' to endpoint: {endpoint}{target_url_path}") async def real_time_stream_generator(): """Generator to stream responses from the upstream API.""" try: async with httpx.AsyncClient(timeout=60.0) as client: # Stream the request to the upstream API async with client.stream("POST", f"{endpoint}{target_url_path}", json=payload_dict, headers=custom_headers) as response: # Handle non-2xx responses from the upstream API if response.status_code >= 400: error_messages = { 400: "Bad request. Verify input data.", 401: "Unauthorized. Invalid API key for upstream service.", 403: "Forbidden. You do not have access to this resource on upstream.", 404: "The requested resource was not found on upstream.", 422: "Unprocessable entity. Check your payload for upstream API.", 500: "Internal server error from upstream API." } detail_message = error_messages.get(response.status_code, f"Upstream error code: {response.status_code}") # Attempt to read upstream error response body for more detail try: error_body = await response.aread() error_json = json.loads(error_body.decode('utf-8')) if 'error' in error_json and 'message' in error_json['error']: detail_message += f" - Upstream detail: {error_json['error']['message']}" elif 'detail' in error_json: detail_message += f" - Upstream detail: {error_json['detail']}" else: detail_message += f" - Upstream raw: {error_body.decode('utf-8')[:200]}..." # Limit for logging except (json.JSONDecodeError, UnicodeDecodeError): detail_message += f" - Upstream raw: {error_body.decode('utf-8', errors='ignore')[:200]}" raise HTTPException(status_code=response.status_code, detail=detail_message) # Yield each line from the upstream stream async for line in response.aiter_lines(): if line: yield line + "\n" except httpx.TimeoutException: raise HTTPException(status_code=504, detail="Request to upstream AI service timed out.") except httpx.RequestError as e: raise HTTPException(status_code=502, detail=f"Failed to connect to upstream AI service: {str(e)}") except Exception as e: # Re-raise HTTPException if it's already one, otherwise wrap in a 500 if isinstance(e, HTTPException): raise e print(f"An unexpected error occurred during chat completion proxy: {e}") raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {str(e)}") if stream_enabled: return StreamingResponse( real_time_stream_generator(), media_type="text/event-stream", headers={ "Content-Type": "text/event-stream", "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no" # Disable buffering for SSE } ) else: # For non-streaming requests, collect all parts and return a single JSON response response_content_lines = [] async for line in real_time_stream_generator(): response_content_lines.append(line) full_response_text = "".join(response_content_lines) # Parse the concatenated stream data. This often involves stripping "data: " prefix # and combining JSON objects from each line. parsed_data = [] for line in full_response_text.splitlines(): if line.startswith("data: "): try: parsed_data.append(json.loads(line[6:])) except json.JSONDecodeError: print(f"Warning: Could not decode JSON line in non-streaming response: {line}") # Attempt to reconstruct a single coherent JSON response # This logic might need refinement based on actual API response format for non-streaming final_json_response = {} if parsed_data: # Example: For OpenAI-like API, you might want the last 'choices' part # This is a simplification and might need adjustment for other APIs if 'choices' in parsed_data[-1]: final_json_response = parsed_data[-1] else: # Fallback: just return the list of parsed objects final_json_response = {"response_parts": parsed_data} if not final_json_response: # If nothing was parsed, indicate an issue raise HTTPException(status_code=500, detail="No valid JSON response received from upstream API for non-streaming request.") return JSONResponse(content=final_json_response) @app.post("/images/generations") async def create_image(payload: ImageGenerationPayload, request: Request, authenticated: bool = Depends(verify_api_key)): """ Proxies image generation requests to a dedicated image generation API. Records usage. """ if not server_status: raise HTTPException( status_code=503, content={"message": "Server is under maintenance. Please try again later."} ) if payload.model not in supported_image_models: raise HTTPException( status_code=400, detail=f"Model '{payload.model}' is not supported for image generation. Supported models are: {', '.join(supported_image_models)}" ) # Record usage for image generation endpoint usage_tracker.record_request(request=request, model=payload.model, endpoint="/images/generations") api_payload = { "model": payload.model, "prompt": payload.prompt, "size": payload.size, "n": payload.number # Often 'n' for number of images in APIs } target_api_url = get_env_vars().get('new_img') # Get the image API URL from env vars if not target_api_url: raise HTTPException(status_code=500, detail="Image generation API endpoint (NEW_IMG) not configured.") try: async with httpx.AsyncClient(timeout=60.0) as client: response = await client.post(target_api_url, json=api_payload) response.raise_for_status() # Raise an exception for bad status codes (4xx or 5xx) return JSONResponse(content=response.json()) except httpx.TimeoutException: raise HTTPException(status_code=504, detail="Image generation request timed out.") except httpx.RequestError as e: raise HTTPException(status_code=502, detail=f"Error connecting to image generation service: {e}") except httpx.HTTPStatusError as e: error_detail = e.response.json().get("detail", f"Image generation failed with status code: {e.response.status_code}") raise HTTPException(status_code=e.response.status_code, detail=error_detail) except Exception as e: print(f"An unexpected error occurred during image generation: {e}") raise HTTPException(status_code=500, detail=f"An unexpected error occurred during image generation: {e}") @app.get("/usage") async def get_usage_json(days: int = 7): """ Returns the raw usage data as JSON. Can specify the number of days for the summary. """ return usage_tracker.get_usage_summary(days) def generate_usage_html(usage_data: Dict[str, Any], days: int = 7): # Added 'days' parameter here """ Generates an HTML page to display usage statistics. Includes tables for model, API endpoint usage, daily usage, and recent requests. Also includes placeholders for Chart.js to render graphs. """ # Prepare data for Chart.js # Model Usage Chart Data model_labels = list(usage_data['model_usage_period'].keys()) model_counts = list(usage_data['model_usage_period'].values()) # Endpoint Usage Chart Data endpoint_labels = list(usage_data['endpoint_usage_period'].keys()) endpoint_counts = list(usage_data['endpoint_usage_period'].values()) # Daily Usage Chart Data daily_dates = list(usage_data['daily_usage_period'].keys()) daily_requests = [data['requests'] for data in usage_data['daily_usage_period'].values()] daily_unique_ips = [data['unique_ips_count'] for data in usage_data['daily_usage_period'].values()] # Format table rows for HTML model_usage_all_time_rows = "\n".join([ f"""
""" for model, stats in usage_data['all_time_model_usage'].items() ]) api_usage_all_time_rows = "\n".join([ f"""
""" for endpoint, stats in usage_data['all_time_endpoint_usage'].items() ]) daily_usage_table_rows = "\n".join([ f"""
""" for date, data in usage_data['daily_usage_period'].items() ]) recent_requests_rows = "\n".join([ f"""
""" for req in usage_data['recent_requests'] ]) html_content = f"""