diff --git a/observatory.py b/observatory.py
new file mode 100644
index 000000000..f2868da4e
--- /dev/null
+++ b/observatory.py
@@ -0,0 +1,955 @@
+"""
+Observatory — Testbed Health Monitoring & Alerting for Hermes Agent
+
+Checks running services, system resources, and connectivity.
+Fires Telegram alerts when thresholds are breached.
+Posts daily digest reports.
+Stores 30 days of historical health data in SQLite.
+
+Usage:
+ python observatory.py --check # one-shot health check (stdout)
+ python observatory.py --daemon # continuous monitor (60s poll)
+ python observatory.py --digest # print / send daily digest
+ python observatory.py --history N # show last N health records
+ python observatory.py --slo # print SLO report
+
+Configuration (env vars, falls back to ~/.hermes/.env):
+ OBSERVATORY_ALERT_CHAT_ID Telegram chat ID for alerts
+ OBSERVATORY_DIGEST_CHAT_ID Telegram chat ID for daily digest (default: alert chat)
+ OBSERVATORY_POLL_INTERVAL Seconds between health polls (default: 60)
+ OBSERVATORY_DB_PATH SQLite path (default: ~/.hermes/observatory.db)
+ TELEGRAM_BOT_TOKEN Bot token used to send alerts
+
+ # Threshold overrides (all optional):
+ OBSERVATORY_DISK_WARN_PCT Disk usage warn threshold (default: 80)
+ OBSERVATORY_DISK_CRIT_PCT Disk usage critical threshold (default: 90)
+ OBSERVATORY_MEM_WARN_PCT Memory usage warn threshold (default: 80)
+ OBSERVATORY_MEM_CRIT_PCT Memory usage critical threshold (default: 90)
+ OBSERVATORY_CPU_WARN_PCT CPU usage warn threshold (default: 80)
+ OBSERVATORY_CPU_CRIT_PCT CPU usage critical threshold (default: 95)
+ OBSERVATORY_WEBHOOK_URL Webhook endpoint to probe (default: http://127.0.0.1:8080/health)
+ OBSERVATORY_API_URL API server health URL (default: http://127.0.0.1:8642/health)
+ OBSERVATORY_WEBHOOK_LATENCY_SLO_MS Webhook latency SLO ms (default: 2000)
+ OBSERVATORY_GATEWAY_UPTIME_SLO_PCT Gateway uptime SLO % (default: 99.5)
+"""
+
+from __future__ import annotations
+
+import argparse
+import json
+import logging
+import os
+import signal
+import sqlite3
+import sys
+import time
+import urllib.request
+import urllib.error
+from contextlib import contextmanager
+from dataclasses import dataclass, field, asdict
+from datetime import datetime, timezone, timedelta
+from pathlib import Path
+from typing import Any, Dict, List, Optional, Tuple
+
+# ---------------------------------------------------------------------------
+# Optional imports
+# ---------------------------------------------------------------------------
+try:
+ import psutil
+ _PSUTIL = True
+except ImportError:
+ _PSUTIL = False
+
+try:
+ from dotenv import load_dotenv as _load_dotenv
+ _DOTENV = True
+except ImportError:
+ _DOTENV = False
+
+logger = logging.getLogger("observatory")
+
+# ---------------------------------------------------------------------------
+# Constants & SLO definitions
+# ---------------------------------------------------------------------------
+
# Health snapshots older than this many days are pruned from SQLite.
RETENTION_DAYS = 30

# Service-level objectives reported by --slo and the daily digest.
# "direction" marks lower-is-better (latency) SLOs; uptime SLOs are
# implicitly higher-is-better.
SLO_DEFINITIONS = {
    "gateway_uptime_pct": {
        "description": "Gateway process uptime over the last 24 hours",
        "target": 99.5,
        "unit": "%",
    },
    "webhook_latency_ms": {
        "description": "Webhook endpoint p95 response latency",
        "target": 2000,
        "unit": "ms",
        "direction": "lower_is_better",
    },
    "api_server_latency_ms": {
        "description": "API server /health p95 response latency",
        "target": 2000,
        "unit": "ms",
        "direction": "lower_is_better",
    },
}
+
+# ---------------------------------------------------------------------------
+# Configuration
+# ---------------------------------------------------------------------------
+
def _load_env() -> None:
    """Best-effort load of .env files: HERMES_HOME first, then the project dir."""
    if not _DOTENV:
        # python-dotenv is optional; silently skip when it is not installed.
        return
    home_dir = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
    # Project-level .env acts as a dev fallback after the HERMES_HOME one.
    for candidate in (home_dir / ".env", Path(__file__).parent / ".env"):
        if candidate.exists():
            # override=False keeps already-exported environment variables intact.
            _load_dotenv(candidate, override=False)
+
+
@dataclass
class ObservatoryConfig:
    """Runtime configuration for the observatory (env vars: see module docstring)."""

    alert_chat_id: Optional[str] = None
    digest_chat_id: Optional[str] = None
    telegram_token: Optional[str] = None
    poll_interval: int = 60
    db_path: Path = field(default_factory=lambda: Path(os.getenv("HERMES_HOME", Path.home() / ".hermes")) / "observatory.db")
    disk_warn_pct: float = 80.0
    disk_crit_pct: float = 90.0
    mem_warn_pct: float = 80.0
    mem_crit_pct: float = 90.0
    cpu_warn_pct: float = 80.0
    cpu_crit_pct: float = 95.0
    webhook_url: str = "http://127.0.0.1:8080/health"
    api_url: str = "http://127.0.0.1:8642/health"
    webhook_latency_slo_ms: float = 2000.0
    gateway_uptime_slo_pct: float = 99.5

    @classmethod
    def from_env(cls) -> "ObservatoryConfig":
        """Build a config from environment variables (loading .env files first)."""
        _load_env()
        env = os.getenv
        cfg = cls()
        cfg.telegram_token = env("TELEGRAM_BOT_TOKEN")
        cfg.alert_chat_id = env("OBSERVATORY_ALERT_CHAT_ID")
        # The digest falls back to the alert chat when not configured separately.
        cfg.digest_chat_id = env("OBSERVATORY_DIGEST_CHAT_ID") or cfg.alert_chat_id
        cfg.poll_interval = int(env("OBSERVATORY_POLL_INTERVAL", 60))
        db_override = env("OBSERVATORY_DB_PATH")
        if db_override:
            cfg.db_path = Path(db_override)
        # All float thresholds follow the same pattern; drive them from a table.
        for attr, var, default in (
            ("disk_warn_pct", "OBSERVATORY_DISK_WARN_PCT", 80),
            ("disk_crit_pct", "OBSERVATORY_DISK_CRIT_PCT", 90),
            ("mem_warn_pct", "OBSERVATORY_MEM_WARN_PCT", 80),
            ("mem_crit_pct", "OBSERVATORY_MEM_CRIT_PCT", 90),
            ("cpu_warn_pct", "OBSERVATORY_CPU_WARN_PCT", 80),
            ("cpu_crit_pct", "OBSERVATORY_CPU_CRIT_PCT", 95),
            ("webhook_latency_slo_ms", "OBSERVATORY_WEBHOOK_LATENCY_SLO_MS", 2000),
            ("gateway_uptime_slo_pct", "OBSERVATORY_GATEWAY_UPTIME_SLO_PCT", 99.5),
        ):
            setattr(cfg, attr, float(env(var, default)))
        cfg.webhook_url = env("OBSERVATORY_WEBHOOK_URL", "http://127.0.0.1:8080/health")
        cfg.api_url = env("OBSERVATORY_API_URL", "http://127.0.0.1:8642/health")
        return cfg
