"""Automatic fallback router for handling provider quota and rate limit errors.

This module provides intelligent fallback detection and routing when the
primary provider (e.g., Anthropic) encounters quota limitations or rate limits.

Features:
    - Detects quota/rate limit errors from different providers
    - Automatic fallback to kimi-coding when Anthropic quota is exceeded
    - Configurable fallback chains with default anthropic -> kimi-coding
    - Logging and monitoring of fallback events

Usage:
    from agent.fallback_router import (
        is_quota_error,
        get_default_fallback_chain,
        should_auto_fallback,
    )

    if is_quota_error(error, provider="anthropic"):
        if should_auto_fallback(provider="anthropic"):
            fallback_chain = get_default_fallback_chain("anthropic")
"""

import json
import logging
import os
from typing import Dict, List, Optional, Any, Tuple

logger = logging.getLogger(__name__)

# Default fallback chains per provider.
# Each chain is a list of fallback configurations tried in order.
DEFAULT_FALLBACK_CHAINS: Dict[str, List[Dict[str, Any]]] = {
    "anthropic": [
        {"provider": "kimi-coding", "model": "kimi-k2.5"},
        {"provider": "openrouter", "model": "anthropic/claude-sonnet-4"},
    ],
    "openrouter": [
        {"provider": "kimi-coding", "model": "kimi-k2.5"},
        {"provider": "zai", "model": "glm-5"},
    ],
    "kimi-coding": [
        {"provider": "openrouter", "model": "anthropic/claude-sonnet-4"},
        {"provider": "zai", "model": "glm-5"},
    ],
    "zai": [
        {"provider": "openrouter", "model": "anthropic/claude-sonnet-4"},
        {"provider": "kimi-coding", "model": "kimi-k2.5"},
    ],
}

# Quota/rate limit error patterns by provider.
# These are matched as lowercase substrings of the lowercased error message.
QUOTA_ERROR_PATTERNS: Dict[str, List[str]] = {
    "anthropic": [
        "rate limit",
        "ratelimit",
        "quota exceeded",
        "insufficient quota",
        "429",
        "403",
        "too many requests",
        "capacity exceeded",
        "over capacity",
        "temporarily unavailable",
        "server overloaded",
        "resource exhausted",
        "billing threshold",
        "credit balance",
        "payment required",
        "402",
    ],
    "openrouter": [
        "rate limit",
        "ratelimit",
        "quota exceeded",
        "insufficient credits",
        "429",
        "402",
        "no endpoints available",
        "all providers failed",
        "over capacity",
    ],
    "kimi-coding": [
        "rate limit",
        "ratelimit",
        "quota exceeded",
        "429",
        "insufficient balance",
    ],
    "zai": [
        "rate limit",
        "ratelimit",
        "quota exceeded",
        "429",
        "insufficient quota",
    ],
}

# HTTP status codes indicating quota/rate limit issues.
# NOTE(review): 403 is also returned for plain authorization failures; confirm
# that treating it as a quota signal is intended.
QUOTA_STATUS_CODES = {429, 402, 403}

# Substrings of an exception class name (lowercased) that mark it as a
# quota/rate limit error regardless of its message.
_QUOTA_EXCEPTION_NAME_TERMS = (
    "ratelimit",
    "rate_limit",
    "quota",
    "toomanyrequests",
    "insufficient_quota",
    "billing",
    "payment",
)

# Attribute names probed, in order, for an HTTP status code on an exception.
_STATUS_CODE_ATTRS = ("status_code", "code", "http_status", "response_code", "status")

# Provider-independent message patterns, checked after the provider patterns.
_GENERIC_QUOTA_PATTERNS = (
    "rate limit exceeded",
    "quota exceeded",
    "too many requests",
    "capacity exceeded",
    "temporarily unavailable",
    "try again later",
    "resource exhausted",
    "billing",
    "payment required",
    "insufficient credits",
    "insufficient quota",
)

# Values of HERMES_AUTO_FALLBACK that enable automatic fallback.
_TRUTHY_ENV_VALUES = ("true", "1", "yes", "on")

# Environment variables that may hold credentials for each known provider.
_PROVIDER_ENV_VARS: Dict[str, List[str]] = {
    "anthropic": ["ANTHROPIC_API_KEY", "ANTHROPIC_TOKEN"],
    "kimi-coding": ["KIMI_API_KEY", "KIMI_API_TOKEN"],
    "zai": ["ZAI_API_KEY", "Z_AI_API_KEY"],
    "openrouter": ["OPENROUTER_API_KEY"],
    "minimax": ["MINIMAX_API_KEY"],
    "minimax-cn": ["MINIMAX_CN_API_KEY"],
    "deepseek": ["DEEPSEEK_API_KEY"],
    "alibaba": ["DASHSCOPE_API_KEY", "ALIBABA_API_KEY"],
    "nous": ["NOUS_AGENT_KEY", "NOUS_ACCESS_TOKEN"],
}

