import os
import logging

from huggingface_hub import model_info, InferenceClient
from dotenv import load_dotenv

from config.models_config import PREFERRED_PROVIDERS, DEFAULT_BENCHMARK_MODEL, ALTERNATIVE_BENCHMARK_MODELS

# Load environment variables once at the module level
load_dotenv()

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)


def prioritize_providers(providers):
    """Sort providers so that preferred ones come first, keeping all others."""
    return sorted(providers, key=lambda provider: provider not in PREFERRED_PROVIDERS)


def test_provider(model_name: str, provider: str, verbose: bool = False) -> bool:
    """
    Test if a specific provider is available for a model using InferenceClient.

    Args:
        model_name: Name of the model
        provider: Provider to test
        verbose: Whether to log detailed information

    Returns:
        True if the provider is available, False otherwise
    """
    try:
        load_dotenv()

        # Get HF token from environment
        hf_token = os.environ.get("HF_TOKEN")
        if not hf_token:
            if verbose:
                logger.warning("No HF_TOKEN found in environment variables. This will likely cause authentication failures.")
                print("WARNING: HF_TOKEN is missing. Most model providers require valid authentication.")
            # Try without token (for providers that accept anonymous requests)
            return _test_provider_without_token(model_name, provider, verbose)

        # Get HF organization from environment
        hf_organization = os.environ.get("HF_ORGANIZATION")
        if not hf_organization and verbose:
            logger.warning("HF_ORGANIZATION not defined in environment")

        if verbose:
            logger.info(f"Testing provider {provider} for model {model_name}")

        # Initialize the InferenceClient with the specific provider
        try:
            client = InferenceClient(
                model=model_name,
                token=hf_token,
                provider=provider,
                bill_to=hf_organization if hf_organization else "yourbench",
                timeout=3,  # Short timeout so availability checks stay fast
            )

            try:
                # Use the chat completion method for a minimal smoke test
                client.chat_completion(
                    messages=[{"role": "user", "content": "Hello"}],
                    max_tokens=5,
                )
                if verbose:
                    logger.info(f"Provider {provider} is available for {model_name}")
                return True
            except Exception as e:
                error_message = str(e)
                if verbose:
                    logger.warning(f"Error with provider {provider}: {error_message}")

                # Identify specific error types where possible
                if "status_code=429" in error_message:
                    if verbose:
                        logger.warning(f"Provider {provider} rate limited. You may need to wait or upgrade your plan.")
                elif "status_code=401" in error_message or "status_code=403" in error_message:
                    if verbose:
                        logger.warning(f"Authentication failed for provider {provider}. Your HF_TOKEN may be invalid or expired.")
                        print(f"Authentication error with provider {provider}. Please check your HF_TOKEN.")
                        logger.info(f"Trying provider {provider} without authentication")
                    # If authentication fails, try without token (for public models)
                    return _test_provider_without_token(model_name, provider, verbose)
                elif "status_code=503" in error_message:
                    if verbose:
                        logger.warning(f"Provider {provider} service unavailable. Model may be loading or provider is down.")
                elif "timed out" in error_message.lower():
                    if verbose:
                        logger.warning(f"Timeout error with provider {provider}: the request timed out")
                return False
        except Exception as auth_error:
            if "401" in str(auth_error) or "Unauthorized" in str(auth_error):
                # If authentication fails, try without token (for public models)
                if verbose:
                    logger.warning(f"Authentication error with {provider}: {str(auth_error)}. Your HF_TOKEN may be invalid.")
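# Illustrative usage of test_provider, kept as a comment so importing this
# module stays side-effect free. The model/provider pair below is an
# assumption for demonstration, not a guarantee of availability:
#
#   if test_provider("meta-llama/Llama-3.1-8B-Instruct", "novita", verbose=True):
#       print("Provider responded to a 5-token chat completion")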
                    print("Authentication error detected. Please verify your HF_TOKEN is valid and has appropriate permissions.")
                return _test_provider_without_token(model_name, provider, verbose)
            else:
                if verbose:
                    logger.warning(f"Error creating client for {provider}: {str(auth_error)}")
                return False
    except Exception as e:
        if verbose:
            logger.warning(f"Error in test_provider: {str(e)}")
        return False


def _test_provider_without_token(model_name: str, provider: str, verbose: bool = False) -> bool:
    """
    Try to test a provider without an authentication token.

    Args:
        model_name: Name of the model
        provider: Provider to test
        verbose: Display detailed logs

    Returns:
        True if the provider is available, False otherwise
    """
    try:
        if verbose:
            logger.info(f"Testing provider {provider} for model {model_name} without authentication")

        # Initialize the client without a token
        client = InferenceClient(
            model=model_name,
            provider=provider,
            bill_to=os.getenv("HF_ORGANIZATION", "yourbench"),
            timeout=3,
        )

        try:
            # Use the chat completion method for a minimal smoke test
            client.chat_completion(
                messages=[{"role": "user", "content": "Hello"}],
                max_tokens=5,
            )
            if verbose:
                logger.info(f"Provider {provider} is available for {model_name} without authentication")
            return True
        except Exception as e:
            if verbose:
                logger.warning(f"Error with provider {provider} without authentication: {str(e)}")
            return False
    except Exception as e:
        if verbose:
            logger.warning(f"Error in _test_provider_without_token: {str(e)}")
        return False


def get_available_model_provider(model_name, verbose=False):
    """
    Get the first available provider for a given model.

    Args:
        model_name: Name of the model on the Hub
        verbose: Whether to log detailed information

    Returns:
        First available provider, or None if none are available
    """
    try:
        # Get HF token from environment
        hf_token = os.environ.get("HF_TOKEN")
        if not hf_token:
            if verbose:
                logger.error("HF_TOKEN not defined in environment")
            raise ValueError("HF_TOKEN not defined in environment")

        # Get providers for the model and prioritize them
        info = None
        try:
            # Try with the auth token first
            try:
                if verbose:
                    logger.info(f"Trying to get model info for {model_name} with auth token")
                info = model_info(model_name, token=hf_token, expand="inferenceProviderMapping")
            except Exception as auth_error:
                # If authentication fails, try without token (for public models)
                if "401" in str(auth_error) or "Unauthorized" in str(auth_error):
                    if verbose:
                        logger.warning(f"Authentication failed for {model_name}, trying without token")
                    try:
                        info = model_info(model_name, expand="inferenceProviderMapping")
                    except Exception as e:
                        if verbose:
                            logger.error(f"Failed to get model info without token: {str(e)}")
                            logger.warning(f"Using default providers list as fallback for {model_name}")
                        # As a last resort, probe the default provider list
                        return _test_fallback_providers(model_name, verbose)
                else:
                    # Other error, re-raise
                    raise auth_error

            if not info or not hasattr(info, "inference_provider_mapping"):
                if verbose:
                    logger.info(f"No inference providers found for {model_name}")
                return _test_fallback_providers(model_name, verbose)

            providers = list(info.inference_provider_mapping.keys())
            if not providers:
                if verbose:
                    logger.info(f"Empty list of providers for {model_name}")
                return _test_fallback_providers(model_name, verbose)
        except Exception as e:
            if verbose:
                logger.error(f"Error retrieving model info for {model_name}: {str(e)}")
            return _test_fallback_providers(model_name, verbose)

        # Prioritize providers
        prioritized_providers = prioritize_providers(providers)
        if verbose:
            logger.info(f"Available providers for {model_name}: {', '.join(providers)}")
            logger.info(f"Prioritized providers: {', '.join(prioritized_providers)}")

        # Test providers in priority order
        failed_providers = []
        for provider in prioritized_providers:
            if verbose:
                logger.info(f"Testing provider {provider} for {model_name}")
            try:
                if test_provider(model_name, provider, verbose):
                    if verbose:
                        logger.info(f"Provider {provider} is available for {model_name}")
                    return provider
                failed_providers.append(provider)
                if verbose:
                    logger.warning(f"Provider {provider} test failed for {model_name}")
            except Exception as e:
                failed_providers.append(provider)
                if verbose:
                    logger.error(f"Exception while testing provider {provider} for {model_name}: {str(e)}")

        # Defensive fallback: prioritize_providers() returns the full sorted list,
        # so this is normally empty, but it guards against future changes.
        remaining_providers = [p for p in providers if p not in prioritized_providers and p not in failed_providers]
        if remaining_providers and verbose:
            logger.info(f"Trying remaining non-prioritized providers: {', '.join(remaining_providers)}")

        for provider in remaining_providers:
            if verbose:
                logger.info(f"Testing non-prioritized provider {provider} for {model_name}")
            try:
                if test_provider(model_name, provider, verbose):
                    if verbose:
                        logger.info(f"Non-prioritized provider {provider} is available for {model_name}")
                    return provider
            except Exception as e:
                if verbose:
                    logger.error(f"Exception while testing non-prioritized provider {provider}: {str(e)}")

        # All providers were tried and none worked; log but don't raise
        if verbose:
            logger.error(f"No available providers for {model_name}. Tried {len(failed_providers) + len(remaining_providers)} providers.")
        return None
    except Exception as e:
        if verbose:
            logger.error(f"Error in get_available_model_provider: {str(e)}")
        return None
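# Illustrative usage of get_available_model_provider (comment only, to avoid
# import-time side effects). The model name is taken from the test list below:
#
#   provider = get_available_model_provider("Qwen/Qwen2.5-72B-Instruct", verbose=True)
#   if provider:
#       client = InferenceClient(model="Qwen/Qwen2.5-72B-Instruct", provider=provider)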
def _test_fallback_providers(model_name, verbose=False):
    """
    Fallback that probes a list of common providers without going through the Hub API.

    Args:
        model_name: Name of the model
        verbose: Display detailed logs

    Returns:
        The first available provider, or None
    """
    # List of providers to test directly
    default_providers = ["huggingface", "sambanova", "novita", "fireworks-ai", "together", "openai", "anthropic"]
    if verbose:
        logger.warning(f"Using fallback providers list for {model_name}: {', '.join(default_providers)}")

    # Test each provider directly
    for provider in default_providers:
        if verbose:
            logger.info(f"Testing fallback provider {provider} for {model_name}")
        try:
            if test_provider(model_name, provider, verbose):
                if verbose:
                    logger.info(f"FALLBACK: Provider {provider} is available for {model_name}")
                return provider
        except Exception as e:
            if verbose:
                logger.warning(f"FALLBACK: Error testing provider {provider} for {model_name}: {str(e)}")

    return None
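# A note on ordering: prioritize_providers() relies on Python's stable sort with
# a boolean key, so preferred providers keep their relative order at the front
# and all others follow. Illustrative, assuming (hypothetically) that
# PREFERRED_PROVIDERS = ["fireworks-ai", "sambanova"]:
#
#   prioritize_providers(["openai", "sambanova", "together", "fireworks-ai"])
#   # -> ["sambanova", "fireworks-ai", "openai", "together"]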
def test_models(verbose=True):
    """
    Test the default model and alternative models, then return a summary of results.

    Args:
        verbose: Display detailed logs

    Returns:
        A dictionary with test results
    """
    results = {
        "default_model": None,
        "working_model": None,
        "provider": None,
        "all_models": {},
        "available_models": [],
        "unavailable_models": [],
    }

    print("\n===== Checking HuggingFace Authentication =====")

    # Get HF token
    hf_token = os.environ.get("HF_TOKEN")
    if hf_token:
        print("✅ HF_TOKEN is available")

        # Check if the token has a valid format (simple check); never print any
        # token characters, just indicate its presence
        if not hf_token.startswith("hf_"):
            print("⚠️ WARNING: Your HF_TOKEN does not start with 'hf_', which is unusual. Please verify its format.")

        # Check token validity by calling the inference API directly
        import requests

        try:
            # Test with a simple public model (gpt2)
            test_model = "gpt2"
            api_url = f"https://api-inference.huggingface.co/models/{test_model}"
            print(f"Testing token with inference API on public model {test_model}...")

            headers = {"Authorization": f"Bearer {hf_token}"}
            payload = {"inputs": "Hello, how are you?"}
            response = requests.post(api_url, headers=headers, json=payload, timeout=10)

            if response.status_code in [200, 503]:  # 503 = model is loading, but the token was accepted
                print(f"✅ HF_TOKEN validated - Token accepted by the inference API! Status: {response.status_code}")
                if response.status_code == 503:
                    print("ℹ️ Model is loading, but token is valid")

                # If the token is valid for the inference API, also try to fetch
                # user information (non-blocking if it fails)
                try:
                    whoami_response = requests.get(
                        "https://huggingface.co/api/whoami",
                        headers={"Authorization": f"Bearer {hf_token}"},
                        timeout=10,
                    )
                    if whoami_response.status_code == 200:
                        user_info = whoami_response.json()
                        print(f"✅ Additional info - Authenticated as: {user_info.get('name', 'Unknown user')}")

                        # Check if the user has access to paid models
                        if user_info.get("canPay", False):
                            print("✅ Your account has payment methods configured - you may have access to premium models")
                        else:
                            print("ℹ️ Your account does not have payment methods configured - access to premium models may be limited")
                except Exception:
                    # Ignore errors when getting user info
                    pass
            else:
                print(f"❌ HF_TOKEN validation failed with status code: {response.status_code}")
                error_message = "Unknown error"
                try:
                    error_data = response.json()
                    if "error" in error_data:
                        error_message = error_data["error"]
                    print(f"❌ Error message: {error_message}")
                except Exception:
                    print(f"❌ Error message: {response.text}")

                print("⚠️ Most model providers will not work with invalid credentials")

                # Alternative test with the status endpoint
                try:
                    print("Attempting alternative validation with status endpoint...")
                    status_url = "https://api-inference.huggingface.co/status"
                    status_response = requests.get(status_url, headers=headers, timeout=10)
                    if status_response.status_code == 200:
                        print("✅ Token can access the status endpoint. This is partially good news.")
                    else:
                        print(f"❌ Status endpoint test also failed: {status_response.status_code}")
                except Exception as e:
                    print(f"❌ Alternative validation also failed: {str(e)}")
        except Exception as e:
            print(f"❌ Error validating HF_TOKEN with inference API: {str(e)}")
    else:
        print("❌ HF_TOKEN is missing - authentication to HuggingFace API will fail")
        print("⚠️ Most models and providers require authentication")

    # Get HF organization
    hf_organization = os.environ.get("HF_ORGANIZATION")
    if hf_organization:
        print(f"✅ HF_ORGANIZATION is available: {hf_organization}")
    else:
        print("ℹ️ HF_ORGANIZATION is not set")

    if verbose:
        print(f"\n===== Testing main default model: {DEFAULT_BENCHMARK_MODEL} =====")

    # Test the default model
    provider = get_available_model_provider(DEFAULT_BENCHMARK_MODEL, verbose=verbose)
    if provider:
        if verbose:
            print(f"\n✅ SUCCESS: Found provider for default model {DEFAULT_BENCHMARK_MODEL}: {provider}")
        results["default_model"] = DEFAULT_BENCHMARK_MODEL
        results["working_model"] = DEFAULT_BENCHMARK_MODEL
        results["provider"] = provider
    else:
        if verbose:
            print(f"\n❌ DEFAULT MODEL FAILED: No provider found for {DEFAULT_BENCHMARK_MODEL}")
            print("Trying alternative models...")

        # Try alternative models
        for alt_model in ALTERNATIVE_BENCHMARK_MODELS:
            if verbose:
                print(f"\nTrying alternative model: {alt_model}")
            alt_provider = get_available_model_provider(alt_model, verbose=verbose)
            if alt_provider:
                if verbose:
                    print(f"\n✅ SUCCESS: Found provider for alternative model {alt_model}: {alt_provider}")
                results["working_model"] = alt_model
                results["provider"] = alt_provider
                break
            elif verbose:
                print(f"❌ Failed to find provider for alternative model: {alt_model}")
        else:
            if verbose:
                print("\n❌ ALL MODELS FAILED: No provider found for any model")
                print("\n⚠️ This is likely due to authentication issues with your HF_TOKEN")
                print("⚠️ Please check your token or try using models that don't require authentication")

    # Test all models to get an overview
    models = [
        "Qwen/QwQ-32B",
        "Qwen/Qwen2.5-72B-Instruct",
        "Qwen/Qwen2.5-32B-Instruct",
        "meta-llama/Llama-3.1-8B-Instruct",
        "meta-llama/Llama-3.3-70B-Instruct",
        "deepseek-ai/DeepSeek-R1-Distill-Llama-70B",
        "mistralai/Mistral-Small-24B-Instruct-2501",
    ]

    if verbose:
        print("\n===== Testing all available models =====")

    for model in models:
        provider = get_available_model_provider(model, verbose)
        results["all_models"][model] = provider
        if provider:
            results["available_models"].append((model, provider))
        else:
            results["unavailable_models"].append(model)

    if verbose:
        print("\n===== Results Summary =====")
        if results["available_models"]:
            print("Models with available providers:")
            for model, provider in results["available_models"]:
                print(f"✅ Model: {model}, Provider: {provider}")
        else:
            print("❌ No models with available providers found")
            print("⚠️ Please check your HF_TOKEN and permissions")

        if results["unavailable_models"]:
            print("\nModels with no available providers:")
            for model in results["unavailable_models"]:
                print(f"❌ {model}")

        print(f"\nTotal Available Models: {len(results['available_models'])}")
        print(f"Total Unavailable Models: {len(results['unavailable_models'])}")

    return results
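# Illustrative consumption of the results dict (comment only; the keys match
# the structure built in test_models above):
#
#   results = test_models(verbose=False)
#   if results["provider"]:
#       print(f"Use {results['working_model']} via {results['provider']}")
#   for model, provider in results["available_models"]:
#       print(f"{model} -> {provider}")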
if __name__ == "__main__":
    # Run the availability tests when the script is executed directly
    test_results = test_models(verbose=True)
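# To run the checks from a shell, set the environment variables first; the
# script filename below is an assumption for illustration:
#
#   HF_TOKEN=hf_xxx HF_ORGANIZATION=my-org python test_provider_availability.py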