+
+
+# ---------------------------------------------------------------------------
+# Health check models
+# ---------------------------------------------------------------------------
+
@dataclass
class CheckResult:
    """Outcome of a single health check."""

    name: str  # stable check identifier, e.g. "disk" or "webhook_http"
    status: str  # "ok" | "warn" | "critical" | "error"
    message: str  # human-readable summary used in alerts and digests
    value: Optional[float] = None  # numeric reading, when the check has one
    unit: Optional[str] = None  # unit for `value`, e.g. "%" or "ms"
    extra: Dict[str, Any] = field(default_factory=dict)  # check-specific details
+
+
@dataclass
class HealthSnapshot:
    """A timestamped collection of check results from one polling pass."""

    ts: str  # ISO8601 UTC timestamp
    checks: List[CheckResult] = field(default_factory=list)

    @property
    def overall_status(self) -> str:
        """Worst status across all checks: critical/error > warn > ok."""
        seen = {check.status for check in self.checks}
        if seen & {"critical", "error"}:
            return "critical"
        return "warn" if "warn" in seen else "ok"

    def to_dict(self) -> Dict[str, Any]:
        """Serialize for JSON storage in the health_snapshots table."""
        payload: Dict[str, Any] = {"ts": self.ts, "overall": self.overall_status}
        payload["checks"] = [asdict(check) for check in self.checks]
        return payload
+
+
+# ---------------------------------------------------------------------------
+# Individual health checks
+# ---------------------------------------------------------------------------
+
def check_gateway_liveness() -> CheckResult:
    """Report whether the Hermes gateway process is currently running."""
    try:
        from gateway.status import is_gateway_running, get_running_pid

        alive = is_gateway_running()
        pid = get_running_pid()
        if not alive:
            return CheckResult(
                name="gateway_process",
                status="critical",
                message="Gateway process is NOT running",
            )
        return CheckResult(
            name="gateway_process",
            status="ok",
            message=f"Gateway running (pid={pid})",
            value=float(pid) if pid else None,
        )
    except Exception as exc:
        # Covers both a missing gateway package and probe failures.
        return CheckResult(
            name="gateway_process",
            status="error",
            message=f"Could not determine gateway status: {exc}",
        )
+
+
def check_api_server_http(cfg: ObservatoryConfig) -> CheckResult:
    """Probe the API server /health endpoint.

    Returns:
        "ok"/"warn" based on latency vs. the latency SLO, "critical" for an
        HTTP >= 400 response, "warn" when the server is simply not reachable
        (connection refused/reset), and "error" for any other failure.
    """
    url = cfg.api_url
    start = time.monotonic()
    try:
        req = urllib.request.Request(url, method="GET")
        req.add_header("User-Agent", "hermes-observatory/1.0")
        with urllib.request.urlopen(req, timeout=10) as resp:
            latency_ms = (time.monotonic() - start) * 1000
            body = resp.read(512).decode("utf-8", errors="replace")
            status_code = resp.status
        # urlopen only returns for 2xx/3xx, so this is always a success path.
        slo_ok = latency_ms <= cfg.webhook_latency_slo_ms
        return CheckResult(
            name="api_server_http",
            status="ok" if slo_ok else "warn",
            message=f"API server OK ({latency_ms:.0f}ms){'' if slo_ok else ' — exceeds latency SLO'}",
            value=latency_ms,
            unit="ms",
            extra={"status_code": status_code, "body_preview": body[:100]},
        )
    except urllib.error.HTTPError as exc:
        # BUG FIX: urlopen raises HTTPError for 4xx/5xx, so the previous
        # `status_code >= 400` branch was unreachable and bad statuses were
        # misreported as generic probe errors. Map them to "critical" here.
        latency_ms = (time.monotonic() - start) * 1000
        return CheckResult(
            name="api_server_http",
            status="critical",
            message=f"API server returned HTTP {exc.code}",
            value=latency_ms,
            unit="ms",
        )
    except urllib.error.URLError as exc:
        latency_ms = (time.monotonic() - start) * 1000
        reason = str(exc.reason) if hasattr(exc, "reason") else str(exc)
        # Not running is acceptable if gateway is not configured for API.
        if "Connection refused" in reason or "Connection reset" in reason:
            return CheckResult(
                name="api_server_http",
                status="warn",
                message=f"API server not reachable at {url} (not started?)",
                value=latency_ms,
                unit="ms",
            )
        return CheckResult(
            name="api_server_http",
            status="error",
            message=f"API server probe error: {exc}",
            value=latency_ms,
            unit="ms",
        )
    except Exception as exc:
        latency_ms = (time.monotonic() - start) * 1000
        return CheckResult(
            name="api_server_http",
            status="error",
            message=f"API server probe exception: {exc}",
            value=latency_ms,
            unit="ms",
        )
+
+
def check_webhook_http(cfg: ObservatoryConfig) -> CheckResult:
    """Probe the webhook health endpoint and grade latency against the SLO.

    Returns:
        "ok"/"warn" based on latency vs. the latency SLO, "critical" for an
        HTTP >= 400 response, "warn" when the endpoint is not reachable
        (connection refused/reset), and "error" for any other failure.
    """
    url = cfg.webhook_url
    start = time.monotonic()
    try:
        req = urllib.request.Request(url, method="GET")
        req.add_header("User-Agent", "hermes-observatory/1.0")
        with urllib.request.urlopen(req, timeout=10) as resp:
            latency_ms = (time.monotonic() - start) * 1000
            status_code = resp.status
        # urlopen only returns for 2xx/3xx, so this is always a success path.
        slo_ok = latency_ms <= cfg.webhook_latency_slo_ms
        return CheckResult(
            name="webhook_http",
            status="ok" if slo_ok else "warn",
            message=f"Webhook OK ({latency_ms:.0f}ms){'' if slo_ok else ' — exceeds latency SLO'}",
            value=latency_ms,
            unit="ms",
            extra={"status_code": status_code},
        )
    except urllib.error.HTTPError as exc:
        # BUG FIX: urlopen raises HTTPError for 4xx/5xx, so the previous
        # `status_code >= 400` branch was unreachable and HTTP failures were
        # misreported as generic probe errors. Map them to "critical" here.
        latency_ms = (time.monotonic() - start) * 1000
        return CheckResult(
            name="webhook_http",
            status="critical",
            message=f"Webhook returned HTTP {exc.code}",
            value=latency_ms,
            unit="ms",
        )
    except urllib.error.URLError as exc:
        latency_ms = (time.monotonic() - start) * 1000
        reason = str(exc.reason) if hasattr(exc, "reason") else str(exc)
        if "Connection refused" in reason or "Connection reset" in reason:
            return CheckResult(
                name="webhook_http",
                status="warn",
                message=f"Webhook not reachable at {url} (not started?)",
                value=latency_ms,
                unit="ms",
            )
        return CheckResult(
            name="webhook_http",
            status="error",
            message=f"Webhook probe error: {exc}",
            value=latency_ms,
            unit="ms",
        )
    except Exception as exc:
        latency_ms = (time.monotonic() - start) * 1000
        return CheckResult(
            name="webhook_http",
            status="error",
            message=f"Webhook probe exception: {exc}",
            value=latency_ms,
            unit="ms",
        )