# Providers whose credentials may live in auth.json instead of the environment.
_OAUTH_PROVIDERS = ("nous", "openai-codex")


def _extract_status_code(error: Any) -> Optional[int]:
    """Return the first integer-convertible status attribute on *error*, if any."""
    for attr in _STATUS_CODE_ATTRS:
        raw = getattr(error, attr, None)
        if raw is None:
            continue
        try:
            # Coerce every candidate (including ``status_code`` itself) so that
            # string codes such as "429" are recognised and non-numeric or
            # unhashable values are skipped instead of raising later.
            return int(raw)
        except (TypeError, ValueError, OverflowError):
            continue
    return None


def is_quota_error(error: Optional[Exception], provider: Optional[str] = None) -> bool:
    """Detect if an error is quota/rate limit related.

    The checks run in this order: exception class name, HTTP status code
    attribute, provider-specific message patterns, generic message patterns.

    Args:
        error: The exception to check. ``None`` is never a quota error.
        provider: Optional provider name. When given, only that provider's
            patterns are checked before the generic ones; otherwise the
            patterns of every known provider are checked.

    Returns:
        True if the error appears to be quota/rate limit related.
    """
    if error is None:
        return False

    error_str = str(error).lower()
    error_type = type(error).__name__.lower()

    # Check for common rate limit exception types
    if any(term in error_type for term in _QUOTA_EXCEPTION_NAME_TERMS):
        return True

    # Check HTTP status code if available
    if _extract_status_code(error) in QUOTA_STATUS_CODES:
        return True

    # Check provider-specific patterns
    providers_to_check = [provider] if provider else list(QUOTA_ERROR_PATTERNS)
    for prov in providers_to_check:
        for pattern in QUOTA_ERROR_PATTERNS.get(prov, []):
            if pattern.lower() in error_str:
                logger.debug(
                    "Detected %s quota error pattern '%s' in: %s",
                    prov, pattern, error
                )
                return True

    # Check generic quota patterns
    return any(pattern in error_str for pattern in _GENERIC_QUOTA_PATTERNS)


def get_default_fallback_chain(
    primary_provider: str,
    exclude_provider: Optional[str] = None,
) -> List[Dict[str, Any]]:
    """Get the default fallback chain for a primary provider.

    Args:
        primary_provider: The primary provider name.
        exclude_provider: Optional provider to exclude from the chain.

    Returns:
        A new list of fallback configurations (empty for unknown providers).
        Each entry is a copy, so callers may mutate the result without
        affecting ``DEFAULT_FALLBACK_CHAINS``.
    """
    chain = DEFAULT_FALLBACK_CHAINS.get(primary_provider, [])
    return [
        dict(fb)
        for fb in chain
        if not exclude_provider or fb.get("provider") != exclude_provider
    ]


def should_auto_fallback(
    provider: str,
    error: Optional[Exception] = None,
    auto_fallback_enabled: Optional[bool] = None,
) -> bool:
    """Determine if automatic fallback should be attempted.

    Args:
        provider: The current provider name.
        error: Optional error to check for quota issues.
        auto_fallback_enabled: Optional override for the auto-fallback
            setting. When ``None``, the ``HERMES_AUTO_FALLBACK`` environment
            variable decides (enabled when unset).

    Returns:
        True if automatic fallback should be attempted.
    """
    # Check environment variable override
    if auto_fallback_enabled is None:
        env_setting = os.getenv("HERMES_AUTO_FALLBACK", "true").strip().lower()
        auto_fallback_enabled = env_setting in _TRUTHY_ENV_VALUES

    if not auto_fallback_enabled:
        return False

    # Check if provider has a configured fallback chain
    if provider not in DEFAULT_FALLBACK_CHAINS:
        # Still allow fallback if it's a quota error with generic handling
        if error is not None and is_quota_error(error):
            logger.debug(
                "Provider %s has no fallback chain but quota error detected",
                provider
            )
            return True
        return False

    # If there's an error, only fallback on quota/rate limit errors
    if error is not None:
        return is_quota_error(error, provider)

    # No error but fallback chain exists - allow eager fallback for
    # providers known to have quota issues
    return provider in ("anthropic",)


