"""Automatic fallback router for handling provider quota and rate limit errors.

This module provides intelligent fallback detection and routing when the primary
provider (e.g., Anthropic) encounters quota limitations or rate limits.

Features:
- Detects quota/rate limit errors from different providers
- Automatic fallback to kimi-coding when Anthropic quota is exceeded
- Configurable fallback chains with default anthropic -> kimi-coding
- Logging and monitoring of fallback events

Usage:
    from agent.fallback_router import (
        is_quota_error,
        get_default_fallback_chain,
        should_auto_fallback,
    )

    if is_quota_error(error, provider="anthropic"):
        if should_auto_fallback(provider="anthropic"):
            fallback_chain = get_default_fallback_chain("anthropic")
"""
|
|
|
|
import logging
|
|
import os
|
|
from typing import Dict, List, Optional, Any, Tuple
|
|
|
|
logger = logging.getLogger(__name__)


# Built-in fallback routes, keyed by the provider that failed.
# Every route is an ordered list; entries are attempted first to last and
# each one names the replacement provider together with the model to use.
DEFAULT_FALLBACK_CHAINS: Dict[str, List[Dict[str, Any]]] = {
    "anthropic": [
        dict(provider="kimi-coding", model="kimi-k2.5"),
        dict(provider="openrouter", model="anthropic/claude-sonnet-4"),
    ],
    "openrouter": [
        dict(provider="kimi-coding", model="kimi-k2.5"),
        dict(provider="zai", model="glm-5"),
    ],
    "kimi-coding": [
        dict(provider="openrouter", model="anthropic/claude-sonnet-4"),
        dict(provider="zai", model="glm-5"),
    ],
    "zai": [
        dict(provider="openrouter", model="anthropic/claude-sonnet-4"),
        dict(provider="kimi-coding", model="kimi-k2.5"),
    ],
}
|
|
|
|
# Quota/rate limit error patterns by provider.
# Each pattern is matched case-insensitively against the text of an error
# message; all entries are therefore written in lower case.
QUOTA_ERROR_PATTERNS: Dict[str, List[str]] = {
    "anthropic": [
        "rate limit",
        "ratelimit",
        "quota exceeded",
        "insufficient quota",
        "429",
        "403",
        "too many requests",
        "capacity exceeded",
        "over capacity",
        "temporarily unavailable",
        "server overloaded",
        "resource exhausted",
        "billing threshold",
        "credit balance",
        "payment required",
        "402",
    ],
    "openrouter": [
        "rate limit",
        "ratelimit",
        "quota exceeded",
        "insufficient credits",
        "429",
        "402",
        "no endpoints available",
        "all providers failed",
        "over capacity",
    ],
    "kimi-coding": [
        "rate limit",
        "ratelimit",
        "quota exceeded",
        "429",
        "insufficient balance",
    ],
    "zai": [
        "rate limit",
        "ratelimit",
        "quota exceeded",
        "429",
        "insufficient quota",
    ],
}
|
|
|
|
# HTTP status codes that are treated as quota / rate-limit signals.
QUOTA_STATUS_CODES = {
    402,  # Payment Required
    403,  # Forbidden
    429,  # Too Many Requests
}
|
|
|
|
|
|
def _extract_status_code(error: Exception) -> Optional[int]:
    """Return the first integer-convertible status attribute of *error*, if any."""
    for attr in ("status_code", "code", "http_status", "response_code", "status"):
        value = getattr(error, attr, None)
        if value is None:
            continue
        try:
            # Coerce uniformly so that "429" (str) is treated like 429 (int)
            # and non-numeric / unhashable values are skipped, not raised on.
            return int(value)
        except (TypeError, ValueError, OverflowError):
            continue
    return None


def _quota_pattern_in(pattern: str, text: str) -> bool:
    """Return True if *pattern* occurs in *text*.

    Purely numeric patterns (status codes such as "429") must not have a
    digit directly before or after them, so "429" does not match "14290".
    """
    if not pattern.isdigit():
        return pattern in text

    import re  # function-scope import, like the other lazy imports in this file

    return re.search(rf"(?<!\d){re.escape(pattern)}(?!\d)", text) is not None


