1
0

[loop-cycle-1658] feat: provider health history endpoint (#457) (#611)

This commit is contained in:
2026-03-20 16:09:20 -04:00
parent 4ba8d25749
commit f3b3d1e648
4 changed files with 316 additions and 0 deletions

View File

@@ -2,6 +2,7 @@
from .api import router
from .cascade import CascadeRouter, Provider, ProviderStatus, get_router
from .history import HealthHistoryStore, get_history_store
__all__ = [
"CascadeRouter",
@@ -9,4 +10,6 @@ __all__ = [
"ProviderStatus",
"get_router",
"router",
"HealthHistoryStore",
"get_history_store",
]

View File

@@ -8,6 +8,7 @@ from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from .cascade import CascadeRouter, get_router
from .history import HealthHistoryStore, get_history_store
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v1/router", tags=["router"])
@@ -199,6 +200,17 @@ async def reload_config(
raise HTTPException(status_code=500, detail=f"Reload failed: {exc}") from exc
@router.get("/history")
async def get_history(
hours: int = 24,
store: Annotated[HealthHistoryStore, Depends(get_history_store)] = None,
) -> list[dict[str, Any]]:
"""Get provider health history for the last N hours."""
if store is None:
store = get_history_store()
return store.get_history(hours=hours)
@router.get("/config")
async def get_config(
cascade: Annotated[CascadeRouter, Depends(get_cascade_router)],

View File

@@ -0,0 +1,152 @@
"""Provider health history — time-series snapshots for dashboard visualization."""
import asyncio
import logging
import sqlite3
from datetime import UTC, datetime, timedelta
from pathlib import Path
logger = logging.getLogger(__name__)
_store: "HealthHistoryStore | None" = None
class HealthHistoryStore:
"""Stores timestamped provider health snapshots in SQLite."""
def __init__(self, db_path: str = "data/router_history.db") -> None:
self.db_path = db_path
if db_path != ":memory:":
Path(db_path).parent.mkdir(parents=True, exist_ok=True)
self._conn = sqlite3.connect(db_path, check_same_thread=False)
self._conn.row_factory = sqlite3.Row
self._init_schema()
self._bg_task: asyncio.Task | None = None
def _init_schema(self) -> None:
self._conn.execute("""
CREATE TABLE IF NOT EXISTS snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
provider_name TEXT NOT NULL,
status TEXT NOT NULL,
error_rate REAL NOT NULL,
avg_latency_ms REAL NOT NULL,
circuit_state TEXT NOT NULL,
total_requests INTEGER NOT NULL
)
""")
self._conn.execute("""
CREATE INDEX IF NOT EXISTS idx_snapshots_ts
ON snapshots(timestamp)
""")
self._conn.commit()
def record_snapshot(self, providers: list[dict]) -> None:
"""Record a health snapshot for all providers."""
ts = datetime.now(UTC).isoformat()
rows = [
(
ts,
p["name"],
p["status"],
p["error_rate"],
p["avg_latency_ms"],
p["circuit_state"],
p["total_requests"],
)
for p in providers
]
self._conn.executemany(
"""INSERT INTO snapshots
(timestamp, provider_name, status, error_rate,
avg_latency_ms, circuit_state, total_requests)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
rows,
)
self._conn.commit()
def get_history(self, hours: int = 24) -> list[dict]:
"""Return snapshots from the last N hours, grouped by timestamp."""
cutoff = (datetime.now(UTC) - timedelta(hours=hours)).isoformat()
rows = self._conn.execute(
"""SELECT timestamp, provider_name, status, error_rate,
avg_latency_ms, circuit_state, total_requests
FROM snapshots WHERE timestamp >= ? ORDER BY timestamp""",
(cutoff,),
).fetchall()
# Group by timestamp
snapshots: dict[str, list[dict]] = {}
for row in rows:
ts = row["timestamp"]
if ts not in snapshots:
snapshots[ts] = []
snapshots[ts].append(
{
"name": row["provider_name"],
"status": row["status"],
"error_rate": row["error_rate"],
"avg_latency_ms": row["avg_latency_ms"],
"circuit_state": row["circuit_state"],
"total_requests": row["total_requests"],
}
)
return [{"timestamp": ts, "providers": providers} for ts, providers in snapshots.items()]
def prune(self, keep_hours: int = 168) -> int:
"""Remove snapshots older than keep_hours. Returns rows deleted."""
cutoff = (datetime.now(UTC) - timedelta(hours=keep_hours)).isoformat()
cursor = self._conn.execute("DELETE FROM snapshots WHERE timestamp < ?", (cutoff,))
self._conn.commit()
return cursor.rowcount
def close(self) -> None:
"""Close the database connection."""
if self._bg_task and not self._bg_task.done():
self._bg_task.cancel()
self._conn.close()
def _capture_snapshot(self, cascade_router) -> None: # noqa: ANN001
"""Capture current provider state as a snapshot."""
providers = []
for p in cascade_router.providers:
providers.append(
{
"name": p.name,
"status": p.status.value,
"error_rate": round(p.metrics.error_rate, 4),
"avg_latency_ms": round(p.metrics.avg_latency_ms, 2),
"circuit_state": p.circuit_state.value,
"total_requests": p.metrics.total_requests,
}
)
self.record_snapshot(providers)
async def start_background_task(
self,
cascade_router,
interval_seconds: int = 60, # noqa: ANN001
) -> None:
"""Start periodic snapshot capture."""
async def _loop() -> None:
while True:
try:
self._capture_snapshot(cascade_router)
logger.debug("Recorded health snapshot")
except Exception:
logger.exception("Failed to record health snapshot")
await asyncio.sleep(interval_seconds)
self._bg_task = asyncio.create_task(_loop())
logger.info("Health history background task started (interval=%ds)", interval_seconds)
def get_history_store() -> HealthHistoryStore:
"""Get or create the singleton history store."""
global _store # noqa: PLW0603
if _store is None:
_store = HealthHistoryStore()
return _store