+
+
def check_disk(cfg: ObservatoryConfig) -> CheckResult:
    """Report disk utilisation for the filesystem holding HERMES_HOME."""
    if not _PSUTIL:
        return CheckResult(name="disk", status="error", message="psutil not installed")
    try:
        home_dir = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
        # Fall back to the root filesystem when HERMES_HOME does not exist yet.
        target = str(home_dir) if home_dir.exists() else "/"
        usage = psutil.disk_usage(target)
        used_pct = usage.percent
        level = (
            "critical" if used_pct >= cfg.disk_crit_pct
            else "warn" if used_pct >= cfg.disk_warn_pct
            else "ok"
        )
        gb_free = usage.free / (1024 ** 3)
        return CheckResult(
            name="disk",
            status=level,
            message=f"Disk {used_pct:.1f}% used ({gb_free:.1f}GB free)",
            value=used_pct,
            unit="%",
            extra={"free_bytes": usage.free, "total_bytes": usage.total},
        )
    except Exception as exc:
        return CheckResult(name="disk", status="error", message=f"Disk check error: {exc}")
+
+
def check_memory(cfg: ObservatoryConfig) -> CheckResult:
    """Report system-wide virtual memory utilisation."""
    if not _PSUTIL:
        return CheckResult(name="memory", status="error", message="psutil not installed")
    try:
        vm = psutil.virtual_memory()
        used_pct = vm.percent
        level = (
            "critical" if used_pct >= cfg.mem_crit_pct
            else "warn" if used_pct >= cfg.mem_warn_pct
            else "ok"
        )
        gb_available = vm.available / (1024 ** 3)
        return CheckResult(
            name="memory",
            status=level,
            message=f"Memory {used_pct:.1f}% used ({gb_available:.1f}GB available)",
            value=used_pct,
            unit="%",
            extra={"available_bytes": vm.available, "total_bytes": vm.total},
        )
    except Exception as exc:
        return CheckResult(name="memory", status="error", message=f"Memory check error: {exc}")
+
+
def check_cpu(cfg: ObservatoryConfig) -> CheckResult:
    """Report CPU utilisation sampled over one second (blocks for ~1s)."""
    if not _PSUTIL:
        return CheckResult(name="cpu", status="error", message="psutil not installed")
    try:
        used_pct = psutil.cpu_percent(interval=1)
        level = (
            "critical" if used_pct >= cfg.cpu_crit_pct
            else "warn" if used_pct >= cfg.cpu_warn_pct
            else "ok"
        )
        return CheckResult(
            name="cpu",
            status=level,
            message=f"CPU {used_pct:.1f}%",
            value=used_pct,
            unit="%",
        )
    except Exception as exc:
        return CheckResult(name="cpu", status="error", message=f"CPU check error: {exc}")
+
+
def check_database(cfg: ObservatoryConfig) -> CheckResult:
    """Verify the observatory SQLite DB exists and answers a simple query.

    Returns "warn" (not an error) when the DB has not been created yet.
    """
    db_path = cfg.db_path
    try:
        if not db_path.exists():
            return CheckResult(
                name="database",
                status="warn",
                message=f"Observatory DB not yet created at {db_path}",
            )
        size_kb = db_path.stat().st_size / 1024
        conn = sqlite3.connect(str(db_path), timeout=5)
        try:
            # A failure here means the schema is missing or the file is corrupt.
            conn.execute("SELECT count(*) FROM health_snapshots").fetchone()
        finally:
            # FIX: close the connection even when the query raises; previously
            # a failing query left the connection open (resource leak).
            conn.close()
        return CheckResult(
            name="database",
            status="ok",
            message=f"Observatory DB OK ({size_kb:.1f}KB)",
            value=size_kb,
            unit="KB",
            extra={"path": str(db_path)},
        )
    except Exception as exc:
        return CheckResult(
            name="database",
            status="error",
            message=f"DB check error: {exc}",
        )
+
+
def check_response_store_db() -> CheckResult:
    """Check the API server's SQLite response store DB, if one exists.

    Absence of the DB is "ok": it only appears once the API server is used.
    """
    try:
        hermes_home = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
        db_path = hermes_home / "response_store.db"
        if not db_path.exists():
            return CheckResult(
                name="response_store_db",
                status="ok",
                message="Response store DB not present (API server not yet used)",
            )
        size_kb = db_path.stat().st_size / 1024
        conn = sqlite3.connect(str(db_path), timeout=5)
        try:
            count = conn.execute("SELECT count(*) FROM responses").fetchone()[0]
        finally:
            # FIX: close the connection even when the query raises; previously
            # a failing query left the connection open (resource leak).
            conn.close()
        return CheckResult(
            name="response_store_db",
            status="ok",
            message=f"Response store DB OK ({count} responses, {size_kb:.1f}KB)",
            value=size_kb,
            unit="KB",
        )
    except Exception as exc:
        return CheckResult(
            name="response_store_db",
            status="error",
            message=f"Response store DB error: {exc}",
        )
+
+
+# ---------------------------------------------------------------------------
+# Snapshot collector
+# ---------------------------------------------------------------------------
+
def collect_snapshot(cfg: ObservatoryConfig) -> HealthSnapshot:
    """Run all checks and return a HealthSnapshot.

    Checks run sequentially; note that check_cpu samples with interval=1,
    so a full pass takes at least one second.
    """
    ts = datetime.now(timezone.utc).isoformat()
    checks = [
        check_gateway_liveness(),
        check_api_server_http(cfg),
        check_webhook_http(cfg),
        check_disk(cfg),
        check_memory(cfg),
        check_cpu(cfg),
        check_database(cfg),
        check_response_store_db(),
    ]
    return HealthSnapshot(ts=ts, checks=checks)