def is_quota_error(error: Exception, provider: Optional[str] = None) -> bool:
    """Detect if an error is quota/rate limit related.

    Checks are applied in this order and the first hit wins:

    1. the lower-cased exception class name contains a known marker;
    2. a status attribute (``status_code``, ``code``, ``http_status``,
       ``response_code`` or ``status``) converts to an int that is in
       ``QUOTA_STATUS_CODES``;
    3. the lower-cased message matches a pattern in ``QUOTA_ERROR_PATTERNS``
       (only *provider*'s patterns when given, otherwise every provider's);
    4. the lower-cased message contains one of the generic phrases.

    Args:
        error: The exception to check. ``None`` is accepted and yields False.
        provider: Optional provider name to check provider-specific patterns

    Returns:
        True if the error appears to be quota/rate limit related
    """
    if error is None:
        return False

    error_str = str(error).lower()
    error_type = type(error).__name__.lower()

    # Check for common rate limit exception types
    type_markers = (
        "ratelimit", "rate_limit", "quota", "toomanyrequests",
        "insufficient_quota", "billing", "payment",
    )
    if any(marker in error_type for marker in type_markers):
        return True

    # Check HTTP status code if available
    if _extract_status_code(error) in QUOTA_STATUS_CODES:
        return True

    # Check provider-specific patterns
    providers_to_check = [provider] if provider else list(QUOTA_ERROR_PATTERNS)
    for prov in providers_to_check:
        for pattern in QUOTA_ERROR_PATTERNS.get(prov, []):
            if _quota_pattern_in(pattern.lower(), error_str):
                logger.debug(
                    "Detected %s quota error pattern '%s' in: %s",
                    prov, pattern, error
                )
                return True

    # Check generic quota patterns
    generic_patterns = (
        "rate limit exceeded",
        "quota exceeded",
        "too many requests",
        "capacity exceeded",
        "temporarily unavailable",
        "try again later",
        "resource exhausted",
        "billing",
        "payment required",
        "insufficient credits",
        "insufficient quota",
    )
    return any(pattern in error_str for pattern in generic_patterns)
|
|
|
|
|
|
def get_default_fallback_chain(
    primary_provider: str,
    exclude_provider: Optional[str] = None,
) -> List[Dict[str, Any]]:
    """Get the default fallback chain for a primary provider.

    The result is a new list of new dicts, so callers may freely modify it
    without altering the module-level ``DEFAULT_FALLBACK_CHAINS`` table.

    Args:
        primary_provider: The primary provider name
        exclude_provider: Optional provider to exclude from the chain

    Returns:
        List of fallback configurations, in the order they should be tried.
        Empty when *primary_provider* has no default chain.
    """
    chain = DEFAULT_FALLBACK_CHAINS.get(primary_provider, [])

    # Copy every entry: a shallow list copy would still hand out the very
    # dict objects stored in DEFAULT_FALLBACK_CHAINS.
    return [
        dict(fb) for fb in chain
        if not exclude_provider or fb.get("provider") != exclude_provider
    ]
|
|
|
|
|
|
def should_auto_fallback(
    provider: str,
    error: Optional[Exception] = None,
    auto_fallback_enabled: Optional[bool] = None,
) -> bool:
    """Decide whether an automatic provider fallback should be attempted.

    When *auto_fallback_enabled* is not given, the ``HERMES_AUTO_FALLBACK``
    environment variable decides (default ``"true"``; the values ``true``,
    ``1``, ``yes`` and ``on`` enable it, compared case-insensitively).

    Args:
        provider: The current provider name
        error: Optional error to check for quota issues
        auto_fallback_enabled: Optional override for auto-fallback setting

    Returns:
        True if automatic fallback should be attempted
    """
    enabled = auto_fallback_enabled
    if enabled is None:
        # No explicit override: consult the environment.
        raw_setting = os.getenv("HERMES_AUTO_FALLBACK", "true")
        enabled = raw_setting.lower() in ("true", "1", "yes", "on")

    if not enabled:
        return False

    if provider in DEFAULT_FALLBACK_CHAINS:
        if error is None:
            # Nothing failed yet: only allow eager fallback for providers
            # known to have quota issues.
            return provider in ("anthropic",)
        # An error occurred: fall back only for quota/rate limit errors.
        return is_quota_error(error, provider)

    # No configured chain for this provider; a quota error recognised by the
    # generic (provider-agnostic) detection still permits a fallback.
    if error and is_quota_error(error):
        logger.debug(
            "Provider %s has no fallback chain but quota error detected",
            provider
        )
        return True
    return False
|
|
|
|
|
|
def log_fallback_event(
    from_provider: str,
    to_provider: str,
    to_model: str,
    reason: str,
    error: Optional[Exception] = None,
) -> None:
    """Record a provider fallback in the module log.

    Emits a one-line summary at INFO level and the full event dictionary at
    DEBUG level.

    Args:
        from_provider: The provider we're falling back from
        to_provider: The provider we're falling back to
        to_model: The model we're falling back to
        reason: The reason for the fallback
        error: Optional error that triggered the fallback; its class name and
            the first 200 characters of its message are added to the event
    """
    event = dict(
        event="provider_fallback",
        from_provider=from_provider,
        to_provider=to_provider,
        to_model=to_model,
        reason=reason,
    )

    if error:
        event.update(
            error_type=type(error).__name__,
            error_message=str(error)[:200],  # keep log lines bounded
        )

    logger.info(
        "Provider fallback: %s -> %s (%s) | Reason: %s",
        from_provider, to_provider, to_model, reason,
    )

    # Structured payload for monitoring
    logger.debug("Fallback event data: %s", event)
