import asyncio import json import random import re import string import time import uuid from datetime import datetime, timezone from typing import Any, Dict, List, Optional import boto3 import httpx import tiktoken import platform import hashlib from fastapi import HTTPException from api.config import ( MODEL_MAPPING, get_headers_api_chat, get_headers_chat, BASE_URL, AGENT_MODE, TRENDING_AGENT_MODE, MODEL_PREFIXES ) from api.logger import setup_logger from api.models import ChatRequest from api.validate import getHid # Import the asynchronous getHid function logger = setup_logger(__name__) # --------------------------------------------- # CLOUDFLARE R2 CONFIGURATION # --------------------------------------------- R2_ACCESS_KEY_ID = "df9c9eb87e850a8eb27afd3968077b42" R2_SECRET_ACCESS_KEY = "14b08b0855263bb63d2618da3a6537e1b0446d89d51da03a568620b1e5342ea8" R2_ENDPOINT_URL = "https://f2f92ac53fae792c4155f6e93a514989.r2.cloudflarestorage.com" R2_BUCKET_NAME = "snapzion" R2_REPLACED_URLS_KEY = "snapzion.txt" s3 = boto3.client( "s3", endpoint_url=R2_ENDPOINT_URL, aws_access_key_id=R2_ACCESS_KEY_ID, aws_secret_access_key=R2_SECRET_ACCESS_KEY, ) BLOCKED_MESSAGE = ( "Generated by BLACKBOX.AI, try unlimited chat https://www.blackbox.ai " "and for API requests replace https://www.blackbox.ai with https://api.blackbox.ai" ) # --------------------------------------------- # RANDOM USER-DATA GENERATION # --------------------------------------------- def get_random_name_email_customer(): first_names = ["Aliace", "B21ob", "Car232ol", "Daavid", "Evewwlyn", "Fraank", "Grssace", "Hefctor", "Ivgy", "Jackdie"] last_names = ["Smilth", "Johnkson", "Dajvis", "Mihller", "Thomgpson", "Garwcia", "Broawn", "Wilfson", "Maartin", "Clarak"] random_name = f"{random.choice(first_names)} {random.choice(last_names)}" email_username = ''.join(random.choices(string.ascii_lowercase + string.digits, k=8)) random_email = f"{email_username}@gmail.com" suffix_length = len("Rldf7IKdNhdhiw") suffix_chars = string.ascii_letters + string.digits random_suffix = ''.join(random.choice(suffix_chars) for _ in range(suffix_length)) random_customer_id = f"cus_{random_suffix}" return random_name, random_email, random_customer_id def generate_system_fingerprint() -> str: raw_data = f"{platform.node()}-{time.time()}-{uuid.uuid4()}" short_hash = hashlib.md5(raw_data.encode()).hexdigest()[:12] return f"fp_{short_hash}" def get_last_user_prompt(messages: List[Any]) -> str: for msg in reversed(messages): if msg.role == "user": if isinstance(msg.content, str): return msg.content.strip() elif isinstance(msg.content, list): for item in msg.content: if item.get("type") == "text": return item.get("text", "").strip() return "" def upload_replaced_urls_to_r2(urls: List[str], alt_text: str = "") -> None: if not urls: logger.info("No replaced or final Snapzion URLs to store. Skipping snapzion.txt update.") return existing_data = "" try: response = s3.get_object(Bucket=R2_BUCKET_NAME, Key=R2_REPLACED_URLS_KEY) existing_data = response['Body'].read().decode('utf-8') logger.info("Successfully read existing snapzion.txt from R2.") except s3.exceptions.NoSuchKey: logger.info("snapzion.txt does not exist yet. Will create a new one.") except Exception as e: logger.error(f"Error reading snapzion.txt from R2: {e}") alt_text = alt_text.strip() markdown_lines = [f"![{alt_text}]({url})" for url in urls] to_append = "\n".join(markdown_lines) updated_content = (existing_data + "\n" + to_append) if existing_data.strip() else to_append try: s3.put_object( Bucket=R2_BUCKET_NAME, Key=R2_REPLACED_URLS_KEY, Body=updated_content.encode("utf-8"), ContentType="text/plain", ) logger.info(f"Appended {len(urls)} new URLs to snapzion.txt in R2 (in Markdown format).") except Exception as e: logger.error(f"Failed to upload replaced URLs to R2: {e}") def calculate_tokens(text: str, model: str) -> int: try: encoding = tiktoken.encoding_for_model(model) tokens = encoding.encode(text) return len(tokens) except KeyError: logger.warning(f"Model '{model}' not supported by tiktoken for token counting. Using a generic method.") return len(text.split()) def create_chat_completion_data( content: str, model: str, timestamp: int, request_id: str, system_fingerprint: str, prompt_tokens: int = 0, completion_tokens: int = 0, finish_reason: Optional[str] = None, ) -> Dict[str, Any]: usage = None if finish_reason == "stop": usage = { "prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens, "total_tokens": prompt_tokens + completion_tokens, } return { "id": request_id, "object": "chat.completion.chunk" if finish_reason is None else "chat.completion", "created": timestamp, "model": model, "system_fingerprint": system_fingerprint, "choices": [{ "index": 0, "delta" if finish_reason is None else "message": { "content": content, "role": "assistant" }, "finish_reason": finish_reason }], "usage": usage, } def message_to_dict(message, model_prefix: Optional[str] = None): content = "" images_data = [] image_urls = [] if isinstance(message.content, list): for item in message.content: if item.get("type") == "text": content = item.get("text", "").strip() elif item.get("type") == "image_url" and len(images_data) < 3: image_url = item["image_url"].get("url", "") if image_url: file_path = f"MultipleFiles/{uuid.uuid4().hex}.jpg" images_data.append({"filePath": file_path, "contents": image_url}) image_urls.append({"image_url": {"url": image_url}}) elif isinstance(message.content, str): content = message.content.strip() if model_prefix and content: content = f"{model_prefix} {content}" base = {"role": message.role, "content": content} if images_data: base["data"] = { "imageBase64": images_data[0]["contents"], "fileText": "", "title": "snapshot", "imagesData": images_data } for img in image_urls[1:]: base["content"] = [base["content"], img] return base def strip_model_prefix(content: str, model_prefix: Optional[str] = None) -> str: if model_prefix and content.startswith(model_prefix): logger.debug(f"Stripping prefix '{model_prefix}' from content.") return content[len(model_prefix):].strip() return content # --------------------------------------------- # STREAMING RESPONSE HANDLER # --------------------------------------------- async def process_streaming_response(request: ChatRequest): # Generate a fixed-length session ID on each request session_id = str(uuid.uuid4()) # 36-character UUID is_new_user = False # set your own logic here system_fingerprint = generate_system_fingerprint() random_name, random_email, random_customer_id = get_random_name_email_customer() request_id = f"chatcmpl-{uuid.uuid4()}" logger.info(f"Processing request (stream) {request_id} - Model: {request.model}") agent_mode = AGENT_MODE.get(request.model, {}) trending_agent_mode = TRENDING_AGENT_MODE.get(request.model, {}) model_prefix = MODEL_PREFIXES.get(request.model, "") headers_api_chat = get_headers_api_chat(BASE_URL) if request.model == "o1-preview": await asyncio.sleep(random.randint(1, 60)) h_value = await getHid() if not h_value: logger.error("No h-value for validation.") raise HTTPException(status_code=500, detail="Missing h-value.") messages = [message_to_dict(msg, model_prefix=model_prefix) for msg in request.messages] json_data = { "agentMode": agent_mode, "clickedAnswer2": False, "clickedAnswer3": False, "reasoningMode": False, "clickedForceWebSearch": False, "codeInterpreterMode": False, "codeModelMode": True, "githubToken": "", "deepSearchMode": False, "domains": None, "id": request_id, "imageGenerationMode": False, "isChromeExt": False, "isMicMode": False, "isPremium": True, "isMemoryEnabled": False, "maxTokens": request.max_tokens, "messages": messages, "mobileClient": False, "playgroundTemperature": request.temperature, "playgroundTopP": request.top_p, "previewToken": None, "trendingAgentMode": trending_agent_mode, "userId": None, "userSelectedModel": MODEL_MAPPING.get(request.model, request.model), "userSystemPrompt": None, "validated": h_value, "visitFromDelta": False, "webSearchModePrompt": False, "vscodeClient": False, "designerMode": False, "workspaceId": "", "beastMode": False, "customProfile": { "name": "", "occupation": "", "traits": [], "additionalInfo": "", "enableNewChats": False }, "webSearchModeOption": { "autoMode": False, "webMode": False, "offlineMode": True }, # Insert the session with new id and isNewUser flag "session": { "user": { "name": random_name, "email": random_email, "image": "https://lh3.googleusercontent.com/a/...=s96-c", "subscriptionStatus": "PREMIUM" }, "expires": datetime.now(timezone.utc) .isoformat(timespec='milliseconds') .replace('+00:00', 'Z'), "subscriptionCache": { "customerId": random_customer_id, "status": "PREMIUM", "isTrialSubscription": "False", "expiryTimestamp": 1744652408, "lastChecked": int(time.time() * 1000) }, "id": session_id, "isNewUser": is_new_user } } prompt_tokens = sum( calculate_tokens(msg["content"], request.model) for msg in messages if "content" in msg ) completion_tokens = 0 final_snapzion_links: List[str] = [] async with httpx.AsyncClient() as client: try: async with client.stream( "POST", f"{BASE_URL}/api/chat", headers=headers_api_chat, json=json_data, timeout=100 ) as response: response.raise_for_status() async for chunk in response.aiter_text(): timestamp = int(datetime.now().timestamp()) if not chunk: continue if chunk.startswith("$@$v=undefined-rv1$@$"): chunk = chunk[21:] if BLOCKED_MESSAGE in chunk: chunk = chunk.replace(BLOCKED_MESSAGE, "").strip() if not chunk: continue if "https://storage.googleapis.com" in chunk: chunk = chunk.replace("https://storage.googleapis.com", "https://cdn.snapzion.com") snapzion_urls = re.findall(r"(https://cdn\.snapzion\.com[^\s\)]+)", chunk) final_snapzion_links.extend(snapzion_urls) cleaned = strip_model_prefix(chunk, model_prefix) completion_tokens += calculate_tokens(cleaned, request.model) yield "data: " + json.dumps( create_chat_completion_data(cleaned, request.model, timestamp, request_id, system_fingerprint, prompt_tokens, completion_tokens) ) + "\n\n" # send final stop event yield "data: " + json.dumps( create_chat_completion_data( "", request.model, int(datetime.now().timestamp()), request_id, system_fingerprint, prompt_tokens, completion_tokens, "stop" ) ) + "\n\n" yield "data: [DONE]\n\n" except httpx.HTTPStatusError as e: logger.error(f"HTTP error (stream) {request_id}: {e}") error_message = f"HTTP error occurred: {e}" try: details = e.response.json() error_message += f" Details: {details}" except ValueError: error_message += f" Response body: {e.response.text}" yield "data: " + json.dumps( create_chat_completion_data(error_message, request.model, int(datetime.now().timestamp()), request_id, system_fingerprint, prompt_tokens, completion_tokens, "error") ) + "\n\n" yield "data: [DONE]\n\n" except (httpx.RequestError, Exception) as e: logger.error(f"Request error (stream) {request_id}: {e}") error_message = f"Request error occurred: {e}" yield "data: " + json.dumps( create_chat_completion_data(error_message, request.model, int(datetime.now().timestamp()), request_id, system_fingerprint, prompt_tokens, completion_tokens, "error") ) + "\n\n" yield "data: [DONE]\n\n" last_user_prompt = get_last_user_prompt(request.messages) upload_replaced_urls_to_r2(final_snapzion_links, alt_text=last_user_prompt) # --------------------------------------------- # NON-STREAMING RESPONSE HANDLER # --------------------------------------------- async def process_non_streaming_response(request: ChatRequest): # Generate a fixed-length session ID on each request session_id = str(uuid.uuid4()) is_new_user = False system_fingerprint = generate_system_fingerprint() random_name, random_email, random_customer_id = get_random_name_email_customer() request_id = f"chatcmpl-{uuid.uuid4()}" logger.info(f"Processing request (non-stream) {request_id} - Model: {request.model}") agent_mode = AGENT_MODE.get(request.model, {}) trending_agent_mode = TRENDING_AGENT_MODE.get(request.model, {}) model_prefix = MODEL_PREFIXES.get(request.model, "") headers_api_chat = get_headers_api_chat(BASE_URL) headers_chat = get_headers_chat( BASE_URL, next_action=str(uuid.uuid4()), next_router_state_tree=json.dumps([""]) ) if request.model == "o1-preview": await asyncio.sleep(random.randint(20, 60)) # You could also use: h_value = await getHid() h_value = "00f37b34-a166-4efb-bce5-1312d87f2f94" if not h_value: logger.error("Failed to retrieve h-value.") raise HTTPException(status_code=500, detail="Missing h-value.") messages = [message_to_dict(msg, model_prefix=model_prefix) for msg in request.messages] json_data = { "agentMode": agent_mode, "clickedAnswer2": False, "clickedAnswer3": False, "reasoningMode": False, "clickedForceWebSearch": False, "codeInterpreterMode": False, "codeModelMode": True, "githubToken": "", "deepSearchMode": False, "domains": None, "id": request_id, "imageGenerationMode": False, "isChromeExt": False, "isMicMode": False, "isPremium": True, "isMemoryEnabled": False, "maxTokens": request.max_tokens, "messages": messages, "mobileClient": False, "playgroundTemperature": request.temperature, "playgroundTopP": request.top_p, "previewToken": None, "trendingAgentMode": trending_agent_mode, "userId": None, "userSelectedModel": MODEL_MAPPING.get(request.model, request.model), "userSystemPrompt": None, "validated": h_value, "visitFromDelta": False, "webSearchModePrompt": False, "vscodeClient": False, "designerMode": False, "workspaceId": "", "beastMode": False, "customProfile": { "name": "", "occupation": "", "traits": [], "additionalInfo": "", "enableNewChats": False }, "webSearchModeOption": { "autoMode": False, "webMode": False, "offlineMode": True }, # Insert the session with new id and isNewUser flag "session": { "user": { "name": random_name, "email": random_email, "image": "https://lh3.googleusercontent.com/a/...=s96-c", "subscriptionStatus": "PREMIUM" }, "expires": datetime.now(timezone.utc) .isoformat(timespec='milliseconds') .replace('+00:00', 'Z'), "subscriptionCache": { "customerId": random_customer_id, "status": "PREMIUM", "isTrialSubscription": "False", "expiryTimestamp": 1744652408, "lastChecked": int(time.time() * 1000) }, "id": session_id, "isNewUser": is_new_user } } prompt_tokens = sum( calculate_tokens(msg["content"], request.model) for msg in messages if "content" in msg ) full_response = "" final_snapzion_links: List[str] = [] async with httpx.AsyncClient() as client: try: async with client.stream( "POST", f"{BASE_URL}/api/chat", headers=headers_api_chat, json=json_data ) as response: response.raise_for_status() async for chunk in response.aiter_text(): full_response += chunk except httpx.HTTPStatusError as e: logger.error(f"HTTP error (non-stream) {request_id}: {e}") error_message = f"HTTP error occurred: {e}" try: error_details = e.response.json() error_message += f" Details: {error_details}" except ValueError: error_message += f" Response body: {e.response.text}" return { "id": request_id, "object": "chat.completion", "created": int(datetime.now().timestamp()), "model": request.model, "system_fingerprint": system_fingerprint, "choices": [{ "index": 0, "message": {"role": "assistant", "content": error_message}, "finish_reason": "error" }], "usage": {"prompt_tokens": prompt_tokens, "completion_tokens": 0, "total_tokens": prompt_tokens} } except Exception as e: logger.error(f"Unexpected error (non-stream) {request_id}: {e}") error_message = f"An unexpected error occurred: {e}" return { "id": request_id, "object": "chat.completion", "created": int(datetime.now().timestamp()), "model": request.model, "system_fingerprint": system_fingerprint, "choices": [{ "index": 0, "message": {"role": "assistant", "content": error_message}, "finish_reason": "error" }], "usage": {"prompt_tokens": prompt_tokens, "completion_tokens": 0, "total_tokens": prompt_tokens} } # Post-process response if full_response.startswith("$@$v=undefined-rv1$@$"): full_response = full_response[21:] if BLOCKED_MESSAGE in full_response: full_response = full_response.replace(BLOCKED_MESSAGE, "").strip() if not full_response: raise HTTPException(status_code=500, detail="Blocked message in response.") if "https://storage.googleapis.com" in full_response: full_response = full_response.replace("https://storage.googleapis.com", "https://cdn.snapzion.com") snapzion_urls = re.findall(r"(https://cdn\.snapzion\.com[^\s\)]+)", full_response) final_snapzion_links.extend(snapzion_urls) cleaned = strip_model_prefix(full_response, model_prefix) completion_tokens = calculate_tokens(cleaned, request.model) last_user_prompt = get_last_user_prompt(request.messages) upload_replaced_urls_to_r2(final_snapzion_links, alt_text=last_user_prompt) return { "id": request_id, "object": "chat.completion", "created": int(datetime.now().timestamp()), "model": request.model, "system_fingerprint": system_fingerprint, "choices": [{ "index": 0, "message": {"role": "assistant", "content": cleaned}, "finish_reason": "stop" }], "usage": {"prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens, "total_tokens": prompt_tokens + completion_tokens} }