From 8518db921e70cf217fab0e7c5c505f9fe5bc6352 Mon Sep 17 00:00:00 2001 From: Kimi Agent Date: Tue, 24 Mar 2026 19:31:14 +0000 Subject: [PATCH] [kimi] Implement graceful shutdown and health checks (#1397) (#1457) --- src/dashboard/routes/health.py | 249 +++++++++++++++++++++++++++- src/dashboard/startup.py | 117 ++++++++++++-- tests/dashboard/test_health.py | 286 +++++++++++++++++++++++++++++++++ 3 files changed, 641 insertions(+), 11 deletions(-) diff --git a/src/dashboard/routes/health.py b/src/dashboard/routes/health.py index 581e7057..5ec3040c 100644 --- a/src/dashboard/routes/health.py +++ b/src/dashboard/routes/health.py @@ -6,6 +6,7 @@ for the Mission Control dashboard. import asyncio import logging +import os import sqlite3 import time from contextlib import closing @@ -14,7 +15,7 @@ from pathlib import Path from typing import Any from fastapi import APIRouter, Request -from fastapi.responses import HTMLResponse +from fastapi.responses import HTMLResponse, JSONResponse from pydantic import BaseModel from config import APP_START_TIME as _START_TIME @@ -24,6 +25,47 @@ logger = logging.getLogger(__name__) router = APIRouter(tags=["health"]) +# Shutdown state tracking for graceful shutdown +_shutdown_requested = False +_shutdown_reason: str | None = None +_shutdown_start_time: float | None = None + +# Default graceful shutdown timeout (seconds) +GRACEFUL_SHUTDOWN_TIMEOUT = float(os.getenv("GRACEFUL_SHUTDOWN_TIMEOUT", "30")) + + +def request_shutdown(reason: str = "unknown") -> None: + """Signal that a graceful shutdown has been requested. + + This is called by signal handlers to inform health checks + that the service is shutting down. + """ + global _shutdown_requested, _shutdown_reason, _shutdown_start_time # noqa: PLW0603 + _shutdown_requested = True + _shutdown_reason = reason + _shutdown_start_time = time.monotonic() + logger.info("Shutdown requested: %s", reason) + + +def is_shutting_down() -> bool: + """Check if the service is in the process of shutting down.""" + return _shutdown_requested + + +def get_shutdown_info() -> dict[str, Any] | None: + """Get information about the shutdown state, if active.""" + if not _shutdown_requested: + return None + elapsed = None + if _shutdown_start_time: + elapsed = time.monotonic() - _shutdown_start_time + return { + "requested": _shutdown_requested, + "reason": _shutdown_reason, + "elapsed_seconds": elapsed, + "timeout_seconds": GRACEFUL_SHUTDOWN_TIMEOUT, + } + class DependencyStatus(BaseModel): """Status of a single dependency.""" @@ -52,6 +94,36 @@ class HealthStatus(BaseModel): uptime_seconds: float +class DetailedHealthStatus(BaseModel): + """Detailed health status with all service checks.""" + + status: str # "healthy", "degraded", "unhealthy" + timestamp: str + version: str + uptime_seconds: float + services: dict[str, dict[str, Any]] + system: dict[str, Any] + shutdown: dict[str, Any] | None = None + + +class ReadinessStatus(BaseModel): + """Readiness probe response.""" + + ready: bool + timestamp: str + checks: dict[str, bool] + reason: str | None = None + + +class LivenessStatus(BaseModel): + """Liveness probe response.""" + + alive: bool + timestamp: str + uptime_seconds: float + shutdown_requested: bool = False + + # Simple uptime tracking # Ollama health cache (30-second TTL) @@ -326,3 +398,178 @@ async def health_snapshot(): }, "tokens": {"status": "unknown", "message": "Snapshot failed"}, } + + +# ----------------------------------------------------------------------------- +# Production Health Check Endpoints (Readiness & Liveness Probes) +# ----------------------------------------------------------------------------- + + +@router.get("/health/detailed") +async def health_detailed() -> JSONResponse: + """Comprehensive health check with all service statuses. + + Returns 200 if healthy, 503 if degraded/unhealthy. + Includes shutdown state for graceful shutdown awareness. + """ + uptime = (datetime.now(UTC) - _START_TIME).total_seconds() + + # Check all services in parallel + ollama_dep, sqlite_dep = await asyncio.gather( + _check_ollama(), + asyncio.to_thread(_check_sqlite), + ) + + # Build service status map + services = { + "ollama": { + "status": ollama_dep.status, + "healthy": ollama_dep.status == "healthy", + "details": ollama_dep.details, + }, + "sqlite": { + "status": sqlite_dep.status, + "healthy": sqlite_dep.status == "healthy", + "details": sqlite_dep.details, + }, + } + + # Determine overall status + all_healthy = all(s["healthy"] for s in services.values()) + any_unhealthy = any(s["status"] == "unavailable" for s in services.values()) + + if all_healthy: + status = "healthy" + status_code = 200 + elif any_unhealthy: + status = "unhealthy" + status_code = 503 + else: + status = "degraded" + status_code = 503 + + # Add shutdown state if shutting down + shutdown_info = get_shutdown_info() + + # System info + import psutil + + try: + process = psutil.Process() + memory_info = process.memory_info() + system = { + "memory_mb": round(memory_info.rss / (1024 * 1024), 2), + "cpu_percent": process.cpu_percent(interval=0.1), + "threads": process.num_threads(), + } + except Exception as exc: + logger.debug("Could not get system info: %s", exc) + system = {"error": "unavailable"} + + response_data = { + "status": status, + "timestamp": datetime.now(UTC).isoformat(), + "version": "2.0.0", + "uptime_seconds": uptime, + "services": services, + "system": system, + } + + if shutdown_info: + response_data["shutdown"] = shutdown_info + # Force 503 if shutting down + status_code = 503 + + return JSONResponse(content=response_data, status_code=status_code) + + +@router.get("/ready") +async def readiness_probe() -> JSONResponse: + """Readiness probe for Kubernetes/Docker. + + Returns 200 when the service is ready to receive traffic. + Returns 503 during startup or shutdown. + """ + uptime = (datetime.now(UTC) - _START_TIME).total_seconds() + + # Minimum uptime before ready (allow startup to complete) + MIN_READY_UPTIME = 5.0 + + checks = { + "startup_complete": uptime >= MIN_READY_UPTIME, + "database": False, + "not_shutting_down": not is_shutting_down(), + } + + # Check database connectivity + try: + db_path = Path(settings.repo_root) / "data" / "timmy.db" + if db_path.exists(): + with closing(sqlite3.connect(str(db_path))) as conn: + conn.execute("SELECT 1") + checks["database"] = True + except Exception as exc: + logger.debug("Readiness DB check failed: %s", exc) + + ready = all(checks.values()) + + response_data = { + "ready": ready, + "timestamp": datetime.now(UTC).isoformat(), + "checks": checks, + } + + if not ready and is_shutting_down(): + response_data["reason"] = f"Service shutting down: {_shutdown_reason}" + + status_code = 200 if ready else 503 + return JSONResponse(content=response_data, status_code=status_code) + + +@router.get("/live") +async def liveness_probe() -> JSONResponse: + """Liveness probe for Kubernetes/Docker. + + Returns 200 if the service is alive and functioning. + Returns 503 if the service is deadlocked or should be restarted. + """ + uptime = (datetime.now(UTC) - _START_TIME).total_seconds() + + # Basic liveness: we respond, so we're alive + alive = True + + # If shutting down and past timeout, report not alive to force restart + if is_shutting_down() and _shutdown_start_time: + elapsed = time.monotonic() - _shutdown_start_time + if elapsed > GRACEFUL_SHUTDOWN_TIMEOUT: + alive = False + logger.warning("Liveness probe failed: shutdown timeout exceeded") + + response_data = { + "alive": alive, + "timestamp": datetime.now(UTC).isoformat(), + "uptime_seconds": uptime, + "shutdown_requested": is_shutting_down(), + } + + status_code = 200 if alive else 503 + return JSONResponse(content=response_data, status_code=status_code) + + +@router.get("/health/shutdown", include_in_schema=False) +async def shutdown_status() -> JSONResponse: + """Get shutdown status (internal/debug endpoint). + + Returns shutdown state information for debugging graceful shutdown. + """ + shutdown_info = get_shutdown_info() + + response_data = { + "shutting_down": is_shutting_down(), + "timestamp": datetime.now(UTC).isoformat(), + } + + if shutdown_info: + response_data.update(shutdown_info) + + return JSONResponse(content=response_data) diff --git a/src/dashboard/startup.py b/src/dashboard/startup.py index ecd8cac1..bb1d3c95 100644 --- a/src/dashboard/startup.py +++ b/src/dashboard/startup.py @@ -2,6 +2,7 @@ import asyncio import logging +import signal from contextlib import asynccontextmanager from pathlib import Path @@ -19,6 +20,9 @@ from dashboard.schedulers import ( logger = logging.getLogger(__name__) +# Global event to signal shutdown request +_shutdown_event = asyncio.Event() + def _startup_init() -> None: """Validate config and enable event persistence.""" @@ -131,6 +135,65 @@ def _startup_pruning() -> None: _check_vault_size() +def _setup_signal_handlers() -> None: + """Setup signal handlers for graceful shutdown. + + Handles SIGTERM (Docker stop, Kubernetes delete) and SIGINT (Ctrl+C) + by setting the shutdown event and notifying health checks. + + Note: Signal handlers can only be registered in the main thread. + In test environments (running in separate threads), this is skipped. + """ + import threading + + # Signal handlers can only be set in the main thread + if threading.current_thread() is not threading.main_thread(): + logger.debug("Skipping signal handler setup: not in main thread") + return + + loop = asyncio.get_running_loop() + + def _signal_handler(sig: signal.Signals) -> None: + sig_name = sig.name if hasattr(sig, "name") else str(sig) + logger.info("Received signal %s, initiating graceful shutdown...", sig_name) + + # Notify health module about shutdown + try: + from dashboard.routes.health import request_shutdown + + request_shutdown(reason=f"signal:{sig_name}") + except Exception as exc: + logger.debug("Failed to set shutdown state: %s", exc) + + # Set the shutdown event to unblock lifespan + _shutdown_event.set() + + # Register handlers for common shutdown signals + for sig in (signal.SIGTERM, signal.SIGINT): + try: + loop.add_signal_handler(sig, lambda s=sig: _signal_handler(s)) + logger.debug("Registered handler for %s", sig.name if hasattr(sig, "name") else sig) + except (NotImplementedError, ValueError) as exc: + # Windows or non-main thread - signal handlers not available + logger.debug("Could not register signal handler for %s: %s", sig, exc) + + +async def _wait_for_shutdown(timeout: float | None = None) -> bool: + """Wait for shutdown signal or timeout. + + Returns True if shutdown was requested, False if timeout expired. + """ + if timeout: + try: + await asyncio.wait_for(_shutdown_event.wait(), timeout=timeout) + return True + except TimeoutError: + return False + else: + await _shutdown_event.wait() + return True + + async def _shutdown_cleanup( bg_tasks: list[asyncio.Task], workshop_heartbeat, @@ -161,11 +224,25 @@ async def _shutdown_cleanup( @asynccontextmanager async def lifespan(app: FastAPI): - """Application lifespan manager with non-blocking startup.""" + """Application lifespan manager with non-blocking startup and graceful shutdown. + + Handles SIGTERM/SIGINT signals for graceful shutdown in container environments. + When a shutdown signal is received: + 1. Health checks are notified (readiness returns 503) + 2. Active requests are allowed to complete (with timeout) + 3. Background tasks are cancelled + 4. Cleanup operations run + """ + # Reset shutdown state for fresh start + _shutdown_event.clear() + _startup_init() bg_tasks = _startup_background_tasks() _startup_pruning() + # Setup signal handlers for graceful shutdown + _setup_signal_handlers() + # Start Workshop presence heartbeat with WS relay from dashboard.routes.world import broadcast_world_state from timmy.workshop_state import WorkshopHeartbeat @@ -191,15 +268,35 @@ async def lifespan(app: FastAPI): logger.debug("Failed to mark sovereignty session start") logger.info("✓ Dashboard ready for requests") + logger.info(" Graceful shutdown enabled (SIGTERM/SIGINT)") - yield - - await _shutdown_cleanup(bg_tasks, workshop_heartbeat) - - # Generate and commit sovereignty session report + # Wait for shutdown signal or continue until cancelled + # The yield allows FastAPI to serve requests try: - from timmy.sovereignty import generate_and_commit_report + yield + except asyncio.CancelledError: + # FastAPI cancelled the lifespan (normal during shutdown) + logger.debug("Lifespan cancelled, beginning cleanup...") + finally: + # Cleanup phase - this runs during shutdown + logger.info("Beginning graceful shutdown...") - await generate_and_commit_report() - except Exception as exc: - logger.warning("Sovereignty report generation failed at shutdown: %s", exc) + # Notify health checks that we're shutting down + try: + from dashboard.routes.health import request_shutdown + + request_shutdown(reason="lifespan_cleanup") + except Exception as exc: + logger.debug("Failed to set shutdown state: %s", exc) + + await _shutdown_cleanup(bg_tasks, workshop_heartbeat) + + # Generate and commit sovereignty session report + try: + from timmy.sovereignty import generate_and_commit_report + + await generate_and_commit_report() + except Exception as exc: + logger.warning("Sovereignty report generation failed at shutdown: %s", exc) + + logger.info("✓ Graceful shutdown complete") diff --git a/tests/dashboard/test_health.py b/tests/dashboard/test_health.py index 9f83f91e..c0761f9e 100644 --- a/tests/dashboard/test_health.py +++ b/tests/dashboard/test_health.py @@ -15,13 +15,19 @@ import pytest from dashboard.routes.health import ( DependencyStatus, + DetailedHealthStatus, HealthStatus, + LivenessStatus, + ReadinessStatus, SovereigntyReport, _calculate_overall_score, _check_lightning, _check_ollama_sync, _check_sqlite, _generate_recommendations, + get_shutdown_info, + is_shutting_down, + request_shutdown, ) # --------------------------------------------------------------------------- @@ -497,3 +503,283 @@ class TestSnapshotEndpoint: data = client.get("/health/snapshot").json() assert data["overall_status"] == "unknown" + + +# ----------------------------------------------------------------------------- +# Shutdown State Tests +# ----------------------------------------------------------------------------- + + +class TestShutdownState: + """Tests for shutdown state tracking.""" + + @pytest.fixture(autouse=True) + def _reset_shutdown_state(self): + """Reset shutdown state before each test.""" + import dashboard.routes.health as mod + + mod._shutdown_requested = False + mod._shutdown_reason = None + mod._shutdown_start_time = None + yield + mod._shutdown_requested = False + mod._shutdown_reason = None + mod._shutdown_start_time = None + + def test_is_shutting_down_initial(self): + assert is_shutting_down() is False + + def test_request_shutdown_sets_state(self): + request_shutdown(reason="test") + assert is_shutting_down() is True + + def test_get_shutdown_info_when_not_shutting_down(self): + info = get_shutdown_info() + assert info is None + + def test_get_shutdown_info_when_shutting_down(self): + request_shutdown(reason="test_reason") + info = get_shutdown_info() + assert info is not None + assert info["requested"] is True + assert info["reason"] == "test_reason" + assert "elapsed_seconds" in info + assert "timeout_seconds" in info + + +# ----------------------------------------------------------------------------- +# Detailed Health Endpoint Tests +# ----------------------------------------------------------------------------- + + +class TestDetailedHealthEndpoint: + """Tests for GET /health/detailed.""" + + def test_returns_200_when_healthy(self, client): + with patch( + "dashboard.routes.health._check_ollama_sync", + return_value=DependencyStatus( + name="Ollama AI", status="healthy", sovereignty_score=10, details={} + ), + ): + response = client.get("/health/detailed") + + assert response.status_code == 200 + data = response.json() + assert data["status"] in ["healthy", "degraded", "unhealthy"] + assert "timestamp" in data + assert "version" in data + assert "uptime_seconds" in data + assert "services" in data + assert "system" in data + + def test_returns_503_when_service_unhealthy(self, client): + with patch( + "dashboard.routes.health._check_ollama_sync", + return_value=DependencyStatus( + name="Ollama AI", + status="unavailable", + sovereignty_score=10, + details={"error": "down"}, + ), + ): + response = client.get("/health/detailed") + + assert response.status_code == 503 + data = response.json() + assert data["status"] == "unhealthy" + + def test_includes_shutdown_info_when_shutting_down(self, client): + with patch( + "dashboard.routes.health._check_ollama_sync", + return_value=DependencyStatus( + name="Ollama AI", status="healthy", sovereignty_score=10, details={} + ), + ): + with patch("dashboard.routes.health.is_shutting_down", return_value=True): + with patch( + "dashboard.routes.health.get_shutdown_info", + return_value={ + "requested": True, + "reason": "test", + "elapsed_seconds": 1.5, + "timeout_seconds": 30.0, + }, + ): + response = client.get("/health/detailed") + + assert response.status_code == 503 + data = response.json() + assert "shutdown" in data + assert data["shutdown"]["requested"] is True + + def test_services_structure(self, client): + with patch( + "dashboard.routes.health._check_ollama_sync", + return_value=DependencyStatus( + name="Ollama AI", status="healthy", sovereignty_score=10, details={"model": "test"} + ), + ): + response = client.get("/health/detailed") + + data = response.json() + assert "services" in data + assert "ollama" in data["services"] + assert "sqlite" in data["services"] + # Each service should have status, healthy flag, and details + for _svc_name, svc_data in data["services"].items(): + assert "status" in svc_data + assert "healthy" in svc_data + assert isinstance(svc_data["healthy"], bool) + + +# ----------------------------------------------------------------------------- +# Readiness Probe Tests +# ----------------------------------------------------------------------------- + + +class TestReadinessProbe: + """Tests for GET /ready.""" + + def test_returns_200_when_ready(self, client): + # Wait for startup to complete + response = client.get("/ready") + data = response.json() + + # Should return either 200 (ready) or 503 (not ready) + assert response.status_code in [200, 503] + assert "ready" in data + assert isinstance(data["ready"], bool) + assert "timestamp" in data + assert "checks" in data + + def test_checks_structure(self, client): + response = client.get("/ready") + data = response.json() + + assert "checks" in data + checks = data["checks"] + # Core checks that should be present + assert "startup_complete" in checks + assert "database" in checks + assert "not_shutting_down" in checks + + def test_not_ready_during_shutdown(self, client): + with patch("dashboard.routes.health.is_shutting_down", return_value=True): + with patch( + "dashboard.routes.health._shutdown_reason", + "test shutdown", + ): + response = client.get("/ready") + + assert response.status_code == 503 + data = response.json() + assert data["ready"] is False + assert data["checks"]["not_shutting_down"] is False + assert "reason" in data + + +# ----------------------------------------------------------------------------- +# Liveness Probe Tests +# ----------------------------------------------------------------------------- + + +class TestLivenessProbe: + """Tests for GET /live.""" + + def test_returns_200_when_alive(self, client): + response = client.get("/live") + + assert response.status_code == 200 + data = response.json() + assert data["alive"] is True + assert "timestamp" in data + assert "uptime_seconds" in data + assert "shutdown_requested" in data + + def test_shutdown_requested_field(self, client): + with patch("dashboard.routes.health.is_shutting_down", return_value=False): + response = client.get("/live") + + data = response.json() + assert data["shutdown_requested"] is False + + def test_alive_false_after_shutdown_timeout(self, client): + import dashboard.routes.health as mod + + with patch.object(mod, "_shutdown_requested", True): + with patch.object(mod, "_shutdown_start_time", time.monotonic() - 999): + with patch.object(mod, "GRACEFUL_SHUTDOWN_TIMEOUT", 30.0): + response = client.get("/live") + + assert response.status_code == 503 + data = response.json() + assert data["alive"] is False + assert data["shutdown_requested"] is True + + +# ----------------------------------------------------------------------------- +# New Pydantic Model Tests +# ----------------------------------------------------------------------------- + + +class TestDetailedHealthStatusModel: + """Validate DetailedHealthStatus model.""" + + def test_fields(self): + hs = DetailedHealthStatus( + status="healthy", + timestamp="2026-01-01T00:00:00+00:00", + version="2.0.0", + uptime_seconds=42.5, + services={"db": {"status": "up", "healthy": True, "details": {}}}, + system={"memory_mb": 100.5}, + ) + assert hs.status == "healthy" + assert hs.services["db"]["healthy"] is True + + +class TestReadinessStatusModel: + """Validate ReadinessStatus model.""" + + def test_fields(self): + rs = ReadinessStatus( + ready=True, + timestamp="2026-01-01T00:00:00+00:00", + checks={"db": True, "cache": True}, + ) + assert rs.ready is True + assert rs.checks["db"] is True + + def test_with_reason(self): + rs = ReadinessStatus( + ready=False, + timestamp="2026-01-01T00:00:00+00:00", + checks={"db": False}, + reason="Database unavailable", + ) + assert rs.ready is False + assert rs.reason == "Database unavailable" + + +class TestLivenessStatusModel: + """Validate LivenessStatus model.""" + + def test_fields(self): + ls = LivenessStatus( + alive=True, + timestamp="2026-01-01T00:00:00+00:00", + uptime_seconds=3600.0, + shutdown_requested=False, + ) + assert ls.alive is True + assert ls.uptime_seconds == 3600.0 + assert ls.shutdown_requested is False + + def test_defaults(self): + ls = LivenessStatus( + alive=True, + timestamp="2026-01-01T00:00:00+00:00", + uptime_seconds=0.0, + ) + assert ls.shutdown_requested is False