+
+
+# ---------------------------------------------------------------------------
+# SQLite persistence
+# ---------------------------------------------------------------------------
+
@contextmanager
def _db(path: Path):
    """Yield a SQLite connection to *path*, creating parent dirs on demand.

    The transaction is committed only when the managed block exits without
    raising; the connection is closed unconditionally.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    connection = sqlite3.connect(str(path), timeout=10)
    for pragma in ("PRAGMA journal_mode=WAL", "PRAGMA foreign_keys=ON"):
        connection.execute(pragma)
    try:
        yield connection
        connection.commit()
    finally:
        connection.close()
+
+
def _init_db(path: Path) -> None:
    """Create tables if they don't exist.

    Idempotent: all DDL uses IF NOT EXISTS, so this is safe to call on
    every write (as store_snapshot/record_alert_sent do).
    """
    with _db(path) as conn:
        # One row per polling pass; `payload` holds the full snapshot as JSON.
        conn.execute("""
            CREATE TABLE IF NOT EXISTS health_snapshots (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                ts TEXT NOT NULL,
                overall TEXT NOT NULL,
                payload TEXT NOT NULL
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_snapshots_ts ON health_snapshots(ts)")
        # Audit log of every alert/recovery message actually delivered.
        conn.execute("""
            CREATE TABLE IF NOT EXISTS alerts_sent (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                ts TEXT NOT NULL,
                check_name TEXT NOT NULL,
                status TEXT NOT NULL,
                message TEXT NOT NULL
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_alerts_ts ON alerts_sent(ts)")
+
+
def store_snapshot(cfg: ObservatoryConfig, snapshot: HealthSnapshot) -> None:
    """Insert *snapshot* into SQLite and prune rows past the retention window."""
    _init_db(cfg.db_path)
    serialized = json.dumps(snapshot.to_dict())
    cutoff = (datetime.now(timezone.utc) - timedelta(days=RETENTION_DAYS)).isoformat()
    with _db(cfg.db_path) as conn:
        conn.execute(
            "INSERT INTO health_snapshots (ts, overall, payload) VALUES (?, ?, ?)",
            (snapshot.ts, snapshot.overall_status, serialized),
        )
        # Keep the table bounded to RETENTION_DAYS of history.
        conn.execute("DELETE FROM health_snapshots WHERE ts < ?", (cutoff,))
+
+
def record_alert_sent(cfg: ObservatoryConfig, check_name: str, status: str, message: str) -> None:
    """Append a row to alerts_sent noting that an alert was dispatched."""
    _init_db(cfg.db_path)
    row = (datetime.now(timezone.utc).isoformat(), check_name, status, message)
    with _db(cfg.db_path) as conn:
        conn.execute(
            "INSERT INTO alerts_sent (ts, check_name, status, message) VALUES (?, ?, ?, ?)",
            row,
        )
+
+
def load_snapshots(cfg: ObservatoryConfig, days: int = RETENTION_DAYS) -> List[Dict[str, Any]]:
    """Return decoded snapshot payloads from the last *days* days, newest first."""
    if not cfg.db_path.exists():
        # No DB yet simply means no history has been recorded.
        return []
    cutoff = (datetime.now(timezone.utc) - timedelta(days=days)).isoformat()
    query = "SELECT ts, overall, payload FROM health_snapshots WHERE ts >= ? ORDER BY ts DESC"
    with _db(cfg.db_path) as conn:
        payloads = [row[2] for row in conn.execute(query, (cutoff,)).fetchall()]
    return [json.loads(payload) for payload in payloads]
+
+
+# ---------------------------------------------------------------------------
+# Alerting
+# ---------------------------------------------------------------------------
+
def _telegram_send(token: str, chat_id: str, text: str) -> bool:
    """POST a message via the Telegram Bot API; True iff Telegram reports ok.

    NOTE(review): text is sent with parse_mode=HTML but is not HTML-escaped;
    messages containing "<" or "&" may be rejected by Telegram — confirm upstream.
    """
    endpoint = f"https://api.telegram.org/bot{token}/sendMessage"
    body = {
        "chat_id": chat_id,
        "text": text,
        "parse_mode": "HTML",
        "disable_web_page_preview": True,
    }
    request = urllib.request.Request(
        endpoint, data=json.dumps(body).encode("utf-8"), method="POST"
    )
    request.add_header("Content-Type", "application/json")
    request.add_header("User-Agent", "hermes-observatory/1.0")
    try:
        with urllib.request.urlopen(request, timeout=15) as resp:
            reply = json.loads(resp.read())
        return bool(reply.get("ok"))
    except Exception as exc:
        # Alert delivery is best-effort; a failed send is logged, not raised.
        logger.warning("Telegram send failed: %s", exc)
        return False
+
+
def _status_emoji(status: str) -> str:
    """Map a check status to its display emoji ("❓" for unknown values)."""
    emoji_by_status = {
        "ok": "✅",
        "warn": "⚠️",
        "critical": "🔴",
        "error": "❌",
    }
    return emoji_by_status.get(status, "❓")
+
+
def maybe_alert(cfg: ObservatoryConfig, snapshot: HealthSnapshot, prev_snapshot: Optional[HealthSnapshot]) -> List[str]:
    """Send Telegram alerts for status transitions between two snapshots.

    An alert fires when a check newly enters critical/error; a recovery
    notice fires when a previously critical/error check returns to ok.
    Returns the list of messages that were actually delivered.
    """
    if not cfg.telegram_token or not cfg.alert_chat_id:
        # Alerting is not configured — nothing to do.
        return []

    previous = {c.name: c.status for c in prev_snapshot.checks} if prev_snapshot else {}
    delivered: List[str] = []
    degraded = ("critical", "error")

    for check in snapshot.checks:
        if check.status in degraded:
            # A check with no prior record is treated as previously "ok",
            # so the very first degraded observation alerts.
            if previous.get(check.name, "ok") in degraded:
                continue  # already alerted on an earlier poll
            emoji = _status_emoji(check.status)
            msg = (
                f"{emoji} Hermes Observatory Alert\n\n"
                f"Check: {check.name}\n"
                f"Status: {check.status.upper()}\n"
                f"Message: {check.message}\n"
                f"Time: {snapshot.ts}"
            )
            if _telegram_send(cfg.telegram_token, cfg.alert_chat_id, msg):
                delivered.append(msg)
                record_alert_sent(cfg, check.name, check.status, check.message)
                logger.info("Alert sent for %s (%s)", check.name, check.status)
        elif check.status == "ok" and previous.get(check.name) in degraded:
            msg = (
                f"✅ Hermes Observatory — Recovery\n\n"
                f"Check: {check.name} has recovered\n"
                f"Message: {check.message}\n"
                f"Time: {snapshot.ts}"
            )
            if _telegram_send(cfg.telegram_token, cfg.alert_chat_id, msg):
                delivered.append(msg)
                record_alert_sent(cfg, check.name, "recovery", check.message)

    return delivered
+
+
+# ---------------------------------------------------------------------------
+# Daily digest
+# ---------------------------------------------------------------------------
+
def build_digest(cfg: ObservatoryConfig) -> str:
    """Build a daily health digest from snapshots stored in the last 24h.

    Returns:
        A plain-text (Telegram-friendly) report covering overall status
        counts, SLO attainment and the most frequently degraded checks.
    """
    snapshots = load_snapshots(cfg, days=1)
    total = len(snapshots)
    if total == 0:
        return "No health data available for the last 24 hours."

    # Aggregate pass: overall status counts, per-check incident counts,
    # and latency samples (any check reporting a value with unit "ms").
    status_counts: Dict[str, int] = {"ok": 0, "warn": 0, "critical": 0, "error": 0}
    check_degraded_counts: Dict[str, int] = {}
    latencies: Dict[str, List[float]] = {}

    for snap in snapshots:
        overall = snap.get("overall", "ok")
        status_counts[overall] = status_counts.get(overall, 0) + 1
        for check in snap.get("checks", []):
            name = check["name"]
            if check["status"] in ("critical", "error", "warn"):
                check_degraded_counts[name] = check_degraded_counts.get(name, 0) + 1
            value = check.get("value")
            if value is not None and check.get("unit") == "ms":
                latencies.setdefault(name, []).append(float(value))

    def p95_of(values: List[float]) -> float:
        # Nearest-rank p95 on sorted samples, clamped to the last element.
        ordered = sorted(values)
        return ordered[min(int(len(ordered) * 0.95), len(ordered) - 1)]

    now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
    lines = [
        "📊 Hermes Observatory — Daily Digest",
        f"Generated: {now}",
        "",
        f"Last 24h Summary ({total} samples)",
        f" Healthy: {status_counts['ok']} ({100*status_counts['ok']//total if total else 0}%)",
        f" Warning: {status_counts.get('warn', 0)}",
        f" Critical: {status_counts.get('critical', 0)}",
        f" Error: {status_counts.get('error', 0)}",
        "",
    ]

    # SLO section: gateway uptime, then p95 latencies.
    lines.append("SLO Status")
    gw_uptime_target = cfg.gateway_uptime_slo_pct
    gw_ok_samples = [
        s for s in snapshots
        if any(c["name"] == "gateway_process" and c["status"] == "ok" for c in s.get("checks", []))
    ]
    gw_uptime = 100.0 * len(gw_ok_samples) / total if total else 0.0
    gw_ok = gw_uptime >= gw_uptime_target
    lines.append(
        f" {'✅' if gw_ok else '❌'} Gateway uptime: {gw_uptime:.1f}% (target: ≥{gw_uptime_target}%)"
    )

    wh_latency_target = cfg.webhook_latency_slo_ms
    if latencies.get("webhook_http"):
        p95 = p95_of(latencies["webhook_http"])
        wh_ok = p95 <= wh_latency_target
        lines.append(
            f" {'✅' if wh_ok else '❌'} Webhook p95 latency: {p95:.0f}ms (target: ≤{wh_latency_target:.0f}ms)"
        )
    else:
        lines.append(" ⚫ Webhook latency: no data")

    # The API server is graded against the webhook latency SLO: there is no
    # separate env override for an API latency target (see module docstring).
    if latencies.get("api_server_http"):
        p95 = p95_of(latencies["api_server_http"])
        api_ok = p95 <= wh_latency_target
        lines.append(
            f" {'✅' if api_ok else '❌'} API server p95 latency: {p95:.0f}ms (target: ≤{wh_latency_target:.0f}ms)"
        )

    # Checks with the most incidents over the window, most frequent first.
    if check_degraded_counts:
        lines.append("")
        lines.append("Degraded Checks (24h)")
        for name, count in sorted(check_degraded_counts.items(), key=lambda item: -item[1]):
            pct = 100 * count // total if total else 0
            lines.append(f" • {name}: {count} incidents ({pct}%)")

    lines.append("")
    lines.append(f"Observatory DB: {cfg.db_path}")

    return "\n".join(lines)
+
+
def send_digest(cfg: ObservatoryConfig) -> bool:
    """Build the daily digest and send it to Telegram.

    Returns False when Telegram is not configured or the send fails.
    """
    digest_text = build_digest(cfg)
    if not (cfg.telegram_token and cfg.digest_chat_id):
        return False
    return _telegram_send(cfg.telegram_token, cfg.digest_chat_id, digest_text)
+
+
+# ---------------------------------------------------------------------------
+# Display helpers
+# ---------------------------------------------------------------------------
+
# ANSI escape codes used for colored terminal output.
_STATUS_COLORS = {
    "ok": "\033[32m",        # green
    "warn": "\033[33m",      # yellow
    "critical": "\033[31m",  # red
    "error": "\033[91m",     # bright red
}
_RESET = "\033[0m"


def _color_status(status: str) -> str:
    """Return *status* upper-cased, wrapped in its ANSI color (if any)."""
    prefix = _STATUS_COLORS.get(status, "")
    return f"{prefix}{status.upper()}{_RESET}"
+
+
def print_snapshot(snapshot: HealthSnapshot) -> None:
    """Render *snapshot* to stdout with a banner and per-check status lines."""
    rule = "=" * 60
    color = _STATUS_COLORS.get(snapshot.overall_status, "")
    print(f"\n{rule}")
    print(f" Hermes Observatory — {snapshot.ts}")
    print(f" Overall: {color}{snapshot.overall_status.upper()}{_RESET}")
    print(rule)
    for check in snapshot.checks:
        reading = ""
        if check.value is not None and check.unit:
            reading = f" [{check.value:.1f}{check.unit}]"
        print(f" {_status_emoji(check.status)} {check.name:<25} {_color_status(check.status):<15} {check.message}{reading}")
    print()
+
+
def print_slo_report(cfg: ObservatoryConfig) -> None:
    """Print SLO targets and attainment over the last 30 days of snapshots."""
    snapshots = load_snapshots(cfg, days=30)
    total = len(snapshots)
    print(f"\n{'='*60}")
    print(" Hermes Observatory — SLO Report (last 30 days)")
    print(f"{'='*60}")
    for slo_key, slo in SLO_DEFINITIONS.items():
        print(f"\n {slo['description']}")
        print(f" Target: {slo['target']}{slo['unit']}")
        if total == 0:
            print(f" Status: no data")
            continue
        if slo_key == "gateway_uptime_pct":
            # Uptime = fraction of snapshots where gateway_process was "ok".
            ok_count = sum(
                1 for s in snapshots
                if any(c["name"] == "gateway_process" and c["status"] == "ok"
                       for c in s.get("checks", []))
            )
            actual = 100.0 * ok_count / total
            met = actual >= slo["target"]
            print(f" Actual: {actual:.2f}% {'✅ MET' if met else '❌ MISSED'}")
        elif slo_key in ("webhook_latency_ms", "api_server_latency_ms"):
            # BUG FIX: this previously tested for "api_server_http_latency_ms",
            # a key that does not exist in SLO_DEFINITIONS, so the API-server
            # latency SLO row was silently skipped in the report.
            check_name = "webhook_http" if "webhook" in slo_key else "api_server_http"
            vals = [
                float(c["value"])
                for s in snapshots
                for c in s.get("checks", [])
                if c["name"] == check_name and c.get("value") is not None
            ]
            if vals:
                vals.sort()
                p95_idx = int(len(vals) * 0.95)
                p95 = vals[min(p95_idx, len(vals) - 1)]
                met = p95 <= slo["target"]
                print(f" p95: {p95:.0f}ms {'✅ MET' if met else '❌ MISSED'}")
            else:
                print(f" Status: no latency data")
    print()
+
+
def print_history(cfg: ObservatoryConfig, count: int = 20) -> None:
    """Print the most recent *count* stored health records, newest first."""
    recent = load_snapshots(cfg, days=RETENTION_DAYS)[:count]
    if not recent:
        print("No history available.")
        return
    rule = "=" * 60
    print(f"\n{rule}")
    print(f" Last {min(count, len(recent))} health records")
    print(rule)
    for snap in recent:
        overall = snap.get("overall", "?")
        failing = [c["name"] for c in snap.get("checks", []) if c["status"] != "ok"]
        suffix = f" — issues: {', '.join(failing)}" if failing else ""
        print(f" {_status_emoji(overall)} {snap.get('ts', '?')} {overall.upper()}{suffix}")
    print()
+
+
+# ---------------------------------------------------------------------------
+# Daemon mode
+# ---------------------------------------------------------------------------
+
class Observatory:
    """Continuous monitoring daemon.

    Polls every cfg.poll_interval seconds, persisting each snapshot and
    alerting on status transitions relative to the previous poll.
    """

    def __init__(self, cfg: ObservatoryConfig):
        self.cfg = cfg
        self._running = False  # flipped off by the signal handler to stop run()
        self._prev_snapshot: Optional[HealthSnapshot] = None  # baseline for transition alerts

    def _handle_signal(self, signum: int, frame: Any) -> None:
        """SIGINT/SIGTERM handler: request a graceful stop of the poll loop."""
        logger.info("Received signal %d, shutting down...", signum)
        self._running = False

    def run_once(self) -> HealthSnapshot:
        """Collect, persist, and alert on a single snapshot; return it."""
        snapshot = collect_snapshot(self.cfg)
        store_snapshot(self.cfg, snapshot)
        # Alerts fire only on transitions vs. the previous snapshot.
        alerts = maybe_alert(self.cfg, snapshot, self._prev_snapshot)
        if alerts:
            logger.info("Sent %d alert(s)", len(alerts))
        self._prev_snapshot = snapshot
        return snapshot

    def run(self) -> None:
        """Poll forever (every cfg.poll_interval seconds) until signalled."""
        _init_db(self.cfg.db_path)
        logger.info(
            "Observatory starting — poll_interval=%ds db=%s",
            self.cfg.poll_interval,
            self.cfg.db_path,
        )
        self._running = True
        signal.signal(signal.SIGINT, self._handle_signal)
        signal.signal(signal.SIGTERM, self._handle_signal)

        while self._running:
            try:
                snapshot = self.run_once()
                logger.info("Health check: %s", snapshot.overall_status)
            except Exception as exc:
                # A failing poll is logged but never kills the daemon.
                logger.error("Health check failed: %s", exc, exc_info=True)
            if self._running:
                time.sleep(self.cfg.poll_interval)

        logger.info("Observatory stopped.")
+
+
+# ---------------------------------------------------------------------------
+# CLI entry point
+# ---------------------------------------------------------------------------
+
def main(argv: Optional[List[str]] = None) -> int:
    """CLI entry point.

    Returns a process exit code: non-zero when a one-shot check finds the
    testbed degraded, zero otherwise.
    """
    parser = argparse.ArgumentParser(
        description="Hermes Observatory — health monitoring & alerting",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--check", action="store_true", help="Run one health check and print results")
    parser.add_argument("--daemon", action="store_true", help="Run as continuous monitoring daemon")
    parser.add_argument("--digest", action="store_true", help="Print (and optionally send) daily digest")
    parser.add_argument("--history", type=int, metavar="N", help="Show last N health records")
    parser.add_argument("--slo", action="store_true", help="Print SLO report")
    parser.add_argument("--send-digest", action="store_true", help="Send daily digest via Telegram")
    parser.add_argument("--verbose", "-v", action="store_true", help="Enable verbose logging")
    opts = parser.parse_args(argv)

    logging.basicConfig(
        level=logging.DEBUG if opts.verbose else logging.INFO,
        format="%(asctime)s %(levelname)s [observatory] %(message)s",
    )

    cfg = ObservatoryConfig.from_env()
    _init_db(cfg.db_path)

    def one_shot() -> int:
        # Shared by --check and the no-argument default invocation.
        snapshot = collect_snapshot(cfg)
        store_snapshot(cfg, snapshot)
        print_snapshot(snapshot)
        return 0 if snapshot.overall_status == "ok" else 1

    if opts.check:
        return one_shot()

    if opts.daemon:
        Observatory(cfg).run()
        return 0

    if opts.digest or opts.send_digest:
        print(build_digest(cfg))
        if opts.send_digest:
            if send_digest(cfg):
                print("\n[Digest sent to Telegram]")
            else:
                print("\n[Telegram send skipped — token/chat_id not configured]")
        return 0

    if opts.history is not None:
        print_history(cfg, opts.history)
        return 0

    if opts.slo:
        print_slo_report(cfg)
        return 0

    return one_shot()
+
+
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/pyproject.toml b/pyproject.toml
index 40abd50ff..ef57f5663 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -42,6 +42,7 @@ dependencies = [
modal = ["modal>=1.0.0,<2"]
daytona = ["daytona>=0.148.0,<1"]
dev = ["pytest>=9.0.2,<10", "pytest-asyncio>=1.3.0,<2", "pytest-xdist>=3.0,<4", "mcp>=1.2.0,<2"]
+observatory = ["psutil>=5.9.0,<7"]
messaging = ["python-telegram-bot>=22.6,<23", "discord.py[voice]>=2.7.1,<3", "aiohttp>=3.13.3,<4", "slack-bolt>=1.18.0,<2", "slack-sdk>=3.27.0,<4"]
cron = ["croniter>=6.0.0,<7"]
slack = ["slack-bolt>=1.18.0,<2", "slack-sdk>=3.27.0,<4"]
diff --git a/tests/test_observatory.py b/tests/test_observatory.py
new file mode 100644
index 000000000..1290bc720
--- /dev/null
+++ b/tests/test_observatory.py
@@ -0,0 +1,455 @@
+"""
+Tests for observatory.py — health monitoring & alerting.
+
+Refs #147
+"""
+
+from __future__ import annotations
+
+import json
+import os
+import sqlite3
+import sys
+import tempfile
+import time
+from pathlib import Path
+from typing import Any
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+PROJECT_ROOT = Path(__file__).parent.parent
+if str(PROJECT_ROOT) not in sys.path:
+ sys.path.insert(0, str(PROJECT_ROOT))
+
+import observatory as obs
+
+
+# ---------------------------------------------------------------------------
+# Fixtures
+# ---------------------------------------------------------------------------
+
+@pytest.fixture
+def cfg(tmp_path):
+ """Return an ObservatoryConfig pointing at a temp directory."""
+ cfg = obs.ObservatoryConfig()
+ cfg.db_path = tmp_path / "observatory.db"
+ cfg.alert_chat_id = "99999"
+ cfg.digest_chat_id = "99999"
+ cfg.telegram_token = "fake-token"
+ cfg.webhook_url = "http://127.0.0.1:19999/health" # port never bound
+ cfg.api_url = "http://127.0.0.1:19998/health"
+ return cfg
+
+
+# ---------------------------------------------------------------------------
+# Config tests
+# ---------------------------------------------------------------------------
+
+class TestObservatoryConfig:
+ def test_defaults(self):
+ c = obs.ObservatoryConfig()
+ assert c.disk_warn_pct == 80.0
+ assert c.disk_crit_pct == 90.0
+ assert c.mem_warn_pct == 80.0
+ assert c.mem_crit_pct == 90.0
+ assert c.cpu_warn_pct == 80.0
+ assert c.cpu_crit_pct == 95.0
+ assert c.poll_interval == 60
+ assert c.webhook_latency_slo_ms == 2000.0
+ assert c.gateway_uptime_slo_pct == 99.5
+
+ def test_from_env_overrides(self, monkeypatch):
+ monkeypatch.setenv("OBSERVATORY_DISK_WARN_PCT", "70")
+ monkeypatch.setenv("OBSERVATORY_POLL_INTERVAL", "30")
+ monkeypatch.setenv("OBSERVATORY_ALERT_CHAT_ID", "12345")
+ monkeypatch.setenv("TELEGRAM_BOT_TOKEN", "tok123")
+ c = obs.ObservatoryConfig.from_env()
+ assert c.disk_warn_pct == 70.0
+ assert c.poll_interval == 30
+ assert c.alert_chat_id == "12345"
+ assert c.telegram_token == "tok123"
+
+ def test_digest_chat_falls_back_to_alert(self, monkeypatch):
+ monkeypatch.setenv("OBSERVATORY_ALERT_CHAT_ID", "abc")
+ monkeypatch.delenv("OBSERVATORY_DIGEST_CHAT_ID", raising=False)
+ c = obs.ObservatoryConfig.from_env()
+ assert c.digest_chat_id == "abc"
+
+
+# ---------------------------------------------------------------------------
+# CheckResult / HealthSnapshot tests
+# ---------------------------------------------------------------------------
+
+class TestHealthSnapshot:
+ def _make_snapshot(self, statuses):
+ checks = [obs.CheckResult(name=f"c{i}", status=s, message="") for i, s in enumerate(statuses)]
+ return obs.HealthSnapshot(ts="2026-01-01T00:00:00+00:00", checks=checks)
+
+ def test_overall_ok(self):
+ snap = self._make_snapshot(["ok", "ok"])
+ assert snap.overall_status == "ok"
+
+ def test_overall_warn(self):
+ snap = self._make_snapshot(["ok", "warn"])
+ assert snap.overall_status == "warn"
+
+ def test_overall_critical(self):
+ snap = self._make_snapshot(["ok", "warn", "critical"])
+ assert snap.overall_status == "critical"
+
+ def test_overall_error(self):
+ snap = self._make_snapshot(["ok", "error"])
+ assert snap.overall_status == "critical"
+
+ def test_to_dict(self):
+ snap = self._make_snapshot(["ok"])
+ d = snap.to_dict()
+ assert d["overall"] == "ok"
+ assert isinstance(d["checks"], list)
+ assert d["checks"][0]["name"] == "c0"
+
+
+# ---------------------------------------------------------------------------
+# Individual check tests
+# ---------------------------------------------------------------------------
+
+class TestCheckGatewayLiveness:
+ def test_running(self):
+ with patch("gateway.status.is_gateway_running", return_value=True), \
+ patch("gateway.status.get_running_pid", return_value=12345):
+ result = obs.check_gateway_liveness()
+ assert result.status == "ok"
+ assert "12345" in result.message
+
+ def test_not_running(self):
+ with patch("gateway.status.is_gateway_running", return_value=False), \
+ patch("gateway.status.get_running_pid", return_value=None):
+ result = obs.check_gateway_liveness()
+ assert result.status == "critical"
+
+ def test_import_error(self):
+ import builtins
+ real_import = builtins.__import__
+
+ def mock_import(name, *args, **kwargs):
+ if name == "gateway.status":
+ raise ImportError("no module")
+ return real_import(name, *args, **kwargs)
+
+ with patch("builtins.__import__", side_effect=mock_import):
+ result = obs.check_gateway_liveness()
+ assert result.status in ("error", "critical", "ok") # graceful
+
+
+class TestCheckDisk:
+ def test_ok(self, cfg):
+ mock_usage = MagicMock()
+ mock_usage.percent = 50.0
+ mock_usage.free = 10 * 1024 ** 3
+ mock_usage.total = 20 * 1024 ** 3
+ with patch("psutil.disk_usage", return_value=mock_usage):
+ result = obs.check_disk(cfg)
+ assert result.status == "ok"
+ assert result.value == 50.0
+
+ def test_warn(self, cfg):
+ mock_usage = MagicMock()
+ mock_usage.percent = 85.0
+ mock_usage.free = 3 * 1024 ** 3
+ mock_usage.total = 20 * 1024 ** 3
+ with patch("psutil.disk_usage", return_value=mock_usage):
+ result = obs.check_disk(cfg)
+ assert result.status == "warn"
+
+ def test_critical(self, cfg):
+ mock_usage = MagicMock()
+ mock_usage.percent = 92.0
+ mock_usage.free = 1 * 1024 ** 3
+ mock_usage.total = 20 * 1024 ** 3
+ with patch("psutil.disk_usage", return_value=mock_usage):
+ result = obs.check_disk(cfg)
+ assert result.status == "critical"
+
+ def test_no_psutil(self, cfg, monkeypatch):
+ monkeypatch.setattr(obs, "_PSUTIL", False)
+ result = obs.check_disk(cfg)
+ assert result.status == "error"
+
+
+class TestCheckMemory:
+ def test_ok(self, cfg):
+ mock_mem = MagicMock()
+ mock_mem.percent = 60.0
+ mock_mem.available = 4 * 1024 ** 3
+ mock_mem.total = 16 * 1024 ** 3
+ with patch("psutil.virtual_memory", return_value=mock_mem):
+ result = obs.check_memory(cfg)
+ assert result.status == "ok"
+
+ def test_critical(self, cfg):
+ mock_mem = MagicMock()
+ mock_mem.percent = 95.0
+ mock_mem.available = 512 * 1024 ** 2
+ mock_mem.total = 16 * 1024 ** 3
+ with patch("psutil.virtual_memory", return_value=mock_mem):
+ result = obs.check_memory(cfg)
+ assert result.status == "critical"
+
+
+class TestCheckCPU:
+ def test_ok(self, cfg):
+ with patch("psutil.cpu_percent", return_value=40.0):
+ result = obs.check_cpu(cfg)
+ assert result.status == "ok"
+
+ def test_warn(self, cfg):
+ with patch("psutil.cpu_percent", return_value=85.0):
+ result = obs.check_cpu(cfg)
+ assert result.status == "warn"
+
+ def test_critical(self, cfg):
+ with patch("psutil.cpu_percent", return_value=98.0):
+ result = obs.check_cpu(cfg)
+ assert result.status == "critical"
+
+
+class TestCheckDatabase:
+ def test_ok(self, cfg):
+ obs._init_db(cfg.db_path)
+ result = obs.check_database(cfg)
+ assert result.status == "ok"
+
+ def test_not_yet_created(self, cfg):
+ # db_path does not exist
+ result = obs.check_database(cfg)
+ assert result.status == "warn"
+
+
+class TestCheckHTTP:
+ def test_webhook_connection_refused(self, cfg):
+ result = obs.check_webhook_http(cfg)
+ # Port 19999 is not bound — should get a "not reachable" warn
+ assert result.status in ("warn", "error")
+
+ def test_api_server_connection_refused(self, cfg):
+ result = obs.check_api_server_http(cfg)
+ assert result.status in ("warn", "error")
+
+ def test_webhook_ok(self, cfg):
+ import urllib.error
+ from unittest.mock import patch, MagicMock
+ mock_resp = MagicMock()
+ mock_resp.__enter__ = lambda s: s
+ mock_resp.__exit__ = MagicMock(return_value=False)
+ mock_resp.status = 200
+ mock_resp.read.return_value = b'{"status":"ok"}'
+ with patch("urllib.request.urlopen", return_value=mock_resp):
+ result = obs.check_webhook_http(cfg)
+ assert result.status in ("ok", "warn")
+
+ def test_webhook_http_error(self, cfg):
+ mock_resp = MagicMock()
+ mock_resp.__enter__ = lambda s: s
+ mock_resp.__exit__ = MagicMock(return_value=False)
+ mock_resp.status = 503
+ with patch("urllib.request.urlopen", return_value=mock_resp):
+ result = obs.check_webhook_http(cfg)
+ assert result.status == "critical"
+
+
+# ---------------------------------------------------------------------------
+# Persistence tests
+# ---------------------------------------------------------------------------
+
+class TestPersistence:
+ def test_store_and_load(self, cfg):
+ obs._init_db(cfg.db_path)
+ from datetime import datetime, timezone
+ ts = datetime.now(timezone.utc).isoformat()
+ snap = obs.HealthSnapshot(
+ ts=ts,
+ checks=[obs.CheckResult(name="test", status="ok", message="fine")],
+ )
+ obs.store_snapshot(cfg, snap)
+ loaded = obs.load_snapshots(cfg, days=30)
+ assert len(loaded) == 1
+ assert loaded[0]["overall"] == "ok"
+
+ def test_retention_pruning(self, cfg):
+ obs._init_db(cfg.db_path)
+ # Insert an old record directly
+ with obs._db(cfg.db_path) as conn:
+ conn.execute(
+ "INSERT INTO health_snapshots (ts, overall, payload) VALUES (?, ?, ?)",
+ ("2000-01-01T00:00:00+00:00", "ok", '{"ts":"2000-01-01T00:00:00+00:00","overall":"ok","checks":[]}'),
+ )
+ snap = obs.HealthSnapshot(
+ ts="2026-01-01T00:00:00+00:00",
+ checks=[],
+ )
+ obs.store_snapshot(cfg, snap)
+ # Old record should have been pruned
+ with obs._db(cfg.db_path) as conn:
+ count = conn.execute("SELECT count(*) FROM health_snapshots WHERE ts < '2001-01-01'").fetchone()[0]
+ assert count == 0
+
+ def test_record_alert_sent(self, cfg):
+ obs._init_db(cfg.db_path)
+ obs.record_alert_sent(cfg, "gateway_process", "critical", "not running")
+ with obs._db(cfg.db_path) as conn:
+ count = conn.execute("SELECT count(*) FROM alerts_sent").fetchone()[0]
+ assert count == 1
+
+
+# ---------------------------------------------------------------------------
+# Alerting tests
+# ---------------------------------------------------------------------------
+
+class TestAlerting:
+ def _snap(self, status):
+ return obs.HealthSnapshot(
+ ts="2026-01-01T00:00:00+00:00",
+ checks=[obs.CheckResult(name="gateway_process", status=status, message="test")],
+ )
+
+ def test_no_alert_when_ok(self, cfg):
+ snap = self._snap("ok")
+ prev = self._snap("ok")
+ obs._init_db(cfg.db_path)
+ with patch("observatory._telegram_send", return_value=True) as mock_send:
+ alerts = obs.maybe_alert(cfg, snap, prev)
+ mock_send.assert_not_called()
+ assert alerts == []
+
+ def test_alert_on_new_critical(self, cfg):
+ snap = self._snap("critical")
+ prev = self._snap("ok")
+ obs._init_db(cfg.db_path)
+ with patch("observatory._telegram_send", return_value=True) as mock_send:
+ alerts = obs.maybe_alert(cfg, snap, prev)
+ mock_send.assert_called_once()
+ assert len(alerts) == 1
+
+ def test_no_duplicate_alert(self, cfg):
+ snap = self._snap("critical")
+ prev = self._snap("critical") # already critical
+ obs._init_db(cfg.db_path)
+ with patch("observatory._telegram_send", return_value=True) as mock_send:
+ alerts = obs.maybe_alert(cfg, snap, prev)
+ mock_send.assert_not_called()
+ assert alerts == []
+
+ def test_recovery_alert(self, cfg):
+ snap = self._snap("ok")
+ prev = self._snap("critical")
+ obs._init_db(cfg.db_path)
+ with patch("observatory._telegram_send", return_value=True) as mock_send:
+ alerts = obs.maybe_alert(cfg, snap, prev)
+ mock_send.assert_called_once()
+
+ def test_no_alert_without_token(self, cfg):
+ cfg.telegram_token = None
+ snap = self._snap("critical")
+ obs._init_db(cfg.db_path)
+ alerts = obs.maybe_alert(cfg, snap, None)
+ assert alerts == []
+
+ def test_no_alert_without_chat_id(self, cfg):
+ cfg.alert_chat_id = None
+ snap = self._snap("critical")
+ obs._init_db(cfg.db_path)
+ alerts = obs.maybe_alert(cfg, snap, None)
+ assert alerts == []
+
+
+# ---------------------------------------------------------------------------
+# Digest tests
+# ---------------------------------------------------------------------------
+
+class TestDigest:
+ def test_empty_digest(self, cfg):
+ obs._init_db(cfg.db_path)
+ digest = obs.build_digest(cfg)
+ assert "no health data" in digest.lower() or "24 hours" in digest.lower()
+
+ def test_digest_with_data(self, cfg):
+ obs._init_db(cfg.db_path)
+ from datetime import datetime, timezone, timedelta
+ ts = (datetime.now(timezone.utc) - timedelta(hours=1)).isoformat()
+ snap = obs.HealthSnapshot(
+ ts=ts,
+ checks=[
+ obs.CheckResult(name="gateway_process", status="ok", message="running"),
+ obs.CheckResult(name="disk", status="ok", message="50% used", value=50.0, unit="%"),
+ obs.CheckResult(name="webhook_http", status="ok", message="ok", value=150.0, unit="ms"),
+ ],
+ )
+ obs.store_snapshot(cfg, snap)
+ digest = obs.build_digest(cfg)
+ assert "Daily Digest" in digest
+ assert "Gateway" in digest or "gateway" in digest
+
+ def test_send_digest_no_token(self, cfg):
+ cfg.telegram_token = None
+ obs._init_db(cfg.db_path)
+ result = obs.send_digest(cfg)
+ assert result is False
+
+
+# ---------------------------------------------------------------------------
+# SLO tests
+# ---------------------------------------------------------------------------
+
+class TestSLO:
+ def test_slo_definitions_complete(self):
+ assert "gateway_uptime_pct" in obs.SLO_DEFINITIONS
+ assert "webhook_latency_ms" in obs.SLO_DEFINITIONS
+ assert "api_server_latency_ms" in obs.SLO_DEFINITIONS
+
+ def test_slo_targets(self):
+ assert obs.SLO_DEFINITIONS["gateway_uptime_pct"]["target"] == 99.5
+ assert obs.SLO_DEFINITIONS["webhook_latency_ms"]["target"] == 2000
+
+
+# ---------------------------------------------------------------------------
+# CLI tests
+# ---------------------------------------------------------------------------
+
+class TestCLI:
+ def test_check_exits_0_on_ok(self, cfg, monkeypatch, tmp_path):
+ monkeypatch.setenv("OBSERVATORY_DB_PATH", str(tmp_path / "obs.db"))
+ ok_snap = obs.HealthSnapshot(
+ ts="2026-01-01T00:00:00+00:00",
+ checks=[obs.CheckResult(name="all_good", status="ok", message="fine")],
+ )
+ with patch("observatory.collect_snapshot", return_value=ok_snap), \
+ patch("observatory.store_snapshot"):
+ rc = obs.main(["--check"])
+ assert rc == 0
+
+ def test_check_exits_nonzero_on_critical(self, cfg, monkeypatch, tmp_path):
+ monkeypatch.setenv("OBSERVATORY_DB_PATH", str(tmp_path / "obs.db"))
+ bad_snap = obs.HealthSnapshot(
+ ts="2026-01-01T00:00:00+00:00",
+ checks=[obs.CheckResult(name="gateway_process", status="critical", message="down")],
+ )
+ with patch("observatory.collect_snapshot", return_value=bad_snap), \
+ patch("observatory.store_snapshot"):
+ rc = obs.main(["--check"])
+ assert rc != 0
+
+ def test_digest_flag(self, monkeypatch, tmp_path):
+ monkeypatch.setenv("OBSERVATORY_DB_PATH", str(tmp_path / "obs.db"))
+ rc = obs.main(["--digest"])
+ assert rc == 0
+
+ def test_slo_flag(self, monkeypatch, tmp_path):
+ monkeypatch.setenv("OBSERVATORY_DB_PATH", str(tmp_path / "obs.db"))
+ rc = obs.main(["--slo"])
+ assert rc == 0
+
+ def test_history_flag(self, monkeypatch, tmp_path):
+ monkeypatch.setenv("OBSERVATORY_DB_PATH", str(tmp_path / "obs.db"))
+ rc = obs.main(["--history", "5"])
+ assert rc == 0