[kimi] Implement graceful shutdown and health checks (#1397) (#1457)
Some checks failed
Tests / lint (push) Failing after 7s
Tests / test (push) Has been skipped

This commit was merged in pull request #1457.
This commit is contained in:
2026-03-24 19:31:14 +00:00
parent 640d78742a
commit 8518db921e
3 changed files with 641 additions and 11 deletions

View File

@@ -6,6 +6,7 @@ for the Mission Control dashboard.
import asyncio
import logging
import os
import sqlite3
import time
from contextlib import closing
@@ -14,7 +15,7 @@ from pathlib import Path
from typing import Any
from fastapi import APIRouter, Request
from fastapi.responses import HTMLResponse
from fastapi.responses import HTMLResponse, JSONResponse
from pydantic import BaseModel
from config import APP_START_TIME as _START_TIME
@@ -24,6 +25,47 @@ logger = logging.getLogger(__name__)
router = APIRouter(tags=["health"])
# Shutdown state tracking for graceful shutdown
_shutdown_requested = False
_shutdown_reason: str | None = None
_shutdown_start_time: float | None = None
# Default graceful shutdown timeout (seconds)
GRACEFUL_SHUTDOWN_TIMEOUT = float(os.getenv("GRACEFUL_SHUTDOWN_TIMEOUT", "30"))
def request_shutdown(reason: str = "unknown") -> None:
    """Record that a graceful shutdown has been requested.

    Called by signal handlers and by the lifespan cleanup so that the health
    endpoints can report the service as shutting down.

    The call is idempotent: only the first request stores its reason and
    start time. A later call (for example the lifespan cleanup running after
    a signal handler already fired) must not restart the clock, otherwise the
    liveness timeout would be silently extended.

    Args:
        reason: Short human-readable tag describing what triggered shutdown.
    """
    global _shutdown_requested, _shutdown_reason, _shutdown_start_time  # noqa: PLW0603
    if _shutdown_requested:
        # Keep the original reason/timestamp; just note the repeat request.
        logger.debug("Shutdown already in progress; ignoring repeat request: %s", reason)
        return
    _shutdown_requested = True
    _shutdown_reason = reason
    _shutdown_start_time = time.monotonic()
    logger.info("Shutdown requested: %s", reason)
def is_shutting_down() -> bool:
    """Return True once a graceful shutdown has been requested."""
    # Reads the module-level flag that request_shutdown() sets.
    return _shutdown_requested
def get_shutdown_info() -> dict[str, Any] | None:
"""Get information about the shutdown state, if active."""
if not _shutdown_requested:
return None
elapsed = None
if _shutdown_start_time:
elapsed = time.monotonic() - _shutdown_start_time
return {
"requested": _shutdown_requested,
"reason": _shutdown_reason,
"elapsed_seconds": elapsed,
"timeout_seconds": GRACEFUL_SHUTDOWN_TIMEOUT,
}
class DependencyStatus(BaseModel):
"""Status of a single dependency."""
@@ -52,6 +94,36 @@ class HealthStatus(BaseModel):
uptime_seconds: float
class DetailedHealthStatus(BaseModel):
    """Schema for a detailed health report covering every checked service."""

    # Overall verdict: "healthy", "degraded" or "unhealthy".
    status: str  # "healthy", "degraded", "unhealthy"
    # Time the report was produced, as a string.
    timestamp: str
    version: str
    uptime_seconds: float
    # Per-service mapping of name -> arbitrary status fields.
    services: dict[str, dict[str, Any]]
    # Process/system level metrics.
    system: dict[str, Any]
    # Shutdown details; left as None unless supplied.
    shutdown: dict[str, Any] | None = None
class ReadinessStatus(BaseModel):
    """Schema for the readiness probe payload."""

    # True when the service can accept traffic.
    ready: bool
    timestamp: str
    # Named boolean results of the individual readiness checks.
    checks: dict[str, bool]
    # Optional explanation, typically given when not ready.
    reason: str | None = None
class LivenessStatus(BaseModel):
    """Schema for the liveness probe payload."""

    # True while the process is considered alive.
    alive: bool
    timestamp: str
    uptime_seconds: float
    # Whether a graceful shutdown has been requested; defaults to False.
    shutdown_requested: bool = False
# Simple uptime tracking
# Ollama health cache (30-second TTL)
@@ -326,3 +398,178 @@ async def health_snapshot():
},
"tokens": {"status": "unknown", "message": "Snapshot failed"},
}
# -----------------------------------------------------------------------------
# Production Health Check Endpoints (Readiness & Liveness Probes)
# -----------------------------------------------------------------------------
def _collect_system_info() -> dict[str, Any]:
    """Return process memory/CPU/thread stats, or an error marker on failure.

    ``psutil`` is imported inside the ``try`` so that a missing or broken
    installation degrades to ``{"error": "unavailable"}`` instead of failing
    the whole health request. The CPU sample blocks for 0.1 s, so event-loop
    callers should run this function in a worker thread.
    """
    try:
        import psutil

        process = psutil.Process()
        memory_info = process.memory_info()
        return {
            "memory_mb": round(memory_info.rss / (1024 * 1024), 2),
            "cpu_percent": process.cpu_percent(interval=0.1),
            "threads": process.num_threads(),
        }
    except Exception as exc:
        logger.debug("Could not get system info: %s", exc)
        return {"error": "unavailable"}


