diff --git a/src/infrastructure/router/__init__.py b/src/infrastructure/router/__init__.py index 49d97ee..dfe39c1 100644 --- a/src/infrastructure/router/__init__.py +++ b/src/infrastructure/router/__init__.py @@ -2,6 +2,7 @@ from .api import router from .cascade import CascadeRouter, Provider, ProviderStatus, get_router +from .history import HealthHistoryStore, get_history_store __all__ = [ "CascadeRouter", @@ -9,4 +10,6 @@ __all__ = [ "ProviderStatus", "get_router", "router", + "HealthHistoryStore", + "get_history_store", ] diff --git a/src/infrastructure/router/api.py b/src/infrastructure/router/api.py index 4175a8c..9bd00c6 100644 --- a/src/infrastructure/router/api.py +++ b/src/infrastructure/router/api.py @@ -8,6 +8,7 @@ from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel from .cascade import CascadeRouter, get_router +from .history import HealthHistoryStore, get_history_store logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/v1/router", tags=["router"]) @@ -199,6 +200,17 @@ async def reload_config( raise HTTPException(status_code=500, detail=f"Reload failed: {exc}") from exc +@router.get("/history") +async def get_history( + hours: int = 24, + store: Annotated[HealthHistoryStore, Depends(get_history_store)] = None, +) -> list[dict[str, Any]]: + """Get provider health history for the last N hours.""" + if store is None: + store = get_history_store() + return store.get_history(hours=hours) + + @router.get("/config") async def get_config( cascade: Annotated[CascadeRouter, Depends(get_cascade_router)], diff --git a/src/infrastructure/router/history.py b/src/infrastructure/router/history.py new file mode 100644 index 0000000..06171ee --- /dev/null +++ b/src/infrastructure/router/history.py @@ -0,0 +1,152 @@ +"""Provider health history — time-series snapshots for dashboard visualization.""" + +import asyncio +import logging +import sqlite3 +from datetime import UTC, datetime, timedelta +from pathlib import Path + +logger = logging.getLogger(__name__) + +_store: "HealthHistoryStore | None" = None + + +class HealthHistoryStore: + """Stores timestamped provider health snapshots in SQLite.""" + + def __init__(self, db_path: str = "data/router_history.db") -> None: + self.db_path = db_path + if db_path != ":memory:": + Path(db_path).parent.mkdir(parents=True, exist_ok=True) + self._conn = sqlite3.connect(db_path, check_same_thread=False) + self._conn.row_factory = sqlite3.Row + self._init_schema() + self._bg_task: asyncio.Task | None = None + + def _init_schema(self) -> None: + self._conn.execute(""" + CREATE TABLE IF NOT EXISTS snapshots ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp TEXT NOT NULL, + provider_name TEXT NOT NULL, + status TEXT NOT NULL, + error_rate REAL NOT NULL, + avg_latency_ms REAL NOT NULL, + circuit_state TEXT NOT NULL, + total_requests INTEGER NOT NULL + ) + """) + self._conn.execute(""" + CREATE INDEX IF NOT EXISTS idx_snapshots_ts + ON snapshots(timestamp) + """) + self._conn.commit() + + def record_snapshot(self, providers: list[dict]) -> None: + """Record a health snapshot for all providers.""" + ts = datetime.now(UTC).isoformat() + rows = [ + ( + ts, + p["name"], + p["status"], + p["error_rate"], + p["avg_latency_ms"], + p["circuit_state"], + p["total_requests"], + ) + for p in providers + ] + self._conn.executemany( + """INSERT INTO snapshots + (timestamp, provider_name, status, error_rate, + avg_latency_ms, circuit_state, total_requests) + VALUES (?, ?, ?, ?, ?, ?, ?)""", + rows, + ) + self._conn.commit() + + def get_history(self, hours: int = 24) -> list[dict]: + """Return snapshots from the last N hours, grouped by timestamp.""" + cutoff = (datetime.now(UTC) - timedelta(hours=hours)).isoformat() + rows = self._conn.execute( + """SELECT timestamp, provider_name, status, error_rate, + avg_latency_ms, circuit_state, total_requests + FROM snapshots WHERE timestamp >= ? ORDER BY timestamp""", + (cutoff,), + ).fetchall() + + # Group by timestamp + snapshots: dict[str, list[dict]] = {} + for row in rows: + ts = row["timestamp"] + if ts not in snapshots: + snapshots[ts] = [] + snapshots[ts].append( + { + "name": row["provider_name"], + "status": row["status"], + "error_rate": row["error_rate"], + "avg_latency_ms": row["avg_latency_ms"], + "circuit_state": row["circuit_state"], + "total_requests": row["total_requests"], + } + ) + + return [{"timestamp": ts, "providers": providers} for ts, providers in snapshots.items()] + + def prune(self, keep_hours: int = 168) -> int: + """Remove snapshots older than keep_hours. Returns rows deleted.""" + cutoff = (datetime.now(UTC) - timedelta(hours=keep_hours)).isoformat() + cursor = self._conn.execute("DELETE FROM snapshots WHERE timestamp < ?", (cutoff,)) + self._conn.commit() + return cursor.rowcount + + def close(self) -> None: + """Close the database connection.""" + if self._bg_task and not self._bg_task.done(): + self._bg_task.cancel() + self._conn.close() + + def _capture_snapshot(self, cascade_router) -> None: # noqa: ANN001 + """Capture current provider state as a snapshot.""" + providers = [] + for p in cascade_router.providers: + providers.append( + { + "name": p.name, + "status": p.status.value, + "error_rate": round(p.metrics.error_rate, 4), + "avg_latency_ms": round(p.metrics.avg_latency_ms, 2), + "circuit_state": p.circuit_state.value, + "total_requests": p.metrics.total_requests, + } + ) + self.record_snapshot(providers) + + async def start_background_task( + self, + cascade_router, + interval_seconds: int = 60, # noqa: ANN001 + ) -> None: + """Start periodic snapshot capture.""" + + async def _loop() -> None: + while True: + try: + self._capture_snapshot(cascade_router) + logger.debug("Recorded health snapshot") + except Exception: + logger.exception("Failed to record health snapshot") + await asyncio.sleep(interval_seconds) + + self._bg_task = asyncio.create_task(_loop()) + logger.info("Health history background task started (interval=%ds)", interval_seconds) + + +def get_history_store() -> HealthHistoryStore: + """Get or create the singleton history store.""" + global _store # noqa: PLW0603 + if _store is None: + _store = HealthHistoryStore() + return _store diff --git a/tests/infrastructure/test_router_history.py b/tests/infrastructure/test_router_history.py new file mode 100644 index 0000000..b6bf093 --- /dev/null +++ b/tests/infrastructure/test_router_history.py @@ -0,0 +1,149 @@ +"""Tests for provider health history store and API endpoint.""" + +import time +from datetime import UTC, datetime, timedelta +from unittest.mock import MagicMock + +import pytest +from src.infrastructure.router.history import HealthHistoryStore + + +@pytest.fixture +def store(): + """In-memory history store for testing.""" + s = HealthHistoryStore(db_path=":memory:") + yield s + s.close() + + +@pytest.fixture +def sample_providers(): + return [ + { + "name": "anthropic", + "status": "healthy", + "error_rate": 0.01, + "avg_latency_ms": 250.5, + "circuit_state": "closed", + "total_requests": 100, + }, + { + "name": "local", + "status": "degraded", + "error_rate": 0.15, + "avg_latency_ms": 80.0, + "circuit_state": "closed", + "total_requests": 50, + }, + ] + + +def test_record_and_retrieve(store, sample_providers): + store.record_snapshot(sample_providers) + history = store.get_history(hours=1) + assert len(history) == 1 + assert len(history[0]["providers"]) == 2 + assert history[0]["providers"][0]["name"] == "anthropic" + assert history[0]["providers"][1]["name"] == "local" + assert "timestamp" in history[0] + + +def test_multiple_snapshots(store, sample_providers): + store.record_snapshot(sample_providers) + time.sleep(0.01) + store.record_snapshot(sample_providers) + history = store.get_history(hours=1) + assert len(history) == 2 + + +def test_hours_filtering(store, sample_providers): + old_ts = (datetime.now(UTC) - timedelta(hours=48)).isoformat() + store._conn.execute( + """INSERT INTO snapshots + (timestamp, provider_name, status, error_rate, + avg_latency_ms, circuit_state, total_requests) + VALUES (?, ?, ?, ?, ?, ?, ?)""", + (old_ts, "anthropic", "healthy", 0.0, 100.0, "closed", 10), + ) + store._conn.commit() + store.record_snapshot(sample_providers) + + history = store.get_history(hours=24) + assert len(history) == 1 + + history = store.get_history(hours=72) + assert len(history) == 2 + + +def test_prune(store, sample_providers): + old_ts = (datetime.now(UTC) - timedelta(hours=200)).isoformat() + store._conn.execute( + """INSERT INTO snapshots + (timestamp, provider_name, status, error_rate, + avg_latency_ms, circuit_state, total_requests) + VALUES (?, ?, ?, ?, ?, ?, ?)""", + (old_ts, "anthropic", "healthy", 0.0, 100.0, "closed", 10), + ) + store._conn.commit() + store.record_snapshot(sample_providers) + + deleted = store.prune(keep_hours=168) + assert deleted == 1 + history = store.get_history(hours=999) + assert len(history) == 1 + + +def test_empty_history(store): + assert store.get_history(hours=24) == [] + + +def test_capture_snapshot_from_router(store): + mock_metrics = MagicMock() + mock_metrics.error_rate = 0.05 + mock_metrics.avg_latency_ms = 200.0 + mock_metrics.total_requests = 42 + + mock_provider = MagicMock() + mock_provider.name = "test-provider" + mock_provider.status.value = "healthy" + mock_provider.metrics = mock_metrics + mock_provider.circuit_state.value = "closed" + + mock_router = MagicMock() + mock_router.providers = [mock_provider] + + store._capture_snapshot(mock_router) + history = store.get_history(hours=1) + assert len(history) == 1 + p = history[0]["providers"][0] + assert p["name"] == "test-provider" + assert p["status"] == "healthy" + assert p["error_rate"] == 0.05 + assert p["total_requests"] == 42 + + +def test_history_api_endpoint(store, sample_providers): + """GET /api/v1/router/history returns snapshot data.""" + store.record_snapshot(sample_providers) + + from fastapi import FastAPI + from fastapi.testclient import TestClient + from src.infrastructure.router.api import get_cascade_router + from src.infrastructure.router.api import router as api_router + from src.infrastructure.router.history import get_history_store + + app = FastAPI() + app.include_router(api_router) + + app.dependency_overrides[get_history_store] = lambda: store + app.dependency_overrides[get_cascade_router] = lambda: MagicMock() + + client = TestClient(app) + resp = client.get("/api/v1/router/history?hours=1") + assert resp.status_code == 200 + data = resp.json() + assert len(data) == 1 + assert len(data[0]["providers"]) == 2 + assert data[0]["providers"][0]["name"] == "anthropic" + + app.dependency_overrides.clear()