feat: implement automatic kimi-coding fallback on quota errors

This commit is contained in:
Allegro
2026-03-31 19:35:54 +00:00
parent 37c75ecd7a
commit 5ef812d581
3 changed files with 719 additions and 2 deletions

404
agent/fallback_router.py Normal file
View File

@@ -0,0 +1,404 @@
"""Automatic fallback router for handling provider quota and rate limit errors.
This module provides intelligent fallback detection and routing when the primary
provider (e.g., Anthropic) encounters quota limitations or rate limits.
Features:
- Detects quota/rate limit errors from different providers
- Automatic fallback to kimi-coding when Anthropic quota is exceeded
- Configurable fallback chains with default anthropic -> kimi-coding
- Logging and monitoring of fallback events
Usage:
from agent.fallback_router import (
is_quota_error,
get_default_fallback_chain,
should_auto_fallback,
)
if is_quota_error(error, provider="anthropic"):
if should_auto_fallback(provider="anthropic"):
fallback_chain = get_default_fallback_chain("anthropic")
"""
import logging
import os
from typing import Dict, List, Optional, Any, Tuple
logger = logging.getLogger(__name__)
# Default fallback chains per provider
# Each chain is a list of fallback configurations tried in order
DEFAULT_FALLBACK_CHAINS: Dict[str, List[Dict[str, Any]]] = {
"anthropic": [
{"provider": "kimi-coding", "model": "kimi-k2.5"},
{"provider": "openrouter", "model": "anthropic/claude-sonnet-4"},
],
"openrouter": [
{"provider": "kimi-coding", "model": "kimi-k2.5"},
{"provider": "zai", "model": "glm-5"},
],
"kimi-coding": [
{"provider": "openrouter", "model": "anthropic/claude-sonnet-4"},
{"provider": "zai", "model": "glm-5"},
],
"zai": [
{"provider": "openrouter", "model": "anthropic/claude-sonnet-4"},
{"provider": "kimi-coding", "model": "kimi-k2.5"},
],
}
# Quota/rate limit error patterns by provider
# These are matched (case-insensitive) against error messages
QUOTA_ERROR_PATTERNS: Dict[str, List[str]] = {
"anthropic": [
"rate limit",
"ratelimit",
"quota exceeded",
"quota exceeded",
"insufficient quota",
"429",
"403",
"too many requests",
"capacity exceeded",
"over capacity",
"temporarily unavailable",
"server overloaded",
"resource exhausted",
"billing threshold",
"credit balance",
"payment required",
"402",
],
"openrouter": [
"rate limit",
"ratelimit",
"quota exceeded",
"insufficient credits",
"429",
"402",
"no endpoints available",
"all providers failed",
"over capacity",
],
"kimi-coding": [
"rate limit",
"ratelimit",
"quota exceeded",
"429",
"insufficient balance",
],
"zai": [
"rate limit",
"ratelimit",
"quota exceeded",
"429",
"insufficient quota",
],
}
# HTTP status codes indicating quota/rate limit issues
QUOTA_STATUS_CODES = {429, 402, 403}
def is_quota_error(error: Exception, provider: Optional[str] = None) -> bool:
"""Detect if an error is quota/rate limit related.
Args:
error: The exception to check
provider: Optional provider name to check provider-specific patterns
Returns:
True if the error appears to be quota/rate limit related
"""
if error is None:
return False
error_str = str(error).lower()
error_type = type(error).__name__.lower()
# Check for common rate limit exception types
if any(term in error_type for term in [
"ratelimit", "rate_limit", "quota", "toomanyrequests",
"insufficient_quota", "billing", "payment"
]):
return True
# Check HTTP status code if available
status_code = getattr(error, "status_code", None)
if status_code is None:
# Try common attribute names
for attr in ["code", "http_status", "response_code", "status"]:
if hasattr(error, attr):
try:
status_code = int(getattr(error, attr))
break
except (TypeError, ValueError):
continue
if status_code in QUOTA_STATUS_CODES:
return True
# Check provider-specific patterns
providers_to_check = [provider] if provider else QUOTA_ERROR_PATTERNS.keys()
for prov in providers_to_check:
patterns = QUOTA_ERROR_PATTERNS.get(prov, [])
for pattern in patterns:
if pattern.lower() in error_str:
logger.debug(
"Detected %s quota error pattern '%s' in: %s",
prov, pattern, error
)
return True
# Check generic quota patterns
generic_patterns = [
"rate limit exceeded",
"quota exceeded",
"too many requests",
"capacity exceeded",
"temporarily unavailable",
"try again later",
"resource exhausted",
"billing",
"payment required",
"insufficient credits",
"insufficient quota",
]
for pattern in generic_patterns:
if pattern in error_str:
return True
return False
def get_default_fallback_chain(
primary_provider: str,
exclude_provider: Optional[str] = None,
) -> List[Dict[str, Any]]:
"""Get the default fallback chain for a primary provider.
Args:
primary_provider: The primary provider name
exclude_provider: Optional provider to exclude from the chain
Returns:
List of fallback configurations
"""
chain = DEFAULT_FALLBACK_CHAINS.get(primary_provider, [])
# Filter out excluded provider if specified
if exclude_provider:
chain = [
fb for fb in chain
if fb.get("provider") != exclude_provider
]
return list(chain)
def should_auto_fallback(
provider: str,
error: Optional[Exception] = None,
auto_fallback_enabled: Optional[bool] = None,
) -> bool:
"""Determine if automatic fallback should be attempted.
Args:
provider: The current provider name
error: Optional error to check for quota issues
auto_fallback_enabled: Optional override for auto-fallback setting
Returns:
True if automatic fallback should be attempted
"""
# Check environment variable override
if auto_fallback_enabled is None:
env_setting = os.getenv("HERMES_AUTO_FALLBACK", "true").lower()
auto_fallback_enabled = env_setting in ("true", "1", "yes", "on")
if not auto_fallback_enabled:
return False
# Check if provider has a configured fallback chain
if provider not in DEFAULT_FALLBACK_CHAINS:
# Still allow fallback if it's a quota error with generic handling
if error and is_quota_error(error):
logger.debug(
"Provider %s has no fallback chain but quota error detected",
provider
)
return True
return False
# If there's an error, only fallback on quota/rate limit errors
if error is not None:
return is_quota_error(error, provider)
# No error but fallback chain exists - allow eager fallback for
# providers known to have quota issues
return provider in ("anthropic",)
def log_fallback_event(
from_provider: str,
to_provider: str,
to_model: str,
reason: str,
error: Optional[Exception] = None,
) -> None:
"""Log a fallback event for monitoring.
Args:
from_provider: The provider we're falling back from
to_provider: The provider we're falling back to
to_model: The model we're falling back to
reason: The reason for the fallback
error: Optional error that triggered the fallback
"""
log_data = {
"event": "provider_fallback",
"from_provider": from_provider,
"to_provider": to_provider,
"to_model": to_model,
"reason": reason,
}
if error:
log_data["error_type"] = type(error).__name__
log_data["error_message"] = str(error)[:200]
logger.info("Provider fallback: %s -> %s (%s) | Reason: %s",
from_provider, to_provider, to_model, reason)
# Also log structured data for monitoring
logger.debug("Fallback event data: %s", log_data)
def resolve_fallback_with_credentials(
fallback_config: Dict[str, Any],
) -> Tuple[Optional[Any], Optional[str]]:
"""Resolve a fallback configuration to a client and model.
Args:
fallback_config: Fallback configuration dict with provider and model
Returns:
Tuple of (client, model) or (None, None) if credentials not available
"""
from agent.auxiliary_client import resolve_provider_client
provider = fallback_config.get("provider")
model = fallback_config.get("model")
if not provider or not model:
return None, None
try:
client, resolved_model = resolve_provider_client(
provider,
model=model,
raw_codex=True,
)
return client, resolved_model or model
except Exception as exc:
logger.debug(
"Failed to resolve fallback provider %s: %s",
provider, exc
)
return None, None
def get_auto_fallback_chain(
primary_provider: str,
user_fallback_chain: Optional[List[Dict[str, Any]]] = None,
) -> List[Dict[str, Any]]:
"""Get the effective fallback chain for automatic fallback.
Combines user-provided fallback chain with default automatic fallback chain.
Args:
primary_provider: The primary provider name
user_fallback_chain: Optional user-provided fallback chain
Returns:
The effective fallback chain to use
"""
# Use user-provided chain if available
if user_fallback_chain:
return user_fallback_chain
# Otherwise use default chain for the provider
return get_default_fallback_chain(primary_provider)
def is_fallback_available(
fallback_config: Dict[str, Any],
) -> bool:
"""Check if a fallback configuration has available credentials.
Args:
fallback_config: Fallback configuration dict
Returns:
True if credentials are available for the fallback provider
"""
provider = fallback_config.get("provider")
if not provider:
return False
# Check environment variables for API keys
env_vars = {
"anthropic": ["ANTHROPIC_API_KEY", "ANTHROPIC_TOKEN"],
"kimi-coding": ["KIMI_API_KEY", "KIMI_API_TOKEN"],
"zai": ["ZAI_API_KEY", "Z_AI_API_KEY"],
"openrouter": ["OPENROUTER_API_KEY"],
"minimax": ["MINIMAX_API_KEY"],
"minimax-cn": ["MINIMAX_CN_API_KEY"],
"deepseek": ["DEEPSEEK_API_KEY"],
"alibaba": ["DASHSCOPE_API_KEY", "ALIBABA_API_KEY"],
"nous": ["NOUS_AGENT_KEY", "NOUS_ACCESS_TOKEN"],
}
keys_to_check = env_vars.get(provider, [f"{provider.upper()}_API_KEY"])
for key in keys_to_check:
if os.getenv(key):
return True
# Check auth.json for OAuth providers
if provider in ("nous", "openai-codex"):
try:
from hermes_cli.config import get_hermes_home
auth_path = get_hermes_home() / "auth.json"
if auth_path.exists():
import json
data = json.loads(auth_path.read_text())
if data.get("active_provider") == provider:
return True
# Check for provider in providers dict
if data.get("providers", {}).get(provider):
return True
except Exception:
pass
return False
def filter_available_fallbacks(
fallback_chain: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
"""Filter a fallback chain to only include providers with credentials.
Args:
fallback_chain: List of fallback configurations
Returns:
Filtered list with only available fallbacks
"""
return [
fb for fb in fallback_chain
if is_fallback_available(fb)
]

View File

@@ -100,6 +100,13 @@ from agent.trajectory import (
convert_scratchpad_to_think, has_incomplete_scratchpad,
save_trajectory as _save_trajectory_to_file,
)
from agent.fallback_router import (
is_quota_error,
get_auto_fallback_chain,
log_fallback_event,
should_auto_fallback,
filter_available_fallbacks,
)
from utils import atomic_json_write
HONCHO_TOOL_NAMES = {
@@ -909,6 +916,20 @@ class AIAgent:
self._fallback_chain = [fallback_model]
else:
self._fallback_chain = []
# Auto-enable fallback for Anthropic (and other providers) when no
# explicit fallback chain is configured. This provides automatic
# failover to kimi-coding when Anthropic quota is limited.
if not self._fallback_chain and should_auto_fallback(self.provider):
auto_chain = get_auto_fallback_chain(self.provider)
# Filter to only include fallbacks with available credentials
available_chain = filter_available_fallbacks(auto_chain)
if available_chain:
self._fallback_chain = available_chain
if not self.quiet_mode:
print(f"🔄 Auto-fallback enabled: {self.provider}" +
"".join(f"{f['model']} ({f['provider']})" for f in available_chain))
self._fallback_index = 0
self._fallback_activated = False
# Legacy attribute kept for backward compat (tests, external callers)
@@ -4565,6 +4586,12 @@ class AIAgent:
f"🔄 Primary model failed — switching to fallback: "
f"{fb_model} via {fb_provider}"
)
log_fallback_event(
from_provider=self.provider,
to_provider=fb_provider,
to_model=fb_model,
reason="quota_or_rate_limit",
)
logging.info(
"Fallback activated: %s%s (%s)",
old_model, fb_model, fb_provider,
@@ -7141,8 +7168,14 @@ class AIAgent:
or "usage limit" in error_msg
or "quota" in error_msg
)
if is_rate_limited and self._fallback_index < len(self._fallback_chain):
self._emit_status("⚠️ Rate limited — switching to fallback provider...")
# Also check using the quota error detector for provider-specific patterns
is_quota_error_result = is_quota_error(api_error, self.provider)
if (is_rate_limited or is_quota_error_result) and self._fallback_index < len(self._fallback_chain):
if is_quota_error_result:
self._emit_status(f"⚠️ {self.provider} quota exceeded — switching to fallback provider...")
else:
self._emit_status("⚠️ Rate limited — switching to fallback provider...")
if self._try_activate_fallback():
retry_count = 0
continue

View File

@@ -0,0 +1,280 @@
"""Tests for the automatic fallback router module.
Tests quota error detection, fallback chain resolution, and auto-fallback logic.
"""
import os
import pytest
from unittest.mock import MagicMock, patch
from agent.fallback_router import (
is_quota_error,
get_default_fallback_chain,
should_auto_fallback,
log_fallback_event,
get_auto_fallback_chain,
is_fallback_available,
filter_available_fallbacks,
QUOTA_STATUS_CODES,
DEFAULT_FALLBACK_CHAINS,
)
class TestIsQuotaError:
"""Tests for quota error detection."""
def test_none_error_returns_false(self):
assert is_quota_error(None) is False
def test_rate_limit_status_code_429(self):
error = MagicMock()
error.status_code = 429
error.__str__ = lambda self: "Rate limit exceeded"
assert is_quota_error(error) is True
def test_payment_required_status_code_402(self):
error = MagicMock()
error.status_code = 402
error.__str__ = lambda self: "Payment required"
assert is_quota_error(error) is True
def test_forbidden_status_code_403(self):
error = MagicMock()
error.status_code = 403
error.__str__ = lambda self: "Forbidden"
assert is_quota_error(error) is True
def test_anthropic_quota_patterns(self):
patterns = [
"Rate limit exceeded",
"quota exceeded",
"insufficient quota",
"capacity exceeded",
"over capacity",
"billing threshold reached",
"credit balance too low",
]
for pattern in patterns:
error = Exception(pattern)
assert is_quota_error(error, provider="anthropic") is True, f"Failed for: {pattern}"
def test_anthropic_error_type_detection(self):
class RateLimitError(Exception):
pass
error = RateLimitError("Too many requests")
assert is_quota_error(error) is True
def test_non_quota_error(self):
error = Exception("Some random error")
assert is_quota_error(error) is False
def test_context_length_error_not_quota(self):
error = Exception("Context length exceeded")
assert is_quota_error(error) is False
def test_provider_specific_patterns(self):
# Test openrouter patterns
error = Exception("Insufficient credits")
assert is_quota_error(error, provider="openrouter") is True
# Test kimi patterns
error = Exception("Insufficient balance")
assert is_quota_error(error, provider="kimi-coding") is True
class TestGetDefaultFallbackChain:
"""Tests for default fallback chain retrieval."""
def test_anthropic_fallback_chain(self):
chain = get_default_fallback_chain("anthropic")
assert len(chain) >= 1
assert chain[0]["provider"] == "kimi-coding"
assert chain[0]["model"] == "kimi-k2.5"
def test_openrouter_fallback_chain(self):
chain = get_default_fallback_chain("openrouter")
assert len(chain) >= 1
assert any(fb["provider"] == "kimi-coding" for fb in chain)
def test_unknown_provider_returns_empty(self):
chain = get_default_fallback_chain("unknown_provider")
assert chain == []
def test_exclude_provider(self):
chain = get_default_fallback_chain("anthropic", exclude_provider="kimi-coding")
assert all(fb["provider"] != "kimi-coding" for fb in chain)
class TestShouldAutoFallback:
"""Tests for auto-fallback decision logic."""
def test_auto_fallback_enabled_by_default(self):
with patch.dict(os.environ, {"HERMES_AUTO_FALLBACK": "true"}):
assert should_auto_fallback("anthropic") is True
def test_auto_fallback_disabled_via_env(self):
with patch.dict(os.environ, {"HERMES_AUTO_FALLBACK": "false"}):
assert should_auto_fallback("anthropic") is False
def test_auto_fallback_disabled_via_override(self):
assert should_auto_fallback("anthropic", auto_fallback_enabled=False) is False
def test_quota_error_triggers_fallback(self):
error = Exception("Rate limit exceeded")
assert should_auto_fallback("unknown_provider", error=error) is True
def test_non_quota_error_no_fallback(self):
error = Exception("Some random error")
# Unknown provider with non-quota error should not fallback
assert should_auto_fallback("unknown_provider", error=error) is False
def test_anthropic_eager_fallback(self):
# Anthropic falls back eagerly even without error
assert should_auto_fallback("anthropic") is True
class TestLogFallbackEvent:
"""Tests for fallback event logging."""
def test_log_fallback_event(self):
with patch("agent.fallback_router.logger") as mock_logger:
log_fallback_event(
from_provider="anthropic",
to_provider="kimi-coding",
to_model="kimi-k2.5",
reason="quota_exceeded",
)
mock_logger.info.assert_called_once()
# Check the arguments passed to logger.info
call_args = mock_logger.info.call_args[0]
# First arg is format string, remaining are the values
assert len(call_args) >= 4
assert "anthropic" in call_args # Provider names are in the args
assert "kimi-coding" in call_args
def test_log_fallback_event_with_error(self):
error = Exception("Rate limit exceeded")
with patch("agent.fallback_router.logger") as mock_logger:
log_fallback_event(
from_provider="anthropic",
to_provider="kimi-coding",
to_model="kimi-k2.5",
reason="quota_exceeded",
error=error,
)
mock_logger.info.assert_called_once()
mock_logger.debug.assert_called_once()
class TestGetAutoFallbackChain:
"""Tests for automatic fallback chain resolution."""
def test_user_chain_takes_precedence(self):
user_chain = [{"provider": "zai", "model": "glm-5"}]
chain = get_auto_fallback_chain("anthropic", user_fallback_chain=user_chain)
assert chain == user_chain
def test_default_chain_when_no_user_chain(self):
chain = get_auto_fallback_chain("anthropic")
assert chain == DEFAULT_FALLBACK_CHAINS["anthropic"]
class TestIsFallbackAvailable:
"""Tests for fallback availability checking."""
def test_anthropic_available_with_key(self):
with patch.dict(os.environ, {"ANTHROPIC_API_KEY": "test-key"}):
config = {"provider": "anthropic", "model": "claude-3"}
assert is_fallback_available(config) is True
def test_anthropic_unavailable_without_key(self):
with patch.dict(os.environ, {}, clear=True):
config = {"provider": "anthropic", "model": "claude-3"}
assert is_fallback_available(config) is False
def test_kimi_available_with_key(self):
with patch.dict(os.environ, {"KIMI_API_KEY": "test-key"}):
config = {"provider": "kimi-coding", "model": "kimi-k2.5"}
assert is_fallback_available(config) is True
def test_kimi_available_with_token(self):
with patch.dict(os.environ, {"KIMI_API_TOKEN": "test-token"}):
config = {"provider": "kimi-coding", "model": "kimi-k2.5"}
assert is_fallback_available(config) is True
def test_invalid_config_returns_false(self):
assert is_fallback_available({}) is False
assert is_fallback_available({"provider": ""}) is False
class TestFilterAvailableFallbacks:
"""Tests for filtering available fallbacks."""
def test_filters_unavailable_providers(self):
with patch.dict(os.environ, {"KIMI_API_KEY": "test-key"}):
chain = [
{"provider": "kimi-coding", "model": "kimi-k2.5"},
{"provider": "anthropic", "model": "claude-3"}, # No key
]
available = filter_available_fallbacks(chain)
assert len(available) == 1
assert available[0]["provider"] == "kimi-coding"
def test_returns_empty_when_none_available(self):
with patch.dict(os.environ, {}, clear=True):
chain = [
{"provider": "anthropic", "model": "claude-3"},
{"provider": "kimi-coding", "model": "kimi-k2.5"},
]
available = filter_available_fallbacks(chain)
assert available == []
def test_preserves_order(self):
with patch.dict(os.environ, {"KIMI_API_KEY": "test", "ANTHROPIC_API_KEY": "test"}):
chain = [
{"provider": "kimi-coding", "model": "kimi-k2.5"},
{"provider": "anthropic", "model": "claude-3"},
]
available = filter_available_fallbacks(chain)
assert len(available) == 2
assert available[0]["provider"] == "kimi-coding"
assert available[1]["provider"] == "anthropic"
class TestIntegration:
"""Integration tests for the fallback router."""
def test_full_fallback_flow_for_anthropic_quota(self):
"""Test the complete fallback flow when Anthropic quota is exceeded."""
# Simulate Anthropic quota error
error = Exception("Rate limit exceeded: quota exceeded for model claude-3")
# Verify error detection
assert is_quota_error(error, provider="anthropic") is True
# Verify auto-fallback is enabled
assert should_auto_fallback("anthropic", error=error) is True
# Get fallback chain
chain = get_auto_fallback_chain("anthropic")
assert len(chain) > 0
# Verify kimi-coding is first fallback
assert chain[0]["provider"] == "kimi-coding"
def test_fallback_availability_checking(self):
"""Test that fallback availability is properly checked."""
with patch.dict(os.environ, {"KIMI_API_KEY": "test-key"}):
# Get default chain for anthropic
chain = get_default_fallback_chain("anthropic")
# Filter to available
available = filter_available_fallbacks(chain)
# Should have kimi-coding available
assert any(fb["provider"] == "kimi-coding" for fb in available)
if __name__ == "__main__":
pytest.main([__file__, "-v"])