@router.get("/health/detailed")
async def health_detailed() -> JSONResponse:
    """Comprehensive health check with all service statuses.

    Returns 200 if every service is healthy, 503 if degraded/unhealthy.
    While a shutdown is in progress the shutdown state is included in the
    body and the response is always 503.
    """
    uptime = (datetime.now(UTC) - _START_TIME).total_seconds()

    # Run the dependency checks and the (blocking) system sampling in
    # parallel; the synchronous ones go to worker threads so the event loop
    # stays responsive.
    ollama_dep, sqlite_dep, system = await asyncio.gather(
        _check_ollama(),
        asyncio.to_thread(_check_sqlite),
        asyncio.to_thread(_collect_system_info),
    )

    # Build service status map
    services = {
        "ollama": {
            "status": ollama_dep.status,
            "healthy": ollama_dep.status == "healthy",
            "details": ollama_dep.details,
        },
        "sqlite": {
            "status": sqlite_dep.status,
            "healthy": sqlite_dep.status == "healthy",
            "details": sqlite_dep.details,
        },
    }

    # Determine overall status: any "unavailable" service means unhealthy,
    # any other non-healthy state means degraded.
    all_healthy = all(s["healthy"] for s in services.values())
    any_unhealthy = any(s["status"] == "unavailable" for s in services.values())

    if all_healthy:
        status = "healthy"
        status_code = 200
    elif any_unhealthy:
        status = "unhealthy"
        status_code = 503
    else:
        status = "degraded"
        status_code = 503

    response_data = {
        "status": status,
        "timestamp": datetime.now(UTC).isoformat(),
        "version": "2.0.0",
        "uptime_seconds": uptime,
        "services": services,
        "system": system,
    }

    # Add shutdown state if shutting down
    shutdown_info = get_shutdown_info()
    if shutdown_info:
        response_data["shutdown"] = shutdown_info
        # Force 503 if shutting down
        status_code = 503

    return JSONResponse(content=response_data, status_code=status_code)
def _readiness_database_ok() -> bool:
    """Return True when the SQLite file exists and answers ``SELECT 1``.

    Synchronous on purpose: callers on the event loop should run it in a
    worker thread. Any failure is logged at debug level and reported as False.
    """
    try:
        db_path = Path(settings.repo_root) / "data" / "timmy.db"
        # Checking existence first avoids sqlite3.connect() creating the file.
        if not db_path.exists():
            return False
        with closing(sqlite3.connect(str(db_path))) as conn:
            conn.execute("SELECT 1")
        return True
    except Exception as exc:
        logger.debug("Readiness DB check failed: %s", exc)
        return False


