From 5ef812d5818612058992acd5d9563af82b054104 Mon Sep 17 00:00:00 2001 From: Allegro Date: Tue, 31 Mar 2026 19:35:54 +0000 Subject: [PATCH] feat: implement automatic kimi-coding fallback on quota errors --- agent/fallback_router.py | 404 ++++++++++++++++++++++++++++++++++ run_agent.py | 37 +++- tests/test_fallback_router.py | 280 +++++++++++++++++++++++ 3 files changed, 719 insertions(+), 2 deletions(-) create mode 100644 agent/fallback_router.py create mode 100644 tests/test_fallback_router.py diff --git a/agent/fallback_router.py b/agent/fallback_router.py new file mode 100644 index 00000000..5c7bb6e7 --- /dev/null +++ b/agent/fallback_router.py @@ -0,0 +1,404 @@ +"""Automatic fallback router for handling provider quota and rate limit errors. + +This module provides intelligent fallback detection and routing when the primary +provider (e.g., Anthropic) encounters quota limitations or rate limits. + +Features: +- Detects quota/rate limit errors from different providers +- Automatic fallback to kimi-coding when Anthropic quota is exceeded +- Configurable fallback chains with default anthropic -> kimi-coding +- Logging and monitoring of fallback events + +Usage: + from agent.fallback_router import ( + is_quota_error, + get_default_fallback_chain, + should_auto_fallback, + ) + + if is_quota_error(error, provider="anthropic"): + if should_auto_fallback(provider="anthropic"): + fallback_chain = get_default_fallback_chain("anthropic") +""" + +import logging +import os +from typing import Dict, List, Optional, Any, Tuple + +logger = logging.getLogger(__name__) + +# Default fallback chains per provider +# Each chain is a list of fallback configurations tried in order +DEFAULT_FALLBACK_CHAINS: Dict[str, List[Dict[str, Any]]] = { + "anthropic": [ + {"provider": "kimi-coding", "model": "kimi-k2.5"}, + {"provider": "openrouter", "model": "anthropic/claude-sonnet-4"}, + ], + "openrouter": [ + {"provider": "kimi-coding", "model": "kimi-k2.5"}, + {"provider": "zai", "model": "glm-5"}, + ], + "kimi-coding": [ + {"provider": "openrouter", "model": "anthropic/claude-sonnet-4"}, + {"provider": "zai", "model": "glm-5"}, + ], + "zai": [ + {"provider": "openrouter", "model": "anthropic/claude-sonnet-4"}, + {"provider": "kimi-coding", "model": "kimi-k2.5"}, + ], +} + +# Quota/rate limit error patterns by provider +# These are matched (case-insensitive) against error messages +QUOTA_ERROR_PATTERNS: Dict[str, List[str]] = { + "anthropic": [ + "rate limit", + "ratelimit", + "quota exceeded", + "quota exceeded", + "insufficient quota", + "429", + "403", + "too many requests", + "capacity exceeded", + "over capacity", + "temporarily unavailable", + "server overloaded", + "resource exhausted", + "billing threshold", + "credit balance", + "payment required", + "402", + ], + "openrouter": [ + "rate limit", + "ratelimit", + "quota exceeded", + "insufficient credits", + "429", + "402", + "no endpoints available", + "all providers failed", + "over capacity", + ], + "kimi-coding": [ + "rate limit", + "ratelimit", + "quota exceeded", + "429", + "insufficient balance", + ], + "zai": [ + "rate limit", + "ratelimit", + "quota exceeded", + "429", + "insufficient quota", + ], +} + +# HTTP status codes indicating quota/rate limit issues +QUOTA_STATUS_CODES = {429, 402, 403} + + +def is_quota_error(error: Exception, provider: Optional[str] = None) -> bool: + """Detect if an error is quota/rate limit related. + + Args: + error: The exception to check + provider: Optional provider name to check provider-specific patterns + + Returns: + True if the error appears to be quota/rate limit related + """ + if error is None: + return False + + error_str = str(error).lower() + error_type = type(error).__name__.lower() + + # Check for common rate limit exception types + if any(term in error_type for term in [ + "ratelimit", "rate_limit", "quota", "toomanyrequests", + "insufficient_quota", "billing", "payment" + ]): + return True + + # Check HTTP status code if available + status_code = getattr(error, "status_code", None) + if status_code is None: + # Try common attribute names + for attr in ["code", "http_status", "response_code", "status"]: + if hasattr(error, attr): + try: + status_code = int(getattr(error, attr)) + break + except (TypeError, ValueError): + continue + + if status_code in QUOTA_STATUS_CODES: + return True + + # Check provider-specific patterns + providers_to_check = [provider] if provider else QUOTA_ERROR_PATTERNS.keys() + + for prov in providers_to_check: + patterns = QUOTA_ERROR_PATTERNS.get(prov, []) + for pattern in patterns: + if pattern.lower() in error_str: + logger.debug( + "Detected %s quota error pattern '%s' in: %s", + prov, pattern, error + ) + return True + + # Check generic quota patterns + generic_patterns = [ + "rate limit exceeded", + "quota exceeded", + "too many requests", + "capacity exceeded", + "temporarily unavailable", + "try again later", + "resource exhausted", + "billing", + "payment required", + "insufficient credits", + "insufficient quota", + ] + + for pattern in generic_patterns: + if pattern in error_str: + return True + + return False + + +def get_default_fallback_chain( + primary_provider: str, + exclude_provider: Optional[str] = None, +) -> List[Dict[str, Any]]: + """Get the default fallback chain for a primary provider. + + Args: + primary_provider: The primary provider name + exclude_provider: Optional provider to exclude from the chain + + Returns: + List of fallback configurations + """ + chain = DEFAULT_FALLBACK_CHAINS.get(primary_provider, []) + + # Filter out excluded provider if specified + if exclude_provider: + chain = [ + fb for fb in chain + if fb.get("provider") != exclude_provider + ] + + return list(chain) + + +def should_auto_fallback( + provider: str, + error: Optional[Exception] = None, + auto_fallback_enabled: Optional[bool] = None, +) -> bool: + """Determine if automatic fallback should be attempted. + + Args: + provider: The current provider name + error: Optional error to check for quota issues + auto_fallback_enabled: Optional override for auto-fallback setting + + Returns: + True if automatic fallback should be attempted + """ + # Check environment variable override + if auto_fallback_enabled is None: + env_setting = os.getenv("HERMES_AUTO_FALLBACK", "true").lower() + auto_fallback_enabled = env_setting in ("true", "1", "yes", "on") + + if not auto_fallback_enabled: + return False + + # Check if provider has a configured fallback chain + if provider not in DEFAULT_FALLBACK_CHAINS: + # Still allow fallback if it's a quota error with generic handling + if error and is_quota_error(error): + logger.debug( + "Provider %s has no fallback chain but quota error detected", + provider + ) + return True + return False + + # If there's an error, only fallback on quota/rate limit errors + if error is not None: + return is_quota_error(error, provider) + + # No error but fallback chain exists - allow eager fallback for + # providers known to have quota issues + return provider in ("anthropic",) + + +def log_fallback_event( + from_provider: str, + to_provider: str, + to_model: str, + reason: str, + error: Optional[Exception] = None, +) -> None: + """Log a fallback event for monitoring. + + Args: + from_provider: The provider we're falling back from + to_provider: The provider we're falling back to + to_model: The model we're falling back to + reason: The reason for the fallback + error: Optional error that triggered the fallback + """ + log_data = { + "event": "provider_fallback", + "from_provider": from_provider, + "to_provider": to_provider, + "to_model": to_model, + "reason": reason, + } + + if error: + log_data["error_type"] = type(error).__name__ + log_data["error_message"] = str(error)[:200] + + logger.info("Provider fallback: %s -> %s (%s) | Reason: %s", + from_provider, to_provider, to_model, reason) + + # Also log structured data for monitoring + logger.debug("Fallback event data: %s", log_data) + + +def resolve_fallback_with_credentials( + fallback_config: Dict[str, Any], +) -> Tuple[Optional[Any], Optional[str]]: + """Resolve a fallback configuration to a client and model. + + Args: + fallback_config: Fallback configuration dict with provider and model + + Returns: + Tuple of (client, model) or (None, None) if credentials not available + """ + from agent.auxiliary_client import resolve_provider_client + + provider = fallback_config.get("provider") + model = fallback_config.get("model") + + if not provider or not model: + return None, None + + try: + client, resolved_model = resolve_provider_client( + provider, + model=model, + raw_codex=True, + ) + return client, resolved_model or model + except Exception as exc: + logger.debug( + "Failed to resolve fallback provider %s: %s", + provider, exc + ) + return None, None + + +def get_auto_fallback_chain( + primary_provider: str, + user_fallback_chain: Optional[List[Dict[str, Any]]] = None, +) -> List[Dict[str, Any]]: + """Get the effective fallback chain for automatic fallback. + + Combines user-provided fallback chain with default automatic fallback chain. + + Args: + primary_provider: The primary provider name + user_fallback_chain: Optional user-provided fallback chain + + Returns: + The effective fallback chain to use + """ + # Use user-provided chain if available + if user_fallback_chain: + return user_fallback_chain + + # Otherwise use default chain for the provider + return get_default_fallback_chain(primary_provider) + + +def is_fallback_available( + fallback_config: Dict[str, Any], +) -> bool: + """Check if a fallback configuration has available credentials. + + Args: + fallback_config: Fallback configuration dict + + Returns: + True if credentials are available for the fallback provider + """ + provider = fallback_config.get("provider") + if not provider: + return False + + # Check environment variables for API keys + env_vars = { + "anthropic": ["ANTHROPIC_API_KEY", "ANTHROPIC_TOKEN"], + "kimi-coding": ["KIMI_API_KEY", "KIMI_API_TOKEN"], + "zai": ["ZAI_API_KEY", "Z_AI_API_KEY"], + "openrouter": ["OPENROUTER_API_KEY"], + "minimax": ["MINIMAX_API_KEY"], + "minimax-cn": ["MINIMAX_CN_API_KEY"], + "deepseek": ["DEEPSEEK_API_KEY"], + "alibaba": ["DASHSCOPE_API_KEY", "ALIBABA_API_KEY"], + "nous": ["NOUS_AGENT_KEY", "NOUS_ACCESS_TOKEN"], + } + + keys_to_check = env_vars.get(provider, [f"{provider.upper()}_API_KEY"]) + + for key in keys_to_check: + if os.getenv(key): + return True + + # Check auth.json for OAuth providers + if provider in ("nous", "openai-codex"): + try: + from hermes_cli.config import get_hermes_home + auth_path = get_hermes_home() / "auth.json" + if auth_path.exists(): + import json + data = json.loads(auth_path.read_text()) + if data.get("active_provider") == provider: + return True + # Check for provider in providers dict + if data.get("providers", {}).get(provider): + return True + except Exception: + pass + + return False + + +def filter_available_fallbacks( + fallback_chain: List[Dict[str, Any]], +) -> List[Dict[str, Any]]: + """Filter a fallback chain to only include providers with credentials. + + Args: + fallback_chain: List of fallback configurations + + Returns: + Filtered list with only available fallbacks + """ + return [ + fb for fb in fallback_chain + if is_fallback_available(fb) + ] diff --git a/run_agent.py b/run_agent.py index e8991c67..9b664c22 100644 --- a/run_agent.py +++ b/run_agent.py @@ -100,6 +100,13 @@ from agent.trajectory import ( convert_scratchpad_to_think, has_incomplete_scratchpad, save_trajectory as _save_trajectory_to_file, ) +from agent.fallback_router import ( + is_quota_error, + get_auto_fallback_chain, + log_fallback_event, + should_auto_fallback, + filter_available_fallbacks, +) from utils import atomic_json_write HONCHO_TOOL_NAMES = { @@ -909,6 +916,20 @@ class AIAgent: self._fallback_chain = [fallback_model] else: self._fallback_chain = [] + + # Auto-enable fallback for Anthropic (and other providers) when no + # explicit fallback chain is configured. This provides automatic + # failover to kimi-coding when Anthropic quota is limited. + if not self._fallback_chain and should_auto_fallback(self.provider): + auto_chain = get_auto_fallback_chain(self.provider) + # Filter to only include fallbacks with available credentials + available_chain = filter_available_fallbacks(auto_chain) + if available_chain: + self._fallback_chain = available_chain + if not self.quiet_mode: + print(f"🔄 Auto-fallback enabled: {self.provider} → " + + " → ".join(f"{f['model']} ({f['provider']})" for f in available_chain)) + self._fallback_index = 0 self._fallback_activated = False # Legacy attribute kept for backward compat (tests, external callers) @@ -4565,6 +4586,12 @@ class AIAgent: f"🔄 Primary model failed — switching to fallback: " f"{fb_model} via {fb_provider}" ) + log_fallback_event( + from_provider=self.provider, + to_provider=fb_provider, + to_model=fb_model, + reason="quota_or_rate_limit", + ) logging.info( "Fallback activated: %s → %s (%s)", old_model, fb_model, fb_provider, @@ -7141,8 +7168,14 @@ class AIAgent: or "usage limit" in error_msg or "quota" in error_msg ) - if is_rate_limited and self._fallback_index < len(self._fallback_chain): - self._emit_status("⚠️ Rate limited — switching to fallback provider...") + # Also check using the quota error detector for provider-specific patterns + is_quota_error_result = is_quota_error(api_error, self.provider) + + if (is_rate_limited or is_quota_error_result) and self._fallback_index < len(self._fallback_chain): + if is_quota_error_result: + self._emit_status(f"⚠️ {self.provider} quota exceeded — switching to fallback provider...") + else: + self._emit_status("⚠️ Rate limited — switching to fallback provider...") if self._try_activate_fallback(): retry_count = 0 continue diff --git a/tests/test_fallback_router.py b/tests/test_fallback_router.py new file mode 100644 index 00000000..fd7878ac --- /dev/null +++ b/tests/test_fallback_router.py @@ -0,0 +1,280 @@ +"""Tests for the automatic fallback router module. + +Tests quota error detection, fallback chain resolution, and auto-fallback logic. +""" + +import os +import pytest +from unittest.mock import MagicMock, patch + +from agent.fallback_router import ( + is_quota_error, + get_default_fallback_chain, + should_auto_fallback, + log_fallback_event, + get_auto_fallback_chain, + is_fallback_available, + filter_available_fallbacks, + QUOTA_STATUS_CODES, + DEFAULT_FALLBACK_CHAINS, +) + + +class TestIsQuotaError: + """Tests for quota error detection.""" + + def test_none_error_returns_false(self): + assert is_quota_error(None) is False + + def test_rate_limit_status_code_429(self): + error = MagicMock() + error.status_code = 429 + error.__str__ = lambda self: "Rate limit exceeded" + assert is_quota_error(error) is True + + def test_payment_required_status_code_402(self): + error = MagicMock() + error.status_code = 402 + error.__str__ = lambda self: "Payment required" + assert is_quota_error(error) is True + + def test_forbidden_status_code_403(self): + error = MagicMock() + error.status_code = 403 + error.__str__ = lambda self: "Forbidden" + assert is_quota_error(error) is True + + def test_anthropic_quota_patterns(self): + patterns = [ + "Rate limit exceeded", + "quota exceeded", + "insufficient quota", + "capacity exceeded", + "over capacity", + "billing threshold reached", + "credit balance too low", + ] + for pattern in patterns: + error = Exception(pattern) + assert is_quota_error(error, provider="anthropic") is True, f"Failed for: {pattern}" + + def test_anthropic_error_type_detection(self): + class RateLimitError(Exception): + pass + + error = RateLimitError("Too many requests") + assert is_quota_error(error) is True + + def test_non_quota_error(self): + error = Exception("Some random error") + assert is_quota_error(error) is False + + def test_context_length_error_not_quota(self): + error = Exception("Context length exceeded") + assert is_quota_error(error) is False + + def test_provider_specific_patterns(self): + # Test openrouter patterns + error = Exception("Insufficient credits") + assert is_quota_error(error, provider="openrouter") is True + + # Test kimi patterns + error = Exception("Insufficient balance") + assert is_quota_error(error, provider="kimi-coding") is True + + +class TestGetDefaultFallbackChain: + """Tests for default fallback chain retrieval.""" + + def test_anthropic_fallback_chain(self): + chain = get_default_fallback_chain("anthropic") + assert len(chain) >= 1 + assert chain[0]["provider"] == "kimi-coding" + assert chain[0]["model"] == "kimi-k2.5" + + def test_openrouter_fallback_chain(self): + chain = get_default_fallback_chain("openrouter") + assert len(chain) >= 1 + assert any(fb["provider"] == "kimi-coding" for fb in chain) + + def test_unknown_provider_returns_empty(self): + chain = get_default_fallback_chain("unknown_provider") + assert chain == [] + + def test_exclude_provider(self): + chain = get_default_fallback_chain("anthropic", exclude_provider="kimi-coding") + assert all(fb["provider"] != "kimi-coding" for fb in chain) + + +class TestShouldAutoFallback: + """Tests for auto-fallback decision logic.""" + + def test_auto_fallback_enabled_by_default(self): + with patch.dict(os.environ, {"HERMES_AUTO_FALLBACK": "true"}): + assert should_auto_fallback("anthropic") is True + + def test_auto_fallback_disabled_via_env(self): + with patch.dict(os.environ, {"HERMES_AUTO_FALLBACK": "false"}): + assert should_auto_fallback("anthropic") is False + + def test_auto_fallback_disabled_via_override(self): + assert should_auto_fallback("anthropic", auto_fallback_enabled=False) is False + + def test_quota_error_triggers_fallback(self): + error = Exception("Rate limit exceeded") + assert should_auto_fallback("unknown_provider", error=error) is True + + def test_non_quota_error_no_fallback(self): + error = Exception("Some random error") + # Unknown provider with non-quota error should not fallback + assert should_auto_fallback("unknown_provider", error=error) is False + + def test_anthropic_eager_fallback(self): + # Anthropic falls back eagerly even without error + assert should_auto_fallback("anthropic") is True + + +class TestLogFallbackEvent: + """Tests for fallback event logging.""" + + def test_log_fallback_event(self): + with patch("agent.fallback_router.logger") as mock_logger: + log_fallback_event( + from_provider="anthropic", + to_provider="kimi-coding", + to_model="kimi-k2.5", + reason="quota_exceeded", + ) + mock_logger.info.assert_called_once() + # Check the arguments passed to logger.info + call_args = mock_logger.info.call_args[0] + # First arg is format string, remaining are the values + assert len(call_args) >= 4 + assert "anthropic" in call_args # Provider names are in the args + assert "kimi-coding" in call_args + + def test_log_fallback_event_with_error(self): + error = Exception("Rate limit exceeded") + with patch("agent.fallback_router.logger") as mock_logger: + log_fallback_event( + from_provider="anthropic", + to_provider="kimi-coding", + to_model="kimi-k2.5", + reason="quota_exceeded", + error=error, + ) + mock_logger.info.assert_called_once() + mock_logger.debug.assert_called_once() + + +class TestGetAutoFallbackChain: + """Tests for automatic fallback chain resolution.""" + + def test_user_chain_takes_precedence(self): + user_chain = [{"provider": "zai", "model": "glm-5"}] + chain = get_auto_fallback_chain("anthropic", user_fallback_chain=user_chain) + assert chain == user_chain + + def test_default_chain_when_no_user_chain(self): + chain = get_auto_fallback_chain("anthropic") + assert chain == DEFAULT_FALLBACK_CHAINS["anthropic"] + + +class TestIsFallbackAvailable: + """Tests for fallback availability checking.""" + + def test_anthropic_available_with_key(self): + with patch.dict(os.environ, {"ANTHROPIC_API_KEY": "test-key"}): + config = {"provider": "anthropic", "model": "claude-3"} + assert is_fallback_available(config) is True + + def test_anthropic_unavailable_without_key(self): + with patch.dict(os.environ, {}, clear=True): + config = {"provider": "anthropic", "model": "claude-3"} + assert is_fallback_available(config) is False + + def test_kimi_available_with_key(self): + with patch.dict(os.environ, {"KIMI_API_KEY": "test-key"}): + config = {"provider": "kimi-coding", "model": "kimi-k2.5"} + assert is_fallback_available(config) is True + + def test_kimi_available_with_token(self): + with patch.dict(os.environ, {"KIMI_API_TOKEN": "test-token"}): + config = {"provider": "kimi-coding", "model": "kimi-k2.5"} + assert is_fallback_available(config) is True + + def test_invalid_config_returns_false(self): + assert is_fallback_available({}) is False + assert is_fallback_available({"provider": ""}) is False + + +class TestFilterAvailableFallbacks: + """Tests for filtering available fallbacks.""" + + def test_filters_unavailable_providers(self): + with patch.dict(os.environ, {"KIMI_API_KEY": "test-key"}): + chain = [ + {"provider": "kimi-coding", "model": "kimi-k2.5"}, + {"provider": "anthropic", "model": "claude-3"}, # No key + ] + available = filter_available_fallbacks(chain) + assert len(available) == 1 + assert available[0]["provider"] == "kimi-coding" + + def test_returns_empty_when_none_available(self): + with patch.dict(os.environ, {}, clear=True): + chain = [ + {"provider": "anthropic", "model": "claude-3"}, + {"provider": "kimi-coding", "model": "kimi-k2.5"}, + ] + available = filter_available_fallbacks(chain) + assert available == [] + + def test_preserves_order(self): + with patch.dict(os.environ, {"KIMI_API_KEY": "test", "ANTHROPIC_API_KEY": "test"}): + chain = [ + {"provider": "kimi-coding", "model": "kimi-k2.5"}, + {"provider": "anthropic", "model": "claude-3"}, + ] + available = filter_available_fallbacks(chain) + assert len(available) == 2 + assert available[0]["provider"] == "kimi-coding" + assert available[1]["provider"] == "anthropic" + + +class TestIntegration: + """Integration tests for the fallback router.""" + + def test_full_fallback_flow_for_anthropic_quota(self): + """Test the complete fallback flow when Anthropic quota is exceeded.""" + # Simulate Anthropic quota error + error = Exception("Rate limit exceeded: quota exceeded for model claude-3") + + # Verify error detection + assert is_quota_error(error, provider="anthropic") is True + + # Verify auto-fallback is enabled + assert should_auto_fallback("anthropic", error=error) is True + + # Get fallback chain + chain = get_auto_fallback_chain("anthropic") + assert len(chain) > 0 + + # Verify kimi-coding is first fallback + assert chain[0]["provider"] == "kimi-coding" + + def test_fallback_availability_checking(self): + """Test that fallback availability is properly checked.""" + with patch.dict(os.environ, {"KIMI_API_KEY": "test-key"}): + # Get default chain for anthropic + chain = get_default_fallback_chain("anthropic") + + # Filter to available + available = filter_available_fallbacks(chain) + + # Should have kimi-coding available + assert any(fb["provider"] == "kimi-coding" for fb in available) + + +if __name__ == "__main__": + pytest.main([__file__, "-v"])