test: add graceful degradation test scenarios (#919)
Some checks failed
Tests / lint (pull_request) Failing after 22s
Tests / test (pull_request) Has been skipped

Implements the missing test coverage called out in issue #919. Creates
tests/infrastructure/test_graceful_degradation.py covering:

* Ollama health-check failure modes (connection refused, timeout, HTTP 503/500, generic exception)
* requests library unavailable → optimistic availability for Ollama and vllm-mlx
* End-to-end cascade: Ollama fails → Anthropic/cloud fallback used
* Circuit-breaker lifecycle: CLOSED → OPEN (threshold failures) → HALF_OPEN (recovery timeout) → CLOSED (test successes)
* Disabled provider skipped without touching circuit state
* All providers fail → RuntimeError listing individual provider errors
* ClaudeBackend: no key / API error → graceful degraded messages; health_check
* GrokBackend: no key / API error → graceful degraded messages; health_check
* Chat store: SQLite directory auto-creation, concurrent-write safety, pruning, ordering

Fixes #919

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Alexander Whitestone
2026-03-23 21:49:03 -04:00
parent 2d6bfe6ba1
commit b10afc0676

View File

@@ -0,0 +1,589 @@
"""Graceful degradation test scenarios — Issue #919.
Tests specifically for service failure paths and fallback logic:
* Ollama health-check failures (connection refused, timeout, HTTP errors)
* Cascade router: Ollama down → falls back to Anthropic/cloud provider
* Circuit-breaker lifecycle: CLOSED → OPEN (repeated failures) → HALF_OPEN (recovery window)
* All providers fail → descriptive RuntimeError
* Disabled provider skipped without touching circuit breaker
* ``requests`` library unavailable → optimistic availability assumption
* ClaudeBackend / GrokBackend no-key graceful messages
* Chat store: SQLite directory auto-creation and concurrent access safety
"""
from __future__ import annotations
import threading
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from infrastructure.router.cascade import (
CascadeRouter,
CircuitState,
Provider,
ProviderStatus,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_ollama_provider(name: str = "local-ollama", priority: int = 1) -> Provider:
return Provider(
name=name,
type="ollama",
enabled=True,
priority=priority,
url="http://localhost:11434",
models=[{"name": "llama3", "default": True}],
)
def _make_anthropic_provider(name: str = "cloud-fallback", priority: int = 2) -> Provider:
return Provider(
name=name,
type="anthropic",
enabled=True,
priority=priority,
api_key="sk-ant-test",
models=[{"name": "claude-haiku-4-5-20251001", "default": True}],
)
# ---------------------------------------------------------------------------
# Ollama health-check failure scenarios
# ---------------------------------------------------------------------------
@pytest.mark.unit
class TestOllamaHealthCheckFailures:
"""_check_provider_available returns False for all Ollama failure modes."""
def _router(self) -> CascadeRouter:
return CascadeRouter(config_path=Path("/nonexistent"))
def test_connection_refused_returns_false(self):
"""Connection refused during Ollama health check → provider excluded."""
router = self._router()
provider = _make_ollama_provider()
with patch("infrastructure.router.cascade.requests") as mock_req:
mock_req.get.side_effect = ConnectionError("Connection refused")
assert router._check_provider_available(provider) is False
def test_timeout_returns_false(self):
"""Request timeout during Ollama health check → provider excluded."""
router = self._router()
provider = _make_ollama_provider()
with patch("infrastructure.router.cascade.requests") as mock_req:
# Simulate a timeout using a generic OSError (matches real-world timeout behaviour)
mock_req.get.side_effect = OSError("timed out")
assert router._check_provider_available(provider) is False
def test_http_503_returns_false(self):
"""HTTP 503 from Ollama health endpoint → provider excluded."""
router = self._router()
provider = _make_ollama_provider()
mock_response = MagicMock()
mock_response.status_code = 503
with patch("infrastructure.router.cascade.requests") as mock_req:
mock_req.get.return_value = mock_response
assert router._check_provider_available(provider) is False
def test_http_500_returns_false(self):
"""HTTP 500 from Ollama health endpoint → provider excluded."""
router = self._router()
provider = _make_ollama_provider()
mock_response = MagicMock()
mock_response.status_code = 500
with patch("infrastructure.router.cascade.requests") as mock_req:
mock_req.get.return_value = mock_response
assert router._check_provider_available(provider) is False
def test_generic_exception_returns_false(self):
"""Unexpected exception during Ollama check → provider excluded (no crash)."""
router = self._router()
provider = _make_ollama_provider()
with patch("infrastructure.router.cascade.requests") as mock_req:
mock_req.get.side_effect = RuntimeError("unexpected error")
assert router._check_provider_available(provider) is False
def test_requests_unavailable_assumes_available(self):
"""When ``requests`` lib is None, Ollama availability is assumed True."""
import infrastructure.router.cascade as cascade_module
router = self._router()
provider = _make_ollama_provider()
old_requests = cascade_module.requests
cascade_module.requests = None
try:
assert router._check_provider_available(provider) is True
finally:
cascade_module.requests = old_requests
# ---------------------------------------------------------------------------
# Cascade: Ollama fails → Anthropic fallback
# ---------------------------------------------------------------------------
@pytest.mark.unit
class TestOllamaToAnthropicFallback:
"""Cascade router falls back to Anthropic when Ollama is unavailable or failing."""
@pytest.mark.asyncio
async def test_ollama_connection_refused_falls_back_to_anthropic(self):
"""When Ollama raises a connection error, cascade uses Anthropic provider."""
router = CascadeRouter(config_path=Path("/nonexistent"))
ollama_provider = _make_ollama_provider(priority=1)
anthropic_provider = _make_anthropic_provider(priority=2)
router.providers = [ollama_provider, anthropic_provider]
with (
patch.object(router, "_call_ollama", side_effect=ConnectionError("refused")),
patch.object(
router,
"_call_anthropic",
new_callable=AsyncMock,
return_value={"content": "fallback response", "model": "claude-haiku-4-5-20251001"},
),
# Allow cloud bypass of the metabolic quota gate in test
patch.object(router, "_quota_allows_cloud", return_value=True),
):
result = await router.complete(
messages=[{"role": "user", "content": "hello"}],
model="llama3",
)
assert result["provider"] == "cloud-fallback"
assert "fallback response" in result["content"]
@pytest.mark.asyncio
async def test_ollama_circuit_open_skips_to_anthropic(self):
"""When Ollama circuit is OPEN, cascade skips directly to Anthropic."""
router = CascadeRouter(config_path=Path("/nonexistent"))
ollama_provider = _make_ollama_provider(priority=1)
anthropic_provider = _make_anthropic_provider(priority=2)
router.providers = [ollama_provider, anthropic_provider]
# Force the circuit open on Ollama
ollama_provider.circuit_state = CircuitState.OPEN
ollama_provider.status = ProviderStatus.UNHEALTHY
import time
ollama_provider.circuit_opened_at = time.time() # just opened — not yet recoverable
with (
patch.object(
router,
"_call_anthropic",
new_callable=AsyncMock,
return_value={"content": "cloud answer", "model": "claude-haiku-4-5-20251001"},
) as mock_anthropic,
# Allow cloud bypass of the metabolic quota gate in test
patch.object(router, "_quota_allows_cloud", return_value=True),
):
result = await router.complete(
messages=[{"role": "user", "content": "ping"}],
)
mock_anthropic.assert_called_once()
assert result["provider"] == "cloud-fallback"
@pytest.mark.asyncio
async def test_all_providers_fail_raises_runtime_error(self):
"""When every provider fails, RuntimeError is raised with combined error info."""
router = CascadeRouter(config_path=Path("/nonexistent"))
ollama_provider = _make_ollama_provider(priority=1)
anthropic_provider = _make_anthropic_provider(priority=2)
router.providers = [ollama_provider, anthropic_provider]
with (
patch.object(router, "_call_ollama", side_effect=RuntimeError("Ollama down")),
patch.object(router, "_call_anthropic", side_effect=RuntimeError("API quota exceeded")),
patch.object(router, "_quota_allows_cloud", return_value=True),
):
with pytest.raises(RuntimeError, match="All providers failed"):
await router.complete(messages=[{"role": "user", "content": "test"}])
@pytest.mark.asyncio
async def test_error_message_includes_individual_provider_errors(self):
"""RuntimeError from all-fail scenario lists each provider's error."""
router = CascadeRouter(config_path=Path("/nonexistent"))
ollama_provider = _make_ollama_provider(priority=1)
anthropic_provider = _make_anthropic_provider(priority=2)
router.providers = [ollama_provider, anthropic_provider]
router.config.max_retries_per_provider = 1
with (
patch.object(router, "_call_ollama", side_effect=RuntimeError("connection refused")),
patch.object(router, "_call_anthropic", side_effect=RuntimeError("rate limit")),
patch.object(router, "_quota_allows_cloud", return_value=True),
):
with pytest.raises(RuntimeError) as exc_info:
await router.complete(messages=[{"role": "user", "content": "test"}])
error_msg = str(exc_info.value)
assert "connection refused" in error_msg
assert "rate limit" in error_msg
# ---------------------------------------------------------------------------
# Circuit-breaker lifecycle
# ---------------------------------------------------------------------------
@pytest.mark.unit
class TestCircuitBreakerLifecycle:
"""Full CLOSED → OPEN → HALF_OPEN → CLOSED lifecycle."""
def test_closed_initially(self):
"""New provider starts with circuit CLOSED and HEALTHY status."""
provider = _make_ollama_provider()
assert provider.circuit_state == CircuitState.CLOSED
assert provider.status == ProviderStatus.HEALTHY
def test_open_after_threshold_failures(self):
"""Circuit opens once consecutive failures reach the threshold."""
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.circuit_breaker_failure_threshold = 3
provider = _make_ollama_provider()
for _ in range(3):
router._record_failure(provider)
assert provider.circuit_state == CircuitState.OPEN
assert provider.status == ProviderStatus.UNHEALTHY
assert provider.circuit_opened_at is not None
def test_open_circuit_skips_provider(self):
"""_is_provider_available returns False when circuit is OPEN (and timeout not elapsed)."""
import time
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.circuit_breaker_recovery_timeout = 9999 # won't elapse during test
provider = _make_ollama_provider()
provider.circuit_state = CircuitState.OPEN
provider.status = ProviderStatus.UNHEALTHY
provider.circuit_opened_at = time.time()
assert router._is_provider_available(provider) is False
def test_half_open_after_recovery_timeout(self):
"""After the recovery timeout elapses, _is_provider_available transitions to HALF_OPEN."""
import time
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.circuit_breaker_recovery_timeout = 0.01 # 10 ms
provider = _make_ollama_provider()
provider.circuit_state = CircuitState.OPEN
provider.status = ProviderStatus.UNHEALTHY
provider.circuit_opened_at = time.time() - 1.0 # clearly elapsed
result = router._is_provider_available(provider)
assert result is True
assert provider.circuit_state == CircuitState.HALF_OPEN
def test_closed_after_half_open_successes(self):
"""Circuit closes after enough successful half-open test calls."""
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.circuit_breaker_half_open_max_calls = 2
provider = _make_ollama_provider()
provider.circuit_state = CircuitState.HALF_OPEN
provider.half_open_calls = 0
router._record_success(provider, 50.0)
assert provider.circuit_state == CircuitState.HALF_OPEN # not yet
router._record_success(provider, 50.0)
assert provider.circuit_state == CircuitState.CLOSED
assert provider.status == ProviderStatus.HEALTHY
assert provider.metrics.consecutive_failures == 0
def test_failure_in_half_open_reopens_circuit(self):
"""A failure during HALF_OPEN increments consecutive failures, reopening if threshold met."""
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.circuit_breaker_failure_threshold = 1 # reopen on first failure
provider = _make_ollama_provider()
provider.circuit_state = CircuitState.HALF_OPEN
router._record_failure(provider)
assert provider.circuit_state == CircuitState.OPEN
def test_disabled_provider_skipped_without_circuit_change(self):
"""A disabled provider is immediately rejected; its circuit state is not touched."""
router = CascadeRouter(config_path=Path("/nonexistent"))
provider = _make_ollama_provider()
provider.enabled = False
available = router._is_provider_available(provider)
assert available is False
assert provider.circuit_state == CircuitState.CLOSED # unchanged
# ---------------------------------------------------------------------------
# ClaudeBackend graceful degradation
# ---------------------------------------------------------------------------
@pytest.mark.unit
class TestClaudeBackendGracefulDegradation:
"""ClaudeBackend degrades gracefully when the API is unavailable."""
def test_run_no_key_returns_unconfigured_message(self):
"""run() returns a graceful message when no API key is set."""
from timmy.backends import ClaudeBackend
backend = ClaudeBackend(api_key="", model="haiku")
result = backend.run("hello")
assert "not configured" in result.content.lower()
assert "ANTHROPIC_API_KEY" in result.content
def test_run_api_error_returns_unavailable_message(self):
"""run() returns a graceful error when the Anthropic API raises."""
from timmy.backends import ClaudeBackend
backend = ClaudeBackend(api_key="sk-ant-test", model="haiku")
mock_client = MagicMock()
mock_client.messages.create.side_effect = ConnectionError("API unreachable")
with patch.object(backend, "_get_client", return_value=mock_client):
result = backend.run("ping")
assert "unavailable" in result.content.lower()
def test_health_check_no_key_reports_error(self):
"""health_check() reports not-ok when API key is missing."""
from timmy.backends import ClaudeBackend
backend = ClaudeBackend(api_key="", model="haiku")
status = backend.health_check()
assert status["ok"] is False
assert "ANTHROPIC_API_KEY" in status["error"]
def test_health_check_api_error_reports_error(self):
"""health_check() returns ok=False and captures the error on API failure."""
from timmy.backends import ClaudeBackend
backend = ClaudeBackend(api_key="sk-ant-test", model="haiku")
mock_client = MagicMock()
mock_client.messages.create.side_effect = RuntimeError("connection timed out")
with patch.object(backend, "_get_client", return_value=mock_client):
status = backend.health_check()
assert status["ok"] is False
assert "connection timed out" in status["error"]
# ---------------------------------------------------------------------------
# GrokBackend graceful degradation
# ---------------------------------------------------------------------------
@pytest.mark.unit
class TestGrokBackendGracefulDegradation:
"""GrokBackend degrades gracefully when xAI API is unavailable."""
def test_run_no_key_returns_unconfigured_message(self):
"""run() returns a graceful message when no XAI_API_KEY is set."""
from timmy.backends import GrokBackend
backend = GrokBackend(api_key="", model="grok-3-mini")
result = backend.run("hello")
assert "not configured" in result.content.lower()
def test_run_api_error_returns_unavailable_message(self):
"""run() returns graceful error when xAI API raises."""
from timmy.backends import GrokBackend
backend = GrokBackend(api_key="xai-test-key", model="grok-3-mini")
mock_client = MagicMock()
mock_client.chat.completions.create.side_effect = RuntimeError("network error")
with patch.object(backend, "_get_client", return_value=mock_client):
result = backend.run("ping")
assert "unavailable" in result.content.lower()
def test_health_check_no_key_reports_error(self):
"""health_check() reports not-ok when XAI_API_KEY is missing."""
from timmy.backends import GrokBackend
backend = GrokBackend(api_key="", model="grok-3-mini")
status = backend.health_check()
assert status["ok"] is False
assert "XAI_API_KEY" in status["error"]
# ---------------------------------------------------------------------------
# Chat store: SQLite resilience
# ---------------------------------------------------------------------------
@pytest.mark.unit
class TestChatStoreSQLiteResilience:
"""MessageLog handles edge cases without crashing."""
def test_auto_creates_missing_parent_directory(self, tmp_path):
"""MessageLog creates the data directory automatically on first use."""
from infrastructure.chat_store import MessageLog
db_path = tmp_path / "deep" / "nested" / "chat.db"
assert not db_path.parent.exists()
log = MessageLog(db_path=db_path)
log.append("user", "hello", "2026-01-01T00:00:00")
assert db_path.exists()
assert len(log) == 1
log.close()
def test_concurrent_appends_are_safe(self, tmp_path):
"""Multiple threads appending simultaneously do not corrupt the DB."""
from infrastructure.chat_store import MessageLog
db_path = tmp_path / "chat.db"
log = MessageLog(db_path=db_path)
errors: list[Exception] = []
def write_messages(thread_id: int) -> None:
try:
for i in range(10):
log.append("user", f"thread {thread_id} msg {i}", "2026-01-01T00:00:00")
except Exception as exc:
errors.append(exc)
threads = [threading.Thread(target=write_messages, args=(t,)) for t in range(5)]
for t in threads:
t.start()
for t in threads:
t.join()
assert errors == [], f"Concurrent writes produced errors: {errors}"
# 5 threads × 10 messages each
assert len(log) == 50
log.close()
def test_all_returns_messages_in_insertion_order(self, tmp_path):
"""all() returns messages ordered oldest-first."""
from infrastructure.chat_store import MessageLog
db_path = tmp_path / "chat.db"
log = MessageLog(db_path=db_path)
log.append("user", "first", "2026-01-01T00:00:00")
log.append("agent", "second", "2026-01-01T00:00:01")
log.append("user", "third", "2026-01-01T00:00:02")
messages = log.all()
assert [m.content for m in messages] == ["first", "second", "third"]
log.close()
def test_recent_returns_latest_n_messages(self, tmp_path):
"""recent(n) returns the n most recent messages, oldest-first within the slice."""
from infrastructure.chat_store import MessageLog
db_path = tmp_path / "chat.db"
log = MessageLog(db_path=db_path)
for i in range(20):
log.append("user", f"msg {i}", f"2026-01-01T00:{i:02d}:00")
recent = log.recent(5)
assert len(recent) == 5
assert recent[0].content == "msg 15"
assert recent[-1].content == "msg 19"
log.close()
def test_prune_keeps_max_messages(self, tmp_path):
"""append() prunes oldest messages when count exceeds MAX_MESSAGES."""
import infrastructure.chat_store as store_mod
from infrastructure.chat_store import MessageLog
original_max = store_mod.MAX_MESSAGES
store_mod.MAX_MESSAGES = 5
try:
db_path = tmp_path / "chat.db"
log = MessageLog(db_path=db_path)
for i in range(8):
log.append("user", f"msg {i}", "2026-01-01T00:00:00")
assert len(log) == 5
messages = log.all()
# Oldest 3 should be pruned
assert messages[0].content == "msg 3"
log.close()
finally:
store_mod.MAX_MESSAGES = original_max
# ---------------------------------------------------------------------------
# Provider availability: requests lib missing
# ---------------------------------------------------------------------------
@pytest.mark.unit
class TestRequestsLibraryMissing:
"""When ``requests`` is not installed, providers assume they are available."""
def _swap_requests(self, value):
import infrastructure.router.cascade as cascade_module
old = cascade_module.requests
cascade_module.requests = value
return old
def test_ollama_assumes_available_without_requests(self):
"""Ollama provider returns True when requests is None."""
import infrastructure.router.cascade as cascade_module
router = CascadeRouter(config_path=Path("/nonexistent"))
provider = _make_ollama_provider()
old = self._swap_requests(None)
try:
assert router._check_provider_available(provider) is True
finally:
cascade_module.requests = old
def test_vllm_mlx_assumes_available_without_requests(self):
"""vllm-mlx provider returns True when requests is None."""
import infrastructure.router.cascade as cascade_module
router = CascadeRouter(config_path=Path("/nonexistent"))
provider = Provider(
name="vllm-local",
type="vllm_mlx",
enabled=True,
priority=1,
base_url="http://localhost:8000/v1",
)
old = self._swap_requests(None)
try:
assert router._check_provider_available(provider) is True
finally:
cascade_module.requests = old