@router.get("/ready")
async def readiness_probe() -> JSONResponse:
    """Readiness probe for Kubernetes/Docker.

    Returns 200 when the service is ready to receive traffic.
    Returns 503 during startup, during shutdown, or when the database
    check fails.
    """
    uptime = (datetime.now(UTC) - _START_TIME).total_seconds()

    # Minimum uptime before ready (allow startup to complete)
    MIN_READY_UPTIME = 5.0

    # Read the shutdown flag once so the check result and the reason below
    # can never disagree.
    shutting_down = is_shutting_down()

    # Run the blocking SQLite check off the event loop.
    database_ok = await asyncio.to_thread(_readiness_database_ok)

    checks = {
        "startup_complete": uptime >= MIN_READY_UPTIME,
        "database": database_ok,
        "not_shutting_down": not shutting_down,
    }

    ready = all(checks.values())

    response_data = {
        "ready": ready,
        "timestamp": datetime.now(UTC).isoformat(),
        "checks": checks,
    }

    if not ready and shutting_down:
        response_data["reason"] = f"Service shutting down: {_shutdown_reason}"

    status_code = 200 if ready else 503
    return JSONResponse(content=response_data, status_code=status_code)
@router.get("/live")
async def liveness_probe() -> JSONResponse:
    """Liveness probe for Kubernetes/Docker.

    Returns 200 while the service is responding. Returns 503 once a requested
    shutdown has been running longer than ``GRACEFUL_SHUTDOWN_TIMEOUT``, so
    the orchestrator can force a restart.
    """
    uptime = (datetime.now(UTC) - _START_TIME).total_seconds()

    # Basic liveness: we respond, so we're alive
    alive = True

    # If shutting down and past timeout, report not alive to force restart.
    # Compare with None explicitly: a monotonic start time of 0.0 is valid.
    started_at = _shutdown_start_time
    if is_shutting_down() and started_at is not None:
        elapsed = time.monotonic() - started_at
        if elapsed > GRACEFUL_SHUTDOWN_TIMEOUT:
            alive = False
            logger.warning("Liveness probe failed: shutdown timeout exceeded")

    response_data = {
        "alive": alive,
        "timestamp": datetime.now(UTC).isoformat(),
        "uptime_seconds": uptime,
        "shutdown_requested": is_shutting_down(),
    }

    status_code = 200 if alive else 503
    return JSONResponse(content=response_data, status_code=status_code)
@router.get("/health/shutdown", include_in_schema=False)
async def shutdown_status() -> JSONResponse:
    """Report the current shutdown state (internal/debug endpoint).

    The body always carries ``shutting_down`` and ``timestamp``; when a
    shutdown is in progress the fields from ``get_shutdown_info()`` are
    merged in as well.
    """
    details = get_shutdown_info()

    payload = {
        "shutting_down": is_shutting_down(),
        "timestamp": datetime.now(UTC).isoformat(),
        # Nothing extra is merged when no shutdown info is available.
        **(details or {}),
    }

    return JSONResponse(content=payload)

View File