|
|
|
|
|
|
def resolve_fallback_with_credentials(
    fallback_config: Dict[str, Any],
) -> Tuple[Optional[Any], Optional[str]]:
    """Resolve a fallback configuration to a client and model.

    Args:
        fallback_config: Fallback configuration dict with provider and model

    Returns:
        Tuple of (client, model). ``(None, None)`` is returned when the
        configuration lacks a provider or model, or when resolving the
        provider client raises.
    """
    provider = fallback_config.get("provider")
    model = fallback_config.get("model")

    # Reject incomplete configurations before paying for the lazy import.
    if not provider or not model:
        return None, None

    # Imported here rather than at module level (kept function-scoped as in
    # the original code).
    from agent.auxiliary_client import resolve_provider_client

    try:
        client, resolved_model = resolve_provider_client(
            provider,
            model=model,
            raw_codex=True,
        )
    except Exception as exc:
        # Best effort: a provider that cannot be resolved is simply skipped.
        logger.debug(
            "Failed to resolve fallback provider %s: %s",
            provider, exc
        )
        return None, None

    # Fall back to the requested model name if the resolver returned none.
    return client, resolved_model or model
|
|
|
|
|
|
def get_auto_fallback_chain(
    primary_provider: str,
    user_fallback_chain: Optional[List[Dict[str, Any]]] = None,
) -> List[Dict[str, Any]]:
    """Get the effective fallback chain for automatic fallback.

    A non-empty user-provided chain takes precedence and is used as is; the
    default chain for *primary_provider* is used only when no user chain is
    given (``None`` or empty). The two are not merged.

    Args:
        primary_provider: The primary provider name
        user_fallback_chain: Optional user-provided fallback chain

    Returns:
        The effective fallback chain to use, always as a new list so that
        modifying the result does not modify the caller's own list.
    """
    # Use user-provided chain if available
    if user_fallback_chain:
        return list(user_fallback_chain)

    # Otherwise use default chain for the provider
    return get_default_fallback_chain(primary_provider)
|
|
|
|
|
|
def is_fallback_available(
    fallback_config: Dict[str, Any],
) -> bool:
    """Check if a fallback configuration has available credentials.

    Credentials count as available when one of the provider's API-key
    environment variables is set to a non-empty value, or, for the
    ``nous`` and ``openai-codex`` providers, when ``auth.json`` in the Hermes
    home directory lists the provider.

    Args:
        fallback_config: Fallback configuration dict

    Returns:
        True if credentials are available for the fallback provider
    """
    provider = fallback_config.get("provider")
    if not provider:
        return False

    # Check environment variables for API keys
    env_vars = {
        "anthropic": ["ANTHROPIC_API_KEY", "ANTHROPIC_TOKEN"],
        "kimi-coding": ["KIMI_API_KEY", "KIMI_API_TOKEN"],
        "zai": ["ZAI_API_KEY", "Z_AI_API_KEY"],
        "openrouter": ["OPENROUTER_API_KEY"],
        "minimax": ["MINIMAX_API_KEY"],
        "minimax-cn": ["MINIMAX_CN_API_KEY"],
        "deepseek": ["DEEPSEEK_API_KEY"],
        "alibaba": ["DASHSCOPE_API_KEY", "ALIBABA_API_KEY"],
        "nous": ["NOUS_AGENT_KEY", "NOUS_ACCESS_TOKEN"],
    }

    keys_to_check = env_vars.get(provider)
    if keys_to_check is None:
        # Unknown provider: derive "<PROVIDER>_API_KEY". Hyphens are turned
        # into underscores (as in MINIMAX_CN_API_KEY above) because a hyphen
        # is not usable in an environment variable name in common shells.
        # The literal legacy name is still honoured for compatibility.
        legacy_key = f"{provider.upper()}_API_KEY"
        normalized_key = legacy_key.replace("-", "_")
        keys_to_check = [normalized_key]
        if legacy_key != normalized_key:
            keys_to_check.append(legacy_key)

    if any(os.getenv(key) for key in keys_to_check):
        return True

    # Check auth.json for OAuth providers
    if provider in ("nous", "openai-codex"):
        try:
            import json

            from hermes_cli.config import get_hermes_home

            auth_path = get_hermes_home() / "auth.json"
            if auth_path.exists():
                data = json.loads(auth_path.read_text(encoding="utf-8"))
                if data.get("active_provider") == provider:
                    return True
                # Check for provider in providers dict ("providers" may be
                # missing or null).
                if (data.get("providers") or {}).get(provider):
                    return True
        except Exception as exc:
            # Deliberately best effort: an unreadable or malformed auth file
            # means "no credentials", but leave a trace for debugging.
            logger.debug(
                "Could not inspect auth.json for provider %s: %s",
                provider, exc
            )

    return False
|
|
|
|
|
|
def filter_available_fallbacks(
|
|
fallback_chain: List[Dict[str, Any]],
|
|
) -> List[Dict[str, Any]]:
|
|
"""Filter a fallback chain to only include providers with credentials.
|
|
|
|
Args:
|
|
fallback_chain: List of fallback configurations
|
|
|
|
Returns:
|
|
Filtered list with only available fallbacks
|
|
"""
|
|
return [
|
|
fb for fb in fallback_chain
|
|
if is_fallback_available(fb)
|
|
]
|