import os
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import StreamingResponse, HTMLResponse, JSONResponse, FileResponse
from pydantic import BaseModel
import httpx
from pathlib import Path  # Import Path from pathlib
import requests
import re
import cloudscraper
import json
from typing import Optional
import datetime

# NOTE(review): httpx, re, cloudscraper, datetime, Request appear unused in this
# chunk — presumably used later in the file; verify before removing.

# Load environment variables from a local .env file before reading them below.
load_dotenv()

app = FastAPI()

# Get API keys and secret endpoint from environment variables
# API_KEYS is a single comma-separated string; an unset/empty value yields [].
api_keys_str = os.getenv('API_KEYS')
valid_api_keys = api_keys_str.split(',') if api_keys_str else []

# Upstream service URLs; all three must be configured for the app to start.
secret_api_endpoint = os.getenv('SECRET_API_ENDPOINT')
secret_api_endpoint_2 = os.getenv('SECRET_API_ENDPOINT_2')
secret_api_endpoint_3 = os.getenv('SECRET_API_ENDPOINT_3')  # New endpoint for searchgpt

# Validate if the main secret API endpoints are set
# NOTE(review): this raise happens at import time, outside any request handler,
# so FastAPI never converts it into an HTTP 500 response — it simply aborts
# startup. A plain RuntimeError would express that intent more clearly; confirm
# before changing.
if not secret_api_endpoint or not secret_api_endpoint_2 or not secret_api_endpoint_3:
    raise HTTPException(status_code=500, detail="API endpoint(s) are not configured in environment variables.")

# Define models that should use the secondary endpoint
# NOTE(review): alternate_models is not referenced in this chunk — presumably
# consulted by a chat-completion route later in the file.
alternate_models = {"gpt-4o-mini", "claude-3-haiku", "llama-3.1-70b", "mixtral-8x7b"}


class Payload(BaseModel):
    """Request body for a chat-completion call: model name, message list, stream flag."""
    model: str
    messages: list
    stream: bool


@app.get("/favicon.ico")
async def favicon():
    """Serve the site favicon from the application directory."""
    # The favicon.ico file is in the same directory as the app
    favicon_path = Path(__file__).parent / "favicon.ico"
    return FileResponse(favicon_path, media_type="image/x-icon")


# NOTE(review): despite the "-> str" annotation this is a *generator* (it
# yields); the annotation should really be Iterator[str]. Left unchanged here
# because typing.Iterator is not imported in this chunk.
def generate_search(query: str, systemprompt: Optional[str] = None, stream: bool = True) -> str:
    """Proxy a search query to the searchgpt upstream endpoint.

    Builds a two-message prompt (system + user), POSTs it to
    ``secret_api_endpoint_3`` as an SSE stream, and either re-yields each
    chunk as an OpenAI-style SSE event (``stream=True``) or yields the full
    concatenated text once at the end (``stream=False``).

    Args:
        query: The user's search query.
        systemprompt: Optional system instruction; defaults to a friendly tone.
        stream: Whether to yield SSE-formatted chunks or one final string.
    """
    # Deliberately blank User-Agent — presumably what the upstream expects;
    # TODO(review) confirm.
    headers = {"User-Agent": ""}
    # Use the provided system prompt, or default to "Be Helpful and Friendly"
    system_message = systemprompt or "Be Helpful and Friendly"
    # Create the prompt history with the user query and system message
    prompt = [
        {"role": "user", "content": query},
    ]
    prompt.insert(0, {"content": system_message, "role": "system"})
    # Prepare the payload for the API request
    payload = {
        "is_vscode_extension": True,
        "message_history": prompt,
        "requested_model": "searchgpt",
        # prompt[-1] is the user message inserted first in the list above.
        "user_input": prompt[-1]["content"],
    }
    # Send the request to the chat endpoint
    # NOTE(review): the streaming response is never closed and has no timeout —
    # consider `with requests.post(..., timeout=...) as response:`.
    response = requests.post(secret_api_endpoint_3, headers=headers, json=payload, stream=True)
    streaming_text = ""
    # Process the streaming response
    for value in response.iter_lines(decode_unicode=True):
        # Upstream speaks Server-Sent Events: payload lines start with "data: ".
        if value.startswith("data: "):
            try:
                # Strip the 6-char "data: " prefix and parse the JSON chunk.
                json_modified_value = json.loads(value[6:])
                content = json_modified_value.get("choices", [{}])[0].get("delta", {}).get("content", "")
                if content.strip():  # Only process non-empty content
                    # Re-wrap the delta chunk in an OpenAI-style completion
                    # envelope with this proxy's model name.
                    cleaned_response = {
                        "created": json_modified_value.get("created"),
                        "id": json_modified_value.get("id"),
                        "model": "searchgpt",
                        "object": "chat.completion",
                        "choices": [
                            {
                                "message": {
                                    "content": content
                                }
                            }
                        ]
                    }
                    if stream:
                        yield f"data: {json.dumps(cleaned_response)}\n\n"
                    # NOTE(review): streaming_text is also accumulated when
                    # stream=True even though it is never yielded in that mode.
                    streaming_text += content
            except json.JSONDecodeError:
                # Skip malformed chunks (e.g. the "[DONE]" sentinel) silently.
                continue
    # Non-streaming mode: emit the whole concatenated text as a single value.
    if not stream:
        yield streaming_text


@app.get("/searchgpt")
async def search_gpt(q: str, stream: Optional[bool] = False, systemprompt: Optional[str] = None):
    """Search endpoint: stream SSE events or return one JSON response.

    Query params:
        q: required query text (400 if empty).
        stream: when true, respond as text/event-stream; otherwise JSON.
        systemprompt: optional system instruction forwarded upstream.
    """
    if not q:
        raise HTTPException(status_code=400, detail="Query parameter 'q' is required")
    if stream:
        return StreamingResponse(
            generate_search(q, systemprompt=systemprompt, stream=True),
            media_type="text/event-stream"
        )
    else:
        # For non-streaming, collect the text and return as JSON response
        # (generate_search yields exactly one final string in this mode).
        response_text = "".join([chunk for chunk in generate_search(q, systemprompt=systemprompt, stream=False)])
        return JSONResponse(content={"response": response_text})


@app.get("/", response_class=HTMLResponse)
async def root():
    """Serve index.html from the working directory as the landing page."""
    # Open and read the content of index.html (in the same folder as the app)
    file_path = "index.html"
    try:
        with open(file_path, "r") as file:
            html_content = file.read()
        return HTMLResponse(content=html_content)
    except FileNotFoundError:
        # NOTE(review): SOURCE is truncated mid-string below — the fallback
        # HTML body is cut off in this view and is preserved as-is.
        return HTMLResponse(content="