@@ -2,6 +2,7 @@
import asyncio
import logging
import signal
from contextlib import asynccontextmanager
from pathlib import Path
@@ -19,6 +20,9 @@ from dashboard.schedulers import (
logger = logging.getLogger(__name__)
# Global event to signal shutdown request
_shutdown_event = asyncio.Event()
def _startup_init() -> None:
"""Validate config and enable event persistence."""
@@ -131,6 +135,65 @@ def _startup_pruning() -> None:
_check_vault_size()
def _setup_signal_handlers() -> None:
    """Set up signal handlers for graceful shutdown.

    Handles SIGTERM (Docker stop, Kubernetes delete) and SIGINT (Ctrl+C)
    by notifying the health checks and setting the shutdown event, then
    forwarding the signal to whatever handler was installed before.

    Forwarding matters because ``loop.add_signal_handler`` *replaces* the
    existing handler for the signal. If the hosting ASGI server installed its
    own handler to begin shutdown, swallowing the signal here would leave the
    process running until it is killed.

    Note: Signal handlers can only be registered in the main thread.
    When called from any other thread this function does nothing.
    """
    import threading

    # Signal handlers can only be set in the main thread
    if threading.current_thread() is not threading.main_thread():
        logger.debug("Skipping signal handler setup: not in main thread")
        return

    loop = asyncio.get_running_loop()

    def _signal_handler(sig: signal.Signals, previous: object) -> None:
        sig_name = sig.name if hasattr(sig, "name") else str(sig)
        logger.info("Received signal %s, initiating graceful shutdown...", sig_name)

        # Notify health module about shutdown
        try:
            from dashboard.routes.health import request_shutdown

            request_shutdown(reason=f"signal:{sig_name}")
        except Exception as exc:
            logger.debug("Failed to set shutdown state: %s", exc)

        # Set the shutdown event to unblock anything awaiting it
        _shutdown_event.set()

        # Hand the signal on to the handler we displaced (SIG_DFL / SIG_IGN /
        # None are not callable and are skipped) so the host still exits.
        if callable(previous):
            previous(sig, None)

    # Register handlers for common shutdown signals
    for sig in (signal.SIGTERM, signal.SIGINT):
        try:
            # Capture the current handler *before* replacing it.
            previous = signal.getsignal(sig)
            loop.add_signal_handler(sig, _signal_handler, sig, previous)
            logger.debug("Registered handler for %s", sig.name if hasattr(sig, "name") else sig)
        except (NotImplementedError, ValueError) as exc:
            # Windows or non-main thread - signal handlers not available
            logger.debug("Could not register signal handler for %s: %s", sig, exc)
async def _wait_for_shutdown(timeout: float | None = None) -> bool:
"""Wait for shutdown signal or timeout.
Returns True if shutdown was requested, False if timeout expired.
"""
if timeout:
try:
await asyncio.wait_for(_shutdown_event.wait(), timeout=timeout)
return True
except TimeoutError:
return False
else:
await _shutdown_event.wait()
return True
async def _shutdown_cleanup(
bg_tasks: list[asyncio.Task],
workshop_heartbeat,
@@ -161,11 +224,25 @@ async def _shutdown_cleanup(
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan manager with non-blocking startup."""
"""Application lifespan manager with non-blocking startup and graceful shutdown.
Handles SIGTERM/SIGINT signals for graceful shutdown in container environments.
When a shutdown signal is received:
1. Health checks are notified (readiness returns 503)
2. Active requests are allowed to complete (with timeout)
3. Background tasks are cancelled
4. Cleanup operations run
"""
# Reset shutdown state for fresh start
_shutdown_event.clear()
_startup_init()
bg_tasks = _startup_background_tasks()
_startup_pruning()
# Setup signal handlers for graceful shutdown
_setup_signal_handlers()
# Start Workshop presence heartbeat with WS relay
from dashboard.routes.world import broadcast_world_state
from timmy.workshop_state import WorkshopHeartbeat
@@ -191,15 +268,35 @@ async def lifespan(app: FastAPI):
logger.debug("Failed to mark sovereignty session start")
logger.info("✓ Dashboard ready for requests")
logger.info(" Graceful shutdown enabled (SIGTERM/SIGINT)")
yield
await _shutdown_cleanup(bg_tasks, workshop_heartbeat)
# Generate and commit sovereignty session report
# Wait for shutdown signal or continue until cancelled
# The yield allows FastAPI to serve requests
try:
from timmy.sovereignty import generate_and_commit_report
yield
except asyncio.CancelledError:
# FastAPI cancelled the lifespan (normal during shutdown)
logger.debug("Lifespan cancelled, beginning cleanup...")
finally:
# Cleanup phase - this runs during shutdown
logger.info("Beginning graceful shutdown...")
await generate_and_commit_report()
except Exception as exc:
logger.warning("Sovereignty report generation failed at shutdown: %s", exc)
# Notify health checks that we're shutting down
try:
from dashboard.routes.health import request_shutdown
request_shutdown(reason="lifespan_cleanup")
except Exception as exc:
logger.debug("Failed to set shutdown state: %s", exc)
await _shutdown_cleanup(bg_tasks, workshop_heartbeat)
# Generate and commit sovereignty session report
try:
from timmy.sovereignty import generate_and_commit_report
await generate_and_commit_report()
except Exception as exc:
logger.warning("Sovereignty report generation failed at shutdown: %s", exc)
logger.info("✓ Graceful shutdown complete")

View File

@@ -15,13 +15,19 @@ import pytest
from dashboard.routes.health import (
DependencyStatus,
DetailedHealthStatus,
HealthStatus,
LivenessStatus,
ReadinessStatus,
SovereigntyReport,
_calculate_overall_score,
_check_lightning,
_check_ollama_sync,
_check_sqlite,
_generate_recommendations,
get_shutdown_info,
is_shutting_down,
request_shutdown,
)
# ---------------------------------------------------------------------------
@@ -497,3 +503,283 @@ class TestSnapshotEndpoint:
data = client.get("/health/snapshot").json()
assert data["overall_status"] == "unknown"
# -----------------------------------------------------------------------------
# Shutdown State Tests
# -----------------------------------------------------------------------------
class TestShutdownState:
    """Exercise the module-level shutdown flag helpers."""

    @pytest.fixture(autouse=True)
    def _reset_shutdown_state(self):
        """Clear the shutdown globals before and after every test."""
        import dashboard.routes.health as mod

        def _clear() -> None:
            mod._shutdown_requested = False
            mod._shutdown_reason = None
            mod._shutdown_start_time = None

        _clear()
        yield
        _clear()

    def test_is_shutting_down_initial(self):
        assert is_shutting_down() is False

    def test_request_shutdown_sets_state(self):
        request_shutdown(reason="test")
        assert is_shutting_down() is True

    def test_get_shutdown_info_when_not_shutting_down(self):
        assert get_shutdown_info() is None

    def test_get_shutdown_info_when_shutting_down(self):
        request_shutdown(reason="test_reason")
        snapshot = get_shutdown_info()
        assert snapshot is not None
        assert snapshot["requested"] is True
        assert snapshot["reason"] == "test_reason"
        for key in ("elapsed_seconds", "timeout_seconds"):
            assert key in snapshot
# -----------------------------------------------------------------------------
# Detailed Health Endpoint Tests
# -----------------------------------------------------------------------------
class TestDetailedHealthEndpoint:
    """Tests for GET /health/detailed."""

    @pytest.fixture(autouse=True)
    def _reset_shutdown_state(self):
        """Isolate each test from leftover shutdown state.

        The endpoint forces a 503 whenever the module-level shutdown flag is
        set, and that flag is process-global (it is set by
        ``request_shutdown``). Clearing it here keeps the status-code
        assertions independent of test order.
        """
        import dashboard.routes.health as mod

        def _clear() -> None:
            mod._shutdown_requested = False
            mod._shutdown_reason = None
            mod._shutdown_start_time = None

        _clear()
        yield
        _clear()

    def test_returns_200_when_healthy(self, client):
        with patch(
            "dashboard.routes.health._check_ollama_sync",
            return_value=DependencyStatus(
                name="Ollama AI", status="healthy", sovereignty_score=10, details={}
            ),
        ):
            response = client.get("/health/detailed")

        assert response.status_code == 200
        data = response.json()
        assert data["status"] in ["healthy", "degraded", "unhealthy"]
        assert "timestamp" in data
        assert "version" in data
        assert "uptime_seconds" in data
        assert "services" in data
        assert "system" in data

    def test_returns_503_when_service_unhealthy(self, client):
        with patch(
            "dashboard.routes.health._check_ollama_sync",
            return_value=DependencyStatus(
                name="Ollama AI",
                status="unavailable",
                sovereignty_score=10,
                details={"error": "down"},
            ),
        ):
            response = client.get("/health/detailed")

        assert response.status_code == 503
        data = response.json()
        assert data["status"] == "unhealthy"

    def test_includes_shutdown_info_when_shutting_down(self, client):
        # One flat ``with`` instead of three nested levels.
        with (
            patch(
                "dashboard.routes.health._check_ollama_sync",
                return_value=DependencyStatus(
                    name="Ollama AI", status="healthy", sovereignty_score=10, details={}
                ),
            ),
            patch("dashboard.routes.health.is_shutting_down", return_value=True),
            patch(
                "dashboard.routes.health.get_shutdown_info",
                return_value={
                    "requested": True,
                    "reason": "test",
                    "elapsed_seconds": 1.5,
                    "timeout_seconds": 30.0,
                },
            ),
        ):
            response = client.get("/health/detailed")

        assert response.status_code == 503
        data = response.json()
        assert "shutdown" in data
        assert data["shutdown"]["requested"] is True

    def test_services_structure(self, client):
        with patch(
            "dashboard.routes.health._check_ollama_sync",
            return_value=DependencyStatus(
                name="Ollama AI", status="healthy", sovereignty_score=10, details={"model": "test"}
            ),
        ):
            response = client.get("/health/detailed")

        data = response.json()
        assert "services" in data
        assert "ollama" in data["services"]
        assert "sqlite" in data["services"]
        # Each service should have status, healthy flag, and details
        for _svc_name, svc_data in data["services"].items():
            assert "status" in svc_data
            assert "healthy" in svc_data
            assert isinstance(svc_data["healthy"], bool)
# -----------------------------------------------------------------------------
# Readiness Probe Tests
# -----------------------------------------------------------------------------
class TestReadinessProbe:
    """Tests for GET /ready."""

    @pytest.fixture(autouse=True)
    def _reset_shutdown_state(self):
        """Isolate each test from leftover shutdown state.

        ``/ready`` reports ``not_shutting_down`` from a process-global flag,
        so it is cleared before and after every test.
        """
        import dashboard.routes.health as mod

        def _clear() -> None:
            mod._shutdown_requested = False
            mod._shutdown_reason = None
            mod._shutdown_start_time = None

        _clear()
        yield
        _clear()

    def test_returns_200_when_ready(self, client):
        # No waiting happens here: whether the probe is ready depends on
        # uptime and database availability, so only the response shape and
        # the set of allowed status codes are asserted.
        response = client.get("/ready")
        data = response.json()

        # Should return either 200 (ready) or 503 (not ready)
        assert response.status_code in [200, 503]
        assert "ready" in data
        assert isinstance(data["ready"], bool)
        assert "timestamp" in data
        assert "checks" in data

    def test_checks_structure(self, client):
        response = client.get("/ready")
        data = response.json()

        assert "checks" in data
        checks = data["checks"]
        # Core checks that should be present
        assert "startup_complete" in checks
        assert "database" in checks
        assert "not_shutting_down" in checks

    def test_not_ready_during_shutdown(self, client):
        with (
            patch("dashboard.routes.health.is_shutting_down", return_value=True),
            patch("dashboard.routes.health._shutdown_reason", "test shutdown"),
        ):
            response = client.get("/ready")

        assert response.status_code == 503
        data = response.json()
        assert data["ready"] is False
        assert data["checks"]["not_shutting_down"] is False
        assert "reason" in data
# -----------------------------------------------------------------------------
# Liveness Probe Tests
# -----------------------------------------------------------------------------
class TestLivenessProbe:
    """Tests for GET /live."""

    @pytest.fixture(autouse=True)
    def _reset_shutdown_state(self):
        """Isolate each test from leftover shutdown state.

        ``/live`` turns 503 once a recorded shutdown exceeds its timeout, and
        that state is process-global; clear it around every test so the
        "alive" assertions do not depend on test order or elapsed time.
        """
        import dashboard.routes.health as mod

        def _clear() -> None:
            mod._shutdown_requested = False
            mod._shutdown_reason = None
            mod._shutdown_start_time = None

        _clear()
        yield
        _clear()

    def test_returns_200_when_alive(self, client):
        response = client.get("/live")

        assert response.status_code == 200
        data = response.json()
        assert data["alive"] is True
        assert "timestamp" in data
        assert "uptime_seconds" in data
        assert "shutdown_requested" in data

    def test_shutdown_requested_field(self, client):
        with patch("dashboard.routes.health.is_shutting_down", return_value=False):
            response = client.get("/live")

        data = response.json()
        assert data["shutdown_requested"] is False

    def test_alive_false_after_shutdown_timeout(self, client):
        import dashboard.routes.health as mod

        # Pretend shutdown began 999 s ago with a 30 s budget.
        with (
            patch.object(mod, "_shutdown_requested", True),
            patch.object(mod, "_shutdown_start_time", time.monotonic() - 999),
            patch.object(mod, "GRACEFUL_SHUTDOWN_TIMEOUT", 30.0),
        ):
            response = client.get("/live")

        assert response.status_code == 503
        data = response.json()
        assert data["alive"] is False
        assert data["shutdown_requested"] is True
# -----------------------------------------------------------------------------
# New Pydantic Model Tests
# -----------------------------------------------------------------------------
class TestDetailedHealthStatusModel:
    """Check that DetailedHealthStatus accepts and exposes its fields."""

    def test_fields(self):
        payload = {
            "status": "healthy",
            "timestamp": "2026-01-01T00:00:00+00:00",
            "version": "2.0.0",
            "uptime_seconds": 42.5,
            "services": {"db": {"status": "up", "healthy": True, "details": {}}},
            "system": {"memory_mb": 100.5},
        }
        model = DetailedHealthStatus(**payload)
        assert model.status == "healthy"
        assert model.services["db"]["healthy"] is True
class TestReadinessStatusModel:
    """Check that ReadinessStatus accepts and exposes its fields."""

    def test_fields(self):
        model = ReadinessStatus(
            **{
                "ready": True,
                "timestamp": "2026-01-01T00:00:00+00:00",
                "checks": {"db": True, "cache": True},
            }
        )
        assert model.ready is True
        assert model.checks["db"] is True

    def test_with_reason(self):
        model = ReadinessStatus(
            **{
                "ready": False,
                "timestamp": "2026-01-01T00:00:00+00:00",
                "checks": {"db": False},
                "reason": "Database unavailable",
            }
        )
        assert model.ready is False
        assert model.reason == "Database unavailable"
class TestLivenessStatusModel:
    """Check that LivenessStatus accepts its fields and applies defaults."""

    def test_fields(self):
        model = LivenessStatus(
            **{
                "alive": True,
                "timestamp": "2026-01-01T00:00:00+00:00",
                "uptime_seconds": 3600.0,
                "shutdown_requested": False,
            }
        )
        assert model.alive is True
        assert model.uptime_seconds == 3600.0
        assert model.shutdown_requested is False

    def test_defaults(self):
        # shutdown_requested is omitted on purpose to exercise its default.
        model = LivenessStatus(
            **{
                "alive": True,
                "timestamp": "2026-01-01T00:00:00+00:00",
                "uptime_seconds": 0.0,
            }
        )
        assert model.shutdown_requested is False