import os
import re
import json
from typing import Optional

from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import StreamingResponse, HTMLResponse, JSONResponse
from pydantic import BaseModel
import httpx
import requests

load_dotenv()

app = FastAPI()
# Get API keys and secret endpoints from environment variables
api_keys_str = os.getenv('API_KEYS')
valid_api_keys = api_keys_str.split(',') if api_keys_str else []
secret_api_endpoint = os.getenv('SECRET_API_ENDPOINT')
secret_api_endpoint_2 = os.getenv('SECRET_API_ENDPOINT_2')
secret_api_endpoint_3 = os.getenv('SECRET_API_ENDPOINT_3')  # New endpoint for searchgpt
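# Example .env for local development (placeholder values; the real
# endpoints are deliberately kept out of the repository):
#   API_KEYS=key1,key2
#   SECRET_API_ENDPOINT=https://primary.example.com
#   SECRET_API_ENDPOINT_2=https://alternate.example.com
#   SECRET_API_ENDPOINT_3=https://search.example.com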
# Fail fast at startup if any of the secret API endpoints is missing
# (a plain ValueError, since HTTPException only makes sense inside a request handler)
if not secret_api_endpoint or not secret_api_endpoint_2 or not secret_api_endpoint_3:
    raise ValueError("API endpoint(s) are not configured in environment variables.")

# Define models that should use the secondary endpoint
alternate_models = {"gpt-4o-mini", "claude-3-haiku", "llama-3.1-70b", "mixtral-8x7b"}
class Payload(BaseModel):
    model: str
    messages: list
    stream: bool
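# Example request body this model accepts (hypothetical values):
#   {"model": "gpt-4o-mini", "messages": [{"role": "user", "content": "Hi"}], "stream": true}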
def generate_search(query: str, stream: bool = True):
    """Generator that proxies a query to the search endpoint and yields text chunks."""
    headers = {"User-Agent": ""}

    # Conversation history with the system prompt at the beginning
    prompt = [
        {"content": "Be Helpful and Friendly", "role": "system"},
        {"role": "user", "content": query},
    ]

    payload = {
        "is_vscode_extension": True,
        "message_history": prompt,
        "requested_model": "searchgpt",
        "user_input": prompt[-1]["content"],
    }

    # Use SECRET_API_ENDPOINT_3 for the search API call
    chat_endpoint = secret_api_endpoint_3
    response = requests.post(chat_endpoint, headers=headers, json=payload, stream=True)
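    # Each streamed line is expected to be SSE-shaped (assumed upstream format), e.g.:
    #   data: {"choices": [{"delta": {"content": "partial text"}}]}
    # The "data:" prefix is stripped below and the delta content is re-emitted
    # as this service's own SSE stream.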
    # Collect streamed text content
    streaming_text = ""
    for value in response.iter_lines(decode_unicode=True, chunk_size=12):
        modified_value = re.sub("data:", "", value)
        if modified_value:
            try:
                json_modified_value = json.loads(modified_value)
                content = json_modified_value["choices"][0]["delta"]["content"]
                if stream:
                    yield f"data: {content}\n\n"
                streaming_text += content
            except (json.JSONDecodeError, KeyError, IndexError):
                # Skip keep-alive lines and chunks without delta content
                continue
    if not stream:
        yield streaming_text
@app.get("/searchgpt")
async def search_gpt(q: str, stream: Optional[bool] = False):
    if not q:
        raise HTTPException(status_code=400, detail="Query parameter 'q' is required")
    if stream:
        return StreamingResponse(
            generate_search(q, stream=True),
            media_type="text/event-stream"
        )
    else:
        # For a non-streaming response, collect all content and return it as JSON
        response_text = "".join(generate_search(q, stream=False))
        return JSONResponse(content={"response": response_text})
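# Example call (assuming the app runs locally on port 8000):
#   curl "http://localhost:8000/searchgpt?q=hello&stream=false"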
@app.get("/")
async def root():
    # Serve index.html (expected in the same folder as the app)
    file_path = "index.html"
    try:
        with open(file_path, "r") as file:
            html_content = file.read()
        return HTMLResponse(content=html_content)
    except FileNotFoundError:
        return HTMLResponse(content="<h1>File not found</h1>", status_code=404)
async def get_models():
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(f"{secret_api_endpoint}/v1/models", timeout=3)
            response.raise_for_status()
            return response.json()
        except httpx.RequestError as e:
            raise HTTPException(status_code=500, detail=f"Request failed: {e}")

@app.get("/models")
async def fetch_models():
    return await get_models()
@app.post("/chat/completions")
async def get_completion(payload: Payload, request: Request):
    # Pick the endpoint based on the model type (no authentication for now)
    endpoint = secret_api_endpoint_2 if payload.model in alternate_models else secret_api_endpoint

    # Use the payload directly, since it already includes stream and other user data
    payload_dict = payload.dict()
    print(payload_dict)  # Log the incoming payload for debugging

    async def stream_generator(payload_dict):
        async with httpx.AsyncClient() as client:
            try:
                async with client.stream("POST", f"{endpoint}/v1/chat/completions", json=payload_dict, timeout=10) as response:
                    response.raise_for_status()
                    async for line in response.aiter_lines():
                        if line:
                            yield f"{line}\n"
            except httpx.HTTPStatusError as status_err:
                raise HTTPException(status_code=status_err.response.status_code, detail=f"HTTP error: {status_err}")
            except httpx.RequestError as req_err:
                raise HTTPException(status_code=500, detail=f"Streaming failed: {req_err}")
            except Exception as e:
                raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {e}")

    return StreamingResponse(stream_generator(payload_dict), media_type="application/json")
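# Example request (model name taken from alternate_models above; values are illustrative):
#   curl http://localhost:8000/chat/completions \
#     -H "Content-Type: application/json" \
#     -d '{"model": "gpt-4o-mini", "messages": [{"role": "user", "content": "Hi"}], "stream": true}'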
@app.on_event("startup")
async def startup_event():
    print("API endpoints:")
    print("GET /")
    print("GET /models")
    print("GET /searchgpt")  # The new search API
    print("POST /chat/completions")
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
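# Run locally (assuming uvicorn is installed and this file is named app.py):
#   python app.py
# or, equivalently:
#   uvicorn app:app --host 0.0.0.0 --port 8000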