def log_fallback_event(
    from_provider: str,
    to_provider: str,
    to_model: str,
    reason: str,
    error: Optional[Exception] = None,
) -> None:
    """Log a fallback event for monitoring.

    Emits a human-readable INFO record and a DEBUG record carrying the
    structured event data.

    Args:
        from_provider: The provider we're falling back from.
        to_provider: The provider we're falling back to.
        to_model: The model we're falling back to.
        reason: The reason for the fallback.
        error: Optional error that triggered the fallback; its type name and
            the first 200 characters of its message are included in the
            structured data.
    """
    log_data = {
        "event": "provider_fallback",
        "from_provider": from_provider,
        "to_provider": to_provider,
        "to_model": to_model,
        "reason": reason,
    }

    # Identity check: an exception class may define a falsy __bool__/__len__.
    if error is not None:
        log_data["error_type"] = type(error).__name__
        log_data["error_message"] = str(error)[:200]

    logger.info("Provider fallback: %s -> %s (%s) | Reason: %s",
                from_provider, to_provider, to_model, reason)

    # Also log structured data for monitoring
    logger.debug("Fallback event data: %s", log_data)


def resolve_fallback_with_credentials(
    fallback_config: Dict[str, Any],
) -> Tuple[Optional[Any], Optional[str]]:
    """Resolve a fallback configuration to a client and model.

    Args:
        fallback_config: Fallback configuration dict with ``provider`` and
            ``model`` keys.

    Returns:
        Tuple of (client, model), or (None, None) if the configuration is
        incomplete or the provider client could not be resolved.
    """
    provider = fallback_config.get("provider")
    model = fallback_config.get("model")

    # Validate before importing so an incomplete config never touches the
    # client machinery.
    if not provider or not model:
        return None, None

    from agent.auxiliary_client import resolve_provider_client

    try:
        client, resolved_model = resolve_provider_client(
            provider,
            model=model,
            raw_codex=True,
        )
        return client, resolved_model or model
    except Exception as exc:
        logger.debug(
            "Failed to resolve fallback provider %s: %s",
            provider, exc
        )
        return None, None


def get_auto_fallback_chain(
    primary_provider: str,
    user_fallback_chain: Optional[List[Dict[str, Any]]] = None,
) -> List[Dict[str, Any]]:
    """Get the effective fallback chain for automatic fallback.

    A non-empty user-provided chain takes precedence and is returned as-is;
    the default chain for the provider is used only when no user chain is
    given. The two chains are not merged.

    Args:
        primary_provider: The primary provider name.
        user_fallback_chain: Optional user-provided fallback chain.

    Returns:
        The effective fallback chain to use.
    """
    # Use user-provided chain if available
    if user_fallback_chain:
        return user_fallback_chain

    # Otherwise use default chain for the provider
    return get_default_fallback_chain(primary_provider)


def _candidate_env_vars(provider: str) -> List[str]:
    """Return the environment variable names that may hold *provider*'s key."""
    known = _PROVIDER_ENV_VARS.get(provider)
    if known is not None:
        return list(known)

    legacy = f"{provider.upper()}_API_KEY"
    # Hyphens are awkward in environment variable names, so also accept the
    # underscore form; the legacy name is kept for backward compatibility.
    normalized = legacy.replace("-", "_")
    return [legacy] if normalized == legacy else [normalized, legacy]


def _has_oauth_credentials(provider: str) -> bool:
    """Return True if auth.json in the Hermes home lists *provider*."""
    try:
        from hermes_cli.config import get_hermes_home

        auth_path = get_hermes_home() / "auth.json"
        if not auth_path.exists():
            return False
        data = json.loads(auth_path.read_text(encoding="utf-8"))
    except Exception as exc:
        # Best-effort lookup: a missing module or unreadable/invalid file
        # means "no credentials", but leave a trace for debugging.
        logger.debug("Could not read auth.json for provider %s: %s", provider, exc)
        return False

    if not isinstance(data, dict):
        return False
    if data.get("active_provider") == provider:
        return True

    # Check for provider in providers dict
    providers = data.get("providers")
    return isinstance(providers, dict) and bool(providers.get(provider))


def is_fallback_available(
    fallback_config: Dict[str, Any],
) -> bool:
    """Check if a fallback configuration has available credentials.

    Credentials are looked up in environment variables first and, for OAuth
    providers, in the ``auth.json`` file of the Hermes home directory.

    Args:
        fallback_config: Fallback configuration dict.

    Returns:
        True if credentials are available for the fallback provider.
    """
    provider = fallback_config.get("provider")
    if not provider:
        return False

    # Check environment variables for API keys
    if any(os.getenv(key) for key in _candidate_env_vars(provider)):
        return True

    # Check auth.json for OAuth providers
    if provider in _OAUTH_PROVIDERS:
        return _has_oauth_credentials(provider)

    return False


def filter_available_fallbacks(
    fallback_chain: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Filter a fallback chain to only include providers with credentials.

    Args:
        fallback_chain: List of fallback configurations.

    Returns:
        Filtered list with only available fallbacks, in the original order.
    """
    return [
        fb for fb in fallback_chain
        if is_fallback_available(fb)
    ]