|
import httpx |
|
import asyncio |
|
import json |
|
from typing import List, Dict, Optional, Any |
|
|
|
|
|
import config as app_config |
|
|
|
_model_cache: Optional[Dict[str, List[str]]] = None |
|
_cache_lock = asyncio.Lock() |
|
|
|
async def fetch_and_parse_models_config() -> Optional[Dict[str, List[str]]]: |
|
""" |
|
Fetches the model configuration JSON from the URL specified in app_config. |
|
Parses it and returns a dictionary with 'vertex_models' and 'vertex_express_models'. |
|
Returns None if fetching or parsing fails. |
|
""" |
|
if not app_config.MODELS_CONFIG_URL: |
|
print("ERROR: MODELS_CONFIG_URL is not set in the environment/config.") |
|
return None |
|
|
|
print(f"Fetching model configuration from: {app_config.MODELS_CONFIG_URL}") |
|
try: |
|
async with httpx.AsyncClient() as client: |
|
response = await client.get(app_config.MODELS_CONFIG_URL) |
|
response.raise_for_status() |
|
data = response.json() |
|
|
|
|
|
if isinstance(data, dict) and \ |
|
"vertex_models" in data and isinstance(data["vertex_models"], list) and \ |
|
"vertex_express_models" in data and isinstance(data["vertex_express_models"], list): |
|
print("Successfully fetched and parsed model configuration.") |
|
|
|
|
|
prefixed_express_models = [f"[EXPRESS] {model_name}" for model_name in data["vertex_express_models"]] |
|
|
|
return { |
|
"vertex_models": data["vertex_models"], |
|
"vertex_express_models": prefixed_express_models |
|
} |
|
else: |
|
print(f"ERROR: Fetched model configuration has an invalid structure: {data}") |
|
return None |
|
except httpx.RequestError as e: |
|
print(f"ERROR: HTTP request failed while fetching model configuration: {e}") |
|
return None |
|
except json.JSONDecodeError as e: |
|
print(f"ERROR: Failed to decode JSON from model configuration: {e}") |
|
return None |
|
except Exception as e: |
|
print(f"ERROR: An unexpected error occurred while fetching/parsing model configuration: {e}") |
|
return None |
|
|
|
async def get_models_config() -> Dict[str, List[str]]: |
|
""" |
|
Returns the cached model configuration. |
|
If not cached, fetches and caches it. |
|
Returns a default empty structure if fetching fails. |
|
""" |
|
global _model_cache |
|
async with _cache_lock: |
|
if _model_cache is None: |
|
print("Model cache is empty. Fetching configuration...") |
|
_model_cache = await fetch_and_parse_models_config() |
|
if _model_cache is None: |
|
print("WARNING: Using default empty model configuration due to fetch/parse failure.") |
|
_model_cache = {"vertex_models": [], "vertex_express_models": []} |
|
return _model_cache |
|
|
|
async def get_vertex_models() -> List[str]: |
|
config = await get_models_config() |
|
return config.get("vertex_models", []) |
|
|
|
async def get_vertex_express_models() -> List[str]: |
|
config = await get_models_config() |
|
return config.get("vertex_express_models", []) |
|
|
|
async def refresh_models_config_cache() -> bool: |
|
""" |
|
Forces a refresh of the model configuration cache. |
|
Returns True if successful, False otherwise. |
|
""" |
|
global _model_cache |
|
print("Attempting to refresh model configuration cache...") |
|
async with _cache_lock: |
|
new_config = await fetch_and_parse_models_config() |
|
if new_config is not None: |
|
_model_cache = new_config |
|
print("Model configuration cache refreshed successfully.") |
|
return True |
|
else: |
|
print("ERROR: Failed to refresh model configuration cache.") |
|
|
|
|
|
return False |