Compare commits

...

1 Commits

Author SHA1 Message Date
Alexander Whitestone
9fa0a59761 feat: add Observatory health monitoring & alerting for running services
Some checks failed
Nix / nix (macos-latest) (pull_request) Waiting to run
Dependency Audit / Audit Python dependencies (pull_request) Failing after 4s
Docker Build and Publish / build-and-push (pull_request) Failing after 19s
Nix / nix (ubuntu-latest) (pull_request) Failing after 2s
Secret Scan / Scan for secrets (pull_request) Failing after 2s
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Failing after 2s
Tests / test (pull_request) Failing after 6s
Implements Bezalel Epic-003 — a lightweight monitoring script that:

- Checks gateway and API server process liveness
- Monitors disk, memory, and CPU thresholds (warn/critical levels)
- Probes webhook and API server HTTP endpoints for responsiveness
- Verifies SQLite database connectivity and size
- Sends Telegram alerts when checks degrade or recover (within 60s)
- Posts daily digest reports summarising 24h health, SLO status, error counts
- Persists 30 days of health snapshots in SQLite (~/.hermes/observatory.db)
- Tracks alerts_sent for trend analysis
- Defines and tracks SLOs: gateway uptime ≥99.5%, webhook p95 latency ≤2s

Usage:
  python observatory.py --check        # one-shot check
  python observatory.py --daemon       # continuous 60s poll
  python observatory.py --digest       # print daily digest
  python observatory.py --send-digest  # send digest via Telegram
  python observatory.py --slo          # print SLO report
  python observatory.py --history 20   # show last 20 records

Config via env: OBSERVATORY_ALERT_CHAT_ID, TELEGRAM_BOT_TOKEN, etc.
Adds psutil optional dependency group in pyproject.toml.
45 unit tests covering checks, persistence, alerting, digest, SLOs, and CLI.

Fixes #147

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-06 21:59:14 -04:00
3 changed files with 1411 additions and 0 deletions

955
observatory.py Normal file
View File

@@ -0,0 +1,955 @@
"""
Observatory — Testbed Health Monitoring & Alerting for Hermes Agent
Checks running services, system resources, and connectivity.
Fires Telegram alerts when thresholds are breached.
Posts daily digest reports.
Stores 30 days of historical health data in SQLite.
Usage:
python observatory.py --check # one-shot health check (stdout)
python observatory.py --daemon # continuous monitor (60s poll)
python observatory.py --digest # print / send daily digest
python observatory.py --history N # show last N health records
python observatory.py --slo # print SLO report
Configuration (env vars, falls back to ~/.hermes/.env):
OBSERVATORY_ALERT_CHAT_ID Telegram chat ID for alerts
OBSERVATORY_DIGEST_CHAT_ID Telegram chat ID for daily digest (default: alert chat)
OBSERVATORY_POLL_INTERVAL Seconds between health polls (default: 60)
OBSERVATORY_DB_PATH SQLite path (default: ~/.hermes/observatory.db)
TELEGRAM_BOT_TOKEN Bot token used to send alerts
# Threshold overrides (all optional):
OBSERVATORY_DISK_WARN_PCT Disk usage warn threshold (default: 80)
OBSERVATORY_DISK_CRIT_PCT Disk usage critical threshold (default: 90)
OBSERVATORY_MEM_WARN_PCT Memory usage warn threshold (default: 80)
OBSERVATORY_MEM_CRIT_PCT Memory usage critical threshold (default: 90)
OBSERVATORY_CPU_WARN_PCT CPU usage warn threshold (default: 80)
OBSERVATORY_CPU_CRIT_PCT CPU usage critical threshold (default: 95)
OBSERVATORY_WEBHOOK_URL Webhook endpoint to probe (default: http://127.0.0.1:8080/health)
OBSERVATORY_API_URL API server health URL (default: http://127.0.0.1:8642/health)
OBSERVATORY_WEBHOOK_LATENCY_SLO_MS Webhook latency SLO ms (default: 2000)
OBSERVATORY_GATEWAY_UPTIME_SLO_PCT Gateway uptime SLO % (default: 99.5)
"""
from __future__ import annotations
import argparse
import json
import logging
import os
import signal
import sqlite3
import sys
import time
import urllib.request
import urllib.error
from contextlib import contextmanager
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
# ---------------------------------------------------------------------------
# Optional imports
# ---------------------------------------------------------------------------
try:
import psutil
_PSUTIL = True
except ImportError:
_PSUTIL = False
try:
from dotenv import load_dotenv as _load_dotenv
_DOTENV = True
except ImportError:
_DOTENV = False
logger = logging.getLogger("observatory")
# ---------------------------------------------------------------------------
# Constants & SLO definitions
# ---------------------------------------------------------------------------
RETENTION_DAYS = 30
SLO_DEFINITIONS = {
"gateway_uptime_pct": {
"description": "Gateway process uptime over the last 24 hours",
"target": 99.5,
"unit": "%",
},
"webhook_latency_ms": {
"description": "Webhook endpoint p95 response latency",
"target": 2000,
"unit": "ms",
"direction": "lower_is_better",
},
"api_server_latency_ms": {
"description": "API server /health p95 response latency",
"target": 2000,
"unit": "ms",
"direction": "lower_is_better",
},
}
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
def _load_env() -> None:
"""Load .env from HERMES_HOME if dotenv is available."""
if not _DOTENV:
return
hermes_home = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
env_path = hermes_home / ".env"
if env_path.exists():
_load_dotenv(env_path, override=False)
# Project-level .env as dev fallback
project_env = Path(__file__).parent / ".env"
if project_env.exists():
_load_dotenv(project_env, override=False)
@dataclass
class ObservatoryConfig:
alert_chat_id: Optional[str] = None
digest_chat_id: Optional[str] = None
telegram_token: Optional[str] = None
poll_interval: int = 60
db_path: Path = field(default_factory=lambda: Path(os.getenv("HERMES_HOME", Path.home() / ".hermes")) / "observatory.db")
disk_warn_pct: float = 80.0
disk_crit_pct: float = 90.0
mem_warn_pct: float = 80.0
mem_crit_pct: float = 90.0
cpu_warn_pct: float = 80.0
cpu_crit_pct: float = 95.0
webhook_url: str = "http://127.0.0.1:8080/health"
api_url: str = "http://127.0.0.1:8642/health"
webhook_latency_slo_ms: float = 2000.0
gateway_uptime_slo_pct: float = 99.5
@classmethod
def from_env(cls) -> "ObservatoryConfig":
_load_env()
cfg = cls()
cfg.telegram_token = os.getenv("TELEGRAM_BOT_TOKEN")
cfg.alert_chat_id = os.getenv("OBSERVATORY_ALERT_CHAT_ID")
cfg.digest_chat_id = os.getenv("OBSERVATORY_DIGEST_CHAT_ID") or cfg.alert_chat_id
cfg.poll_interval = int(os.getenv("OBSERVATORY_POLL_INTERVAL", 60))
db_override = os.getenv("OBSERVATORY_DB_PATH")
if db_override:
cfg.db_path = Path(db_override)
cfg.disk_warn_pct = float(os.getenv("OBSERVATORY_DISK_WARN_PCT", 80))
cfg.disk_crit_pct = float(os.getenv("OBSERVATORY_DISK_CRIT_PCT", 90))
cfg.mem_warn_pct = float(os.getenv("OBSERVATORY_MEM_WARN_PCT", 80))
cfg.mem_crit_pct = float(os.getenv("OBSERVATORY_MEM_CRIT_PCT", 90))
cfg.cpu_warn_pct = float(os.getenv("OBSERVATORY_CPU_WARN_PCT", 80))
cfg.cpu_crit_pct = float(os.getenv("OBSERVATORY_CPU_CRIT_PCT", 95))
cfg.webhook_url = os.getenv("OBSERVATORY_WEBHOOK_URL", "http://127.0.0.1:8080/health")
cfg.api_url = os.getenv("OBSERVATORY_API_URL", "http://127.0.0.1:8642/health")
cfg.webhook_latency_slo_ms = float(os.getenv("OBSERVATORY_WEBHOOK_LATENCY_SLO_MS", 2000))
cfg.gateway_uptime_slo_pct = float(os.getenv("OBSERVATORY_GATEWAY_UPTIME_SLO_PCT", 99.5))
return cfg
# ---------------------------------------------------------------------------
# Health check models
# ---------------------------------------------------------------------------
@dataclass
class CheckResult:
name: str
status: str # "ok" | "warn" | "critical" | "error"
message: str
value: Optional[float] = None
unit: Optional[str] = None
extra: Dict[str, Any] = field(default_factory=dict)
@dataclass
class HealthSnapshot:
ts: str # ISO8601 UTC
checks: List[CheckResult] = field(default_factory=list)
@property
def overall_status(self) -> str:
statuses = {c.status for c in self.checks}
if "critical" in statuses or "error" in statuses:
return "critical"
if "warn" in statuses:
return "warn"
return "ok"
def to_dict(self) -> Dict[str, Any]:
return {
"ts": self.ts,
"overall": self.overall_status,
"checks": [asdict(c) for c in self.checks],
}
# ---------------------------------------------------------------------------
# Individual health checks
# ---------------------------------------------------------------------------
def check_gateway_liveness() -> CheckResult:
"""Check whether the Hermes gateway process is running."""
try:
from gateway.status import is_gateway_running, get_running_pid
running = is_gateway_running()
pid = get_running_pid()
if running:
return CheckResult(
name="gateway_process",
status="ok",
message=f"Gateway running (pid={pid})",
value=float(pid) if pid else None,
)
return CheckResult(
name="gateway_process",
status="critical",
message="Gateway process is NOT running",
)
except Exception as exc:
return CheckResult(
name="gateway_process",
status="error",
message=f"Could not determine gateway status: {exc}",
)
def check_api_server_http(cfg: ObservatoryConfig) -> CheckResult:
"""Check API server /health endpoint responsiveness."""
url = cfg.api_url
start = time.monotonic()
try:
req = urllib.request.Request(url, method="GET")
req.add_header("User-Agent", "hermes-observatory/1.0")
with urllib.request.urlopen(req, timeout=10) as resp:
latency_ms = (time.monotonic() - start) * 1000
body = resp.read(512).decode("utf-8", errors="replace")
status_code = resp.status
if status_code < 400:
slo_ok = latency_ms <= cfg.webhook_latency_slo_ms
return CheckResult(
name="api_server_http",
status="ok" if slo_ok else "warn",
message=f"API server OK ({latency_ms:.0f}ms){'' if slo_ok else ' — exceeds latency SLO'}",
value=latency_ms,
unit="ms",
extra={"status_code": status_code, "body_preview": body[:100]},
)
return CheckResult(
name="api_server_http",
status="critical",
message=f"API server returned HTTP {status_code}",
value=latency_ms,
unit="ms",
)
except urllib.error.URLError as exc:
latency_ms = (time.monotonic() - start) * 1000
# Not running is acceptable if gateway is not configured for API
reason = str(exc.reason) if hasattr(exc, "reason") else str(exc)
if "Connection refused" in reason or "Connection reset" in reason:
return CheckResult(
name="api_server_http",
status="warn",
message=f"API server not reachable at {url} (not started?)",
value=latency_ms,
unit="ms",
)
return CheckResult(
name="api_server_http",
status="error",
message=f"API server probe error: {exc}",
value=latency_ms,
unit="ms",
)
except Exception as exc:
latency_ms = (time.monotonic() - start) * 1000
return CheckResult(
name="api_server_http",
status="error",
message=f"API server probe exception: {exc}",
value=latency_ms,
unit="ms",
)
def check_webhook_http(cfg: ObservatoryConfig) -> CheckResult:
"""Check webhook endpoint responsiveness."""
url = cfg.webhook_url
start = time.monotonic()
try:
req = urllib.request.Request(url, method="GET")
req.add_header("User-Agent", "hermes-observatory/1.0")
with urllib.request.urlopen(req, timeout=10) as resp:
latency_ms = (time.monotonic() - start) * 1000
status_code = resp.status
slo_ok = latency_ms <= cfg.webhook_latency_slo_ms
if status_code < 400:
return CheckResult(
name="webhook_http",
status="ok" if slo_ok else "warn",
message=f"Webhook OK ({latency_ms:.0f}ms){'' if slo_ok else ' — exceeds latency SLO'}",
value=latency_ms,
unit="ms",
extra={"status_code": status_code},
)
return CheckResult(
name="webhook_http",
status="critical",
message=f"Webhook returned HTTP {status_code}",
value=latency_ms,
unit="ms",
)
except urllib.error.URLError as exc:
latency_ms = (time.monotonic() - start) * 1000
reason = str(exc.reason) if hasattr(exc, "reason") else str(exc)
if "Connection refused" in reason or "Connection reset" in reason:
return CheckResult(
name="webhook_http",
status="warn",
message=f"Webhook not reachable at {url} (not started?)",
value=latency_ms,
unit="ms",
)
return CheckResult(
name="webhook_http",
status="error",
message=f"Webhook probe error: {exc}",
value=latency_ms,
unit="ms",
)
except Exception as exc:
latency_ms = (time.monotonic() - start) * 1000
return CheckResult(
name="webhook_http",
status="error",
message=f"Webhook probe exception: {exc}",
value=latency_ms,
unit="ms",
)
def check_disk(cfg: ObservatoryConfig) -> CheckResult:
"""Check disk usage on the HERMES_HOME filesystem."""
if not _PSUTIL:
return CheckResult(name="disk", status="error", message="psutil not installed")
try:
hermes_home = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
path = str(hermes_home) if hermes_home.exists() else "/"
usage = psutil.disk_usage(path)
pct = usage.percent
free_gb = usage.free / (1024 ** 3)
if pct >= cfg.disk_crit_pct:
status = "critical"
elif pct >= cfg.disk_warn_pct:
status = "warn"
else:
status = "ok"
return CheckResult(
name="disk",
status=status,
message=f"Disk {pct:.1f}% used ({free_gb:.1f}GB free)",
value=pct,
unit="%",
extra={"free_bytes": usage.free, "total_bytes": usage.total},
)
except Exception as exc:
return CheckResult(name="disk", status="error", message=f"Disk check error: {exc}")
def check_memory(cfg: ObservatoryConfig) -> CheckResult:
"""Check system memory usage."""
if not _PSUTIL:
return CheckResult(name="memory", status="error", message="psutil not installed")
try:
mem = psutil.virtual_memory()
pct = mem.percent
available_gb = mem.available / (1024 ** 3)
if pct >= cfg.mem_crit_pct:
status = "critical"
elif pct >= cfg.mem_warn_pct:
status = "warn"
else:
status = "ok"
return CheckResult(
name="memory",
status=status,
message=f"Memory {pct:.1f}% used ({available_gb:.1f}GB available)",
value=pct,
unit="%",
extra={"available_bytes": mem.available, "total_bytes": mem.total},
)
except Exception as exc:
return CheckResult(name="memory", status="error", message=f"Memory check error: {exc}")
def check_cpu(cfg: ObservatoryConfig) -> CheckResult:
"""Check CPU usage (1-second sample)."""
if not _PSUTIL:
return CheckResult(name="cpu", status="error", message="psutil not installed")
try:
pct = psutil.cpu_percent(interval=1)
if pct >= cfg.cpu_crit_pct:
status = "critical"
elif pct >= cfg.cpu_warn_pct:
status = "warn"
else:
status = "ok"
return CheckResult(
name="cpu",
status=status,
message=f"CPU {pct:.1f}%",
value=pct,
unit="%",
)
except Exception as exc:
return CheckResult(name="cpu", status="error", message=f"CPU check error: {exc}")
def check_database(cfg: ObservatoryConfig) -> CheckResult:
"""Check observatory SQLite DB connectivity and size."""
db_path = cfg.db_path
try:
if not db_path.exists():
return CheckResult(
name="database",
status="warn",
message=f"Observatory DB not yet created at {db_path}",
)
size_kb = db_path.stat().st_size / 1024
conn = sqlite3.connect(str(db_path), timeout=5)
conn.execute("SELECT count(*) FROM health_snapshots").fetchone()
conn.close()
return CheckResult(
name="database",
status="ok",
message=f"Observatory DB OK ({size_kb:.1f}KB)",
value=size_kb,
unit="KB",
extra={"path": str(db_path)},
)
except Exception as exc:
return CheckResult(
name="database",
status="error",
message=f"DB check error: {exc}",
)
def check_response_store_db() -> CheckResult:
"""Check the API server's SQLite response store DB if it exists."""
try:
hermes_home = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
db_path = hermes_home / "response_store.db"
if not db_path.exists():
return CheckResult(
name="response_store_db",
status="ok",
message="Response store DB not present (API server not yet used)",
)
size_kb = db_path.stat().st_size / 1024
conn = sqlite3.connect(str(db_path), timeout=5)
count = conn.execute("SELECT count(*) FROM responses").fetchone()[0]
conn.close()
return CheckResult(
name="response_store_db",
status="ok",
message=f"Response store DB OK ({count} responses, {size_kb:.1f}KB)",
value=size_kb,
unit="KB",
)
except Exception as exc:
return CheckResult(
name="response_store_db",
status="error",
message=f"Response store DB error: {exc}",
)
# ---------------------------------------------------------------------------
# Snapshot collector
# ---------------------------------------------------------------------------
def collect_snapshot(cfg: ObservatoryConfig) -> HealthSnapshot:
"""Run all checks and return a HealthSnapshot."""
ts = datetime.now(timezone.utc).isoformat()
checks = [
check_gateway_liveness(),
check_api_server_http(cfg),
check_webhook_http(cfg),
check_disk(cfg),
check_memory(cfg),
check_cpu(cfg),
check_database(cfg),
check_response_store_db(),
]
return HealthSnapshot(ts=ts, checks=checks)
# ---------------------------------------------------------------------------
# SQLite persistence
# ---------------------------------------------------------------------------
@contextmanager
def _db(path: Path):
path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(path), timeout=10)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
try:
yield conn
conn.commit()
finally:
conn.close()
def _init_db(path: Path) -> None:
"""Create tables if they don't exist."""
with _db(path) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS health_snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts TEXT NOT NULL,
overall TEXT NOT NULL,
payload TEXT NOT NULL
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_snapshots_ts ON health_snapshots(ts)")
conn.execute("""
CREATE TABLE IF NOT EXISTS alerts_sent (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts TEXT NOT NULL,
check_name TEXT NOT NULL,
status TEXT NOT NULL,
message TEXT NOT NULL
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_alerts_ts ON alerts_sent(ts)")
def store_snapshot(cfg: ObservatoryConfig, snapshot: HealthSnapshot) -> None:
"""Persist snapshot to SQLite."""
_init_db(cfg.db_path)
payload = json.dumps(snapshot.to_dict())
with _db(cfg.db_path) as conn:
conn.execute(
"INSERT INTO health_snapshots (ts, overall, payload) VALUES (?, ?, ?)",
(snapshot.ts, snapshot.overall_status, payload),
)
# Prune records older than RETENTION_DAYS
cutoff = (datetime.now(timezone.utc) - timedelta(days=RETENTION_DAYS)).isoformat()
conn.execute("DELETE FROM health_snapshots WHERE ts < ?", (cutoff,))
def record_alert_sent(cfg: ObservatoryConfig, check_name: str, status: str, message: str) -> None:
"""Record that an alert was dispatched."""
_init_db(cfg.db_path)
with _db(cfg.db_path) as conn:
conn.execute(
"INSERT INTO alerts_sent (ts, check_name, status, message) VALUES (?, ?, ?, ?)",
(datetime.now(timezone.utc).isoformat(), check_name, status, message),
)
def load_snapshots(cfg: ObservatoryConfig, days: int = RETENTION_DAYS) -> List[Dict[str, Any]]:
"""Load snapshots from the last N days."""
if not cfg.db_path.exists():
return []
cutoff = (datetime.now(timezone.utc) - timedelta(days=days)).isoformat()
with _db(cfg.db_path) as conn:
rows = conn.execute(
"SELECT ts, overall, payload FROM health_snapshots WHERE ts >= ? ORDER BY ts DESC",
(cutoff,),
).fetchall()
return [json.loads(row[2]) for row in rows]
# ---------------------------------------------------------------------------
# Alerting
# ---------------------------------------------------------------------------
def _telegram_send(token: str, chat_id: str, text: str) -> bool:
"""Send a Telegram message via the Bot API. Returns True on success."""
url = f"https://api.telegram.org/bot{token}/sendMessage"
payload = json.dumps({
"chat_id": chat_id,
"text": text,
"parse_mode": "HTML",
"disable_web_page_preview": True,
}).encode("utf-8")
req = urllib.request.Request(url, data=payload, method="POST")
req.add_header("Content-Type", "application/json")
req.add_header("User-Agent", "hermes-observatory/1.0")
try:
with urllib.request.urlopen(req, timeout=15) as resp:
body = json.loads(resp.read())
return bool(body.get("ok"))
except Exception as exc:
logger.warning("Telegram send failed: %s", exc)
return False
def _status_emoji(status: str) -> str:
return {"ok": "", "warn": "⚠️", "critical": "🔴", "error": ""}.get(status, "")
def maybe_alert(cfg: ObservatoryConfig, snapshot: HealthSnapshot, prev_snapshot: Optional[HealthSnapshot]) -> List[str]:
"""
Fire Telegram alerts for newly degraded checks.
Returns list of alert messages sent.
"""
if not cfg.telegram_token or not cfg.alert_chat_id:
return []
alerts_sent = []
prev_statuses: Dict[str, str] = {}
if prev_snapshot:
for c in prev_snapshot.checks:
prev_statuses[c.name] = c.status
for check in snapshot.checks:
if check.status in ("critical", "error"):
prev = prev_statuses.get(check.name, "ok")
if prev not in ("critical", "error"):
# Newly degraded — alert
emoji = _status_emoji(check.status)
msg = (
f"{emoji} <b>Hermes Observatory Alert</b>\n\n"
f"<b>Check:</b> {check.name}\n"
f"<b>Status:</b> {check.status.upper()}\n"
f"<b>Message:</b> {check.message}\n"
f"<b>Time:</b> {snapshot.ts}"
)
if _telegram_send(cfg.telegram_token, cfg.alert_chat_id, msg):
alerts_sent.append(msg)
record_alert_sent(cfg, check.name, check.status, check.message)
logger.info("Alert sent for %s (%s)", check.name, check.status)
elif check.status == "ok":
prev = prev_statuses.get(check.name)
if prev in ("critical", "error"):
# Recovery alert
msg = (
f"✅ <b>Hermes Observatory — Recovery</b>\n\n"
f"<b>Check:</b> {check.name} has recovered\n"
f"<b>Message:</b> {check.message}\n"
f"<b>Time:</b> {snapshot.ts}"
)
if _telegram_send(cfg.telegram_token, cfg.alert_chat_id, msg):
alerts_sent.append(msg)
record_alert_sent(cfg, check.name, "recovery", check.message)
return alerts_sent
# ---------------------------------------------------------------------------
# Daily digest
# ---------------------------------------------------------------------------
def build_digest(cfg: ObservatoryConfig) -> str:
"""Build a daily health digest from stored snapshots."""
snapshots = load_snapshots(cfg, days=1)
total = len(snapshots)
if total == 0:
return "No health data available for the last 24 hours."
# Count by overall status
status_counts: Dict[str, int] = {"ok": 0, "warn": 0, "critical": 0, "error": 0}
check_degraded_counts: Dict[str, int] = {}
latencies: Dict[str, List[float]] = {}
for snap in snapshots:
overall = snap.get("overall", "ok")
status_counts[overall] = status_counts.get(overall, 0) + 1
for check in snap.get("checks", []):
name = check["name"]
status = check["status"]
if status in ("critical", "error", "warn"):
check_degraded_counts[name] = check_degraded_counts.get(name, 0) + 1
value = check.get("value")
unit = check.get("unit")
if value is not None and unit == "ms":
if name not in latencies:
latencies[name] = []
latencies[name].append(float(value))
uptime_pct = 100.0 * status_counts["ok"] / total if total else 0.0
now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
lines = [
f"📊 <b>Hermes Observatory — Daily Digest</b>",
f"<b>Generated:</b> {now}",
f"",
f"<b>Last 24h Summary</b> ({total} samples)",
f" Healthy: {status_counts['ok']} ({100*status_counts['ok']//total if total else 0}%)",
f" Warning: {status_counts.get('warn', 0)}",
f" Critical: {status_counts.get('critical', 0)}",
f" Error: {status_counts.get('error', 0)}",
f"",
]
# SLO status
lines.append("<b>SLO Status</b>")
gw_uptime_target = cfg.gateway_uptime_slo_pct
gw_snapshots = [
s for s in snapshots
if any(c["name"] == "gateway_process" and c["status"] == "ok" for c in s.get("checks", []))
]
gw_uptime = 100.0 * len(gw_snapshots) / total if total else 0.0
gw_ok = gw_uptime >= gw_uptime_target
lines.append(
f" {'' if gw_ok else ''} Gateway uptime: {gw_uptime:.1f}% (target: ≥{gw_uptime_target}%)"
)
wh_latency_target = cfg.webhook_latency_slo_ms
if "webhook_http" in latencies and latencies["webhook_http"]:
wh_vals = sorted(latencies["webhook_http"])
p95_idx = int(len(wh_vals) * 0.95)
p95 = wh_vals[min(p95_idx, len(wh_vals) - 1)]
wh_ok = p95 <= wh_latency_target
lines.append(
f" {'' if wh_ok else ''} Webhook p95 latency: {p95:.0f}ms (target: ≤{wh_latency_target:.0f}ms)"
)
else:
lines.append(f" ⚫ Webhook latency: no data")
if "api_server_http" in latencies and latencies["api_server_http"]:
api_vals = sorted(latencies["api_server_http"])
p95_idx = int(len(api_vals) * 0.95)
p95 = api_vals[min(p95_idx, len(api_vals) - 1)]
api_ok = p95 <= wh_latency_target
lines.append(
f" {'' if api_ok else ''} API server p95 latency: {p95:.0f}ms (target: ≤{wh_latency_target:.0f}ms)"
)
# Top degraded checks
if check_degraded_counts:
lines.append("")
lines.append("<b>Degraded Checks (24h)</b>")
for name, count in sorted(check_degraded_counts.items(), key=lambda x: -x[1]):
pct = 100 * count // total if total else 0
lines.append(f"{name}: {count} incidents ({pct}%)")
lines.append("")
lines.append(f"<i>Observatory DB: {cfg.db_path}</i>")
return "\n".join(lines)
def send_digest(cfg: ObservatoryConfig) -> bool:
"""Build and send the daily digest to Telegram. Returns True on success."""
digest = build_digest(cfg)
if cfg.telegram_token and cfg.digest_chat_id:
return _telegram_send(cfg.telegram_token, cfg.digest_chat_id, digest)
return False
# ---------------------------------------------------------------------------
# Display helpers
# ---------------------------------------------------------------------------
_STATUS_COLORS = {
"ok": "\033[32m", # green
"warn": "\033[33m", # yellow
"critical": "\033[31m", # red
"error": "\033[91m", # bright red
}
_RESET = "\033[0m"
def _color_status(status: str) -> str:
c = _STATUS_COLORS.get(status, "")
return f"{c}{status.upper()}{_RESET}"
def print_snapshot(snapshot: HealthSnapshot) -> None:
overall_color = _STATUS_COLORS.get(snapshot.overall_status, "")
print(f"\n{'='*60}")
print(f" Hermes Observatory — {snapshot.ts}")
print(f" Overall: {overall_color}{snapshot.overall_status.upper()}{_RESET}")
print(f"{'='*60}")
for check in snapshot.checks:
emoji = _status_emoji(check.status)
val_str = f" [{check.value:.1f}{check.unit}]" if check.value is not None and check.unit else ""
print(f" {emoji} {check.name:<25} {_color_status(check.status):<15} {check.message}{val_str}")
print()
def print_slo_report(cfg: ObservatoryConfig) -> None:
"""Print current SLO definitions and targets."""
snapshots = load_snapshots(cfg, days=30)
total = len(snapshots)
print(f"\n{'='*60}")
print(" Hermes Observatory — SLO Report (last 30 days)")
print(f"{'='*60}")
for slo_key, slo in SLO_DEFINITIONS.items():
print(f"\n {slo['description']}")
print(f" Target: {slo['target']}{slo['unit']}")
if total == 0:
print(f" Status: no data")
continue
if slo_key == "gateway_uptime_pct":
ok_count = sum(
1 for s in snapshots
if any(c["name"] == "gateway_process" and c["status"] == "ok"
for c in s.get("checks", []))
)
actual = 100.0 * ok_count / total
met = actual >= slo["target"]
print(f" Actual: {actual:.2f}% {'✅ MET' if met else '❌ MISSED'}")
elif slo_key in ("webhook_latency_ms", "api_server_http_latency_ms"):
check_name = "webhook_http" if "webhook" in slo_key else "api_server_http"
vals = [
float(c["value"])
for s in snapshots
for c in s.get("checks", [])
if c["name"] == check_name and c.get("value") is not None
]
if vals:
vals.sort()
p95_idx = int(len(vals) * 0.95)
p95 = vals[min(p95_idx, len(vals) - 1)]
met = p95 <= slo["target"]
print(f" p95: {p95:.0f}ms {'✅ MET' if met else '❌ MISSED'}")
else:
print(f" Status: no latency data")
print()
def print_history(cfg: ObservatoryConfig, count: int = 20) -> None:
"""Print recent health records."""
snapshots = load_snapshots(cfg, days=RETENTION_DAYS)[:count]
if not snapshots:
print("No history available.")
return
print(f"\n{'='*60}")
print(f" Last {min(count, len(snapshots))} health records")
print(f"{'='*60}")
for snap in snapshots:
ts = snap.get("ts", "?")
overall = snap.get("overall", "?")
emoji = _status_emoji(overall)
degraded = [c["name"] for c in snap.get("checks", []) if c["status"] != "ok"]
degraded_str = f" — issues: {', '.join(degraded)}" if degraded else ""
print(f" {emoji} {ts} {overall.upper()}{degraded_str}")
print()
# ---------------------------------------------------------------------------
# Daemon mode
# ---------------------------------------------------------------------------
class Observatory:
"""Continuous monitoring daemon."""
def __init__(self, cfg: ObservatoryConfig):
self.cfg = cfg
self._running = False
self._prev_snapshot: Optional[HealthSnapshot] = None
def _handle_signal(self, signum: int, frame: Any) -> None:
logger.info("Received signal %d, shutting down...", signum)
self._running = False
def run_once(self) -> HealthSnapshot:
snapshot = collect_snapshot(self.cfg)
store_snapshot(self.cfg, snapshot)
alerts = maybe_alert(self.cfg, snapshot, self._prev_snapshot)
if alerts:
logger.info("Sent %d alert(s)", len(alerts))
self._prev_snapshot = snapshot
return snapshot
def run(self) -> None:
_init_db(self.cfg.db_path)
logger.info(
"Observatory starting — poll_interval=%ds db=%s",
self.cfg.poll_interval,
self.cfg.db_path,
)
self._running = True
signal.signal(signal.SIGINT, self._handle_signal)
signal.signal(signal.SIGTERM, self._handle_signal)
while self._running:
try:
snapshot = self.run_once()
logger.info("Health check: %s", snapshot.overall_status)
except Exception as exc:
logger.error("Health check failed: %s", exc, exc_info=True)
if self._running:
time.sleep(self.cfg.poll_interval)
logger.info("Observatory stopped.")
# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------
def main(argv: Optional[List[str]] = None) -> int:
parser = argparse.ArgumentParser(
description="Hermes Observatory — health monitoring & alerting",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument("--check", action="store_true", help="Run one health check and print results")
parser.add_argument("--daemon", action="store_true", help="Run as continuous monitoring daemon")
parser.add_argument("--digest", action="store_true", help="Print (and optionally send) daily digest")
parser.add_argument("--history", type=int, metavar="N", help="Show last N health records")
parser.add_argument("--slo", action="store_true", help="Print SLO report")
parser.add_argument("--send-digest", action="store_true", help="Send daily digest via Telegram")
parser.add_argument("--verbose", "-v", action="store_true", help="Enable verbose logging")
args = parser.parse_args(argv)
logging.basicConfig(
level=logging.DEBUG if args.verbose else logging.INFO,
format="%(asctime)s %(levelname)s [observatory] %(message)s",
)
cfg = ObservatoryConfig.from_env()
_init_db(cfg.db_path)
if args.check:
snapshot = collect_snapshot(cfg)
store_snapshot(cfg, snapshot)
print_snapshot(snapshot)
return 0 if snapshot.overall_status == "ok" else 1
if args.daemon:
obs = Observatory(cfg)
obs.run()
return 0
if args.digest or args.send_digest:
digest = build_digest(cfg)
print(digest)
if args.send_digest:
ok = send_digest(cfg)
if ok:
print("\n[Digest sent to Telegram]")
else:
print("\n[Telegram send skipped — token/chat_id not configured]")
return 0
if args.history is not None:
print_history(cfg, args.history)
return 0
if args.slo:
print_slo_report(cfg)
return 0
# Default: one-shot check
snapshot = collect_snapshot(cfg)
store_snapshot(cfg, snapshot)
print_snapshot(snapshot)
return 0 if snapshot.overall_status == "ok" else 1
if __name__ == "__main__":
sys.exit(main())

View File

@@ -42,6 +42,7 @@ dependencies = [
modal = ["modal>=1.0.0,<2"]
daytona = ["daytona>=0.148.0,<1"]
dev = ["pytest>=9.0.2,<10", "pytest-asyncio>=1.3.0,<2", "pytest-xdist>=3.0,<4", "mcp>=1.2.0,<2"]
observatory = ["psutil>=5.9.0,<7"]
messaging = ["python-telegram-bot>=22.6,<23", "discord.py[voice]>=2.7.1,<3", "aiohttp>=3.13.3,<4", "slack-bolt>=1.18.0,<2", "slack-sdk>=3.27.0,<4"]
cron = ["croniter>=6.0.0,<7"]
slack = ["slack-bolt>=1.18.0,<2", "slack-sdk>=3.27.0,<4"]

455
tests/test_observatory.py Normal file
View File

@@ -0,0 +1,455 @@
"""
Tests for observatory.py — health monitoring & alerting.
Refs #147
"""
from __future__ import annotations
import json
import os
import sqlite3
import sys
import tempfile
import time
from pathlib import Path
from typing import Any
from unittest.mock import MagicMock, patch
import pytest
PROJECT_ROOT = Path(__file__).parent.parent
if str(PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(PROJECT_ROOT))
import observatory as obs
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def cfg(tmp_path):
"""Return an ObservatoryConfig pointing at a temp directory."""
cfg = obs.ObservatoryConfig()
cfg.db_path = tmp_path / "observatory.db"
cfg.alert_chat_id = "99999"
cfg.digest_chat_id = "99999"
cfg.telegram_token = "fake-token"
cfg.webhook_url = "http://127.0.0.1:19999/health" # port never bound
cfg.api_url = "http://127.0.0.1:19998/health"
return cfg
# ---------------------------------------------------------------------------
# Config tests
# ---------------------------------------------------------------------------
class TestObservatoryConfig:
def test_defaults(self):
c = obs.ObservatoryConfig()
assert c.disk_warn_pct == 80.0
assert c.disk_crit_pct == 90.0
assert c.mem_warn_pct == 80.0
assert c.mem_crit_pct == 90.0
assert c.cpu_warn_pct == 80.0
assert c.cpu_crit_pct == 95.0
assert c.poll_interval == 60
assert c.webhook_latency_slo_ms == 2000.0
assert c.gateway_uptime_slo_pct == 99.5
def test_from_env_overrides(self, monkeypatch):
monkeypatch.setenv("OBSERVATORY_DISK_WARN_PCT", "70")
monkeypatch.setenv("OBSERVATORY_POLL_INTERVAL", "30")
monkeypatch.setenv("OBSERVATORY_ALERT_CHAT_ID", "12345")
monkeypatch.setenv("TELEGRAM_BOT_TOKEN", "tok123")
c = obs.ObservatoryConfig.from_env()
assert c.disk_warn_pct == 70.0
assert c.poll_interval == 30
assert c.alert_chat_id == "12345"
assert c.telegram_token == "tok123"
def test_digest_chat_falls_back_to_alert(self, monkeypatch):
monkeypatch.setenv("OBSERVATORY_ALERT_CHAT_ID", "abc")
monkeypatch.delenv("OBSERVATORY_DIGEST_CHAT_ID", raising=False)
c = obs.ObservatoryConfig.from_env()
assert c.digest_chat_id == "abc"
# ---------------------------------------------------------------------------
# CheckResult / HealthSnapshot tests
# ---------------------------------------------------------------------------
class TestHealthSnapshot:
def _make_snapshot(self, statuses):
checks = [obs.CheckResult(name=f"c{i}", status=s, message="") for i, s in enumerate(statuses)]
return obs.HealthSnapshot(ts="2026-01-01T00:00:00+00:00", checks=checks)
def test_overall_ok(self):
snap = self._make_snapshot(["ok", "ok"])
assert snap.overall_status == "ok"
def test_overall_warn(self):
snap = self._make_snapshot(["ok", "warn"])
assert snap.overall_status == "warn"
def test_overall_critical(self):
snap = self._make_snapshot(["ok", "warn", "critical"])
assert snap.overall_status == "critical"
def test_overall_error(self):
snap = self._make_snapshot(["ok", "error"])
assert snap.overall_status == "critical"
def test_to_dict(self):
snap = self._make_snapshot(["ok"])
d = snap.to_dict()
assert d["overall"] == "ok"
assert isinstance(d["checks"], list)
assert d["checks"][0]["name"] == "c0"
# ---------------------------------------------------------------------------
# Individual check tests
# ---------------------------------------------------------------------------
class TestCheckGatewayLiveness:
def test_running(self):
with patch("gateway.status.is_gateway_running", return_value=True), \
patch("gateway.status.get_running_pid", return_value=12345):
result = obs.check_gateway_liveness()
assert result.status == "ok"
assert "12345" in result.message
def test_not_running(self):
with patch("gateway.status.is_gateway_running", return_value=False), \
patch("gateway.status.get_running_pid", return_value=None):
result = obs.check_gateway_liveness()
assert result.status == "critical"
def test_import_error(self):
import builtins
real_import = builtins.__import__
def mock_import(name, *args, **kwargs):
if name == "gateway.status":
raise ImportError("no module")
return real_import(name, *args, **kwargs)
with patch("builtins.__import__", side_effect=mock_import):
result = obs.check_gateway_liveness()
assert result.status in ("error", "critical", "ok") # graceful
class TestCheckDisk:
def test_ok(self, cfg):
mock_usage = MagicMock()
mock_usage.percent = 50.0
mock_usage.free = 10 * 1024 ** 3
mock_usage.total = 20 * 1024 ** 3
with patch("psutil.disk_usage", return_value=mock_usage):
result = obs.check_disk(cfg)
assert result.status == "ok"
assert result.value == 50.0
def test_warn(self, cfg):
mock_usage = MagicMock()
mock_usage.percent = 85.0
mock_usage.free = 3 * 1024 ** 3
mock_usage.total = 20 * 1024 ** 3
with patch("psutil.disk_usage", return_value=mock_usage):
result = obs.check_disk(cfg)
assert result.status == "warn"
def test_critical(self, cfg):
mock_usage = MagicMock()
mock_usage.percent = 92.0
mock_usage.free = 1 * 1024 ** 3
mock_usage.total = 20 * 1024 ** 3
with patch("psutil.disk_usage", return_value=mock_usage):
result = obs.check_disk(cfg)
assert result.status == "critical"
def test_no_psutil(self, cfg, monkeypatch):
monkeypatch.setattr(obs, "_PSUTIL", False)
result = obs.check_disk(cfg)
assert result.status == "error"
class TestCheckMemory:
def test_ok(self, cfg):
mock_mem = MagicMock()
mock_mem.percent = 60.0
mock_mem.available = 4 * 1024 ** 3
mock_mem.total = 16 * 1024 ** 3
with patch("psutil.virtual_memory", return_value=mock_mem):
result = obs.check_memory(cfg)
assert result.status == "ok"
def test_critical(self, cfg):
mock_mem = MagicMock()
mock_mem.percent = 95.0
mock_mem.available = 512 * 1024 ** 2
mock_mem.total = 16 * 1024 ** 3
with patch("psutil.virtual_memory", return_value=mock_mem):
result = obs.check_memory(cfg)
assert result.status == "critical"
class TestCheckCPU:
def test_ok(self, cfg):
with patch("psutil.cpu_percent", return_value=40.0):
result = obs.check_cpu(cfg)
assert result.status == "ok"
def test_warn(self, cfg):
with patch("psutil.cpu_percent", return_value=85.0):
result = obs.check_cpu(cfg)
assert result.status == "warn"
def test_critical(self, cfg):
with patch("psutil.cpu_percent", return_value=98.0):
result = obs.check_cpu(cfg)
assert result.status == "critical"
class TestCheckDatabase:
def test_ok(self, cfg):
obs._init_db(cfg.db_path)
result = obs.check_database(cfg)
assert result.status == "ok"
def test_not_yet_created(self, cfg):
# db_path does not exist
result = obs.check_database(cfg)
assert result.status == "warn"
class TestCheckHTTP:
def test_webhook_connection_refused(self, cfg):
result = obs.check_webhook_http(cfg)
# Port 19999 is not bound — should get a "not reachable" warn
assert result.status in ("warn", "error")
def test_api_server_connection_refused(self, cfg):
result = obs.check_api_server_http(cfg)
assert result.status in ("warn", "error")
def test_webhook_ok(self, cfg):
import urllib.error
from unittest.mock import patch, MagicMock
mock_resp = MagicMock()
mock_resp.__enter__ = lambda s: s
mock_resp.__exit__ = MagicMock(return_value=False)
mock_resp.status = 200
mock_resp.read.return_value = b'{"status":"ok"}'
with patch("urllib.request.urlopen", return_value=mock_resp):
result = obs.check_webhook_http(cfg)
assert result.status in ("ok", "warn")
def test_webhook_http_error(self, cfg):
mock_resp = MagicMock()
mock_resp.__enter__ = lambda s: s
mock_resp.__exit__ = MagicMock(return_value=False)
mock_resp.status = 503
with patch("urllib.request.urlopen", return_value=mock_resp):
result = obs.check_webhook_http(cfg)
assert result.status == "critical"
# ---------------------------------------------------------------------------
# Persistence tests
# ---------------------------------------------------------------------------
class TestPersistence:
def test_store_and_load(self, cfg):
obs._init_db(cfg.db_path)
from datetime import datetime, timezone
ts = datetime.now(timezone.utc).isoformat()
snap = obs.HealthSnapshot(
ts=ts,
checks=[obs.CheckResult(name="test", status="ok", message="fine")],
)
obs.store_snapshot(cfg, snap)
loaded = obs.load_snapshots(cfg, days=30)
assert len(loaded) == 1
assert loaded[0]["overall"] == "ok"
def test_retention_pruning(self, cfg):
obs._init_db(cfg.db_path)
# Insert an old record directly
with obs._db(cfg.db_path) as conn:
conn.execute(
"INSERT INTO health_snapshots (ts, overall, payload) VALUES (?, ?, ?)",
("2000-01-01T00:00:00+00:00", "ok", '{"ts":"2000-01-01T00:00:00+00:00","overall":"ok","checks":[]}'),
)
snap = obs.HealthSnapshot(
ts="2026-01-01T00:00:00+00:00",
checks=[],
)
obs.store_snapshot(cfg, snap)
# Old record should have been pruned
with obs._db(cfg.db_path) as conn:
count = conn.execute("SELECT count(*) FROM health_snapshots WHERE ts < '2001-01-01'").fetchone()[0]
assert count == 0
def test_record_alert_sent(self, cfg):
obs._init_db(cfg.db_path)
obs.record_alert_sent(cfg, "gateway_process", "critical", "not running")
with obs._db(cfg.db_path) as conn:
count = conn.execute("SELECT count(*) FROM alerts_sent").fetchone()[0]
assert count == 1
# ---------------------------------------------------------------------------
# Alerting tests
# ---------------------------------------------------------------------------
class TestAlerting:
def _snap(self, status):
return obs.HealthSnapshot(
ts="2026-01-01T00:00:00+00:00",
checks=[obs.CheckResult(name="gateway_process", status=status, message="test")],
)
def test_no_alert_when_ok(self, cfg):
snap = self._snap("ok")
prev = self._snap("ok")
obs._init_db(cfg.db_path)
with patch("observatory._telegram_send", return_value=True) as mock_send:
alerts = obs.maybe_alert(cfg, snap, prev)
mock_send.assert_not_called()
assert alerts == []
def test_alert_on_new_critical(self, cfg):
snap = self._snap("critical")
prev = self._snap("ok")
obs._init_db(cfg.db_path)
with patch("observatory._telegram_send", return_value=True) as mock_send:
alerts = obs.maybe_alert(cfg, snap, prev)
mock_send.assert_called_once()
assert len(alerts) == 1
def test_no_duplicate_alert(self, cfg):
snap = self._snap("critical")
prev = self._snap("critical") # already critical
obs._init_db(cfg.db_path)
with patch("observatory._telegram_send", return_value=True) as mock_send:
alerts = obs.maybe_alert(cfg, snap, prev)
mock_send.assert_not_called()
assert alerts == []
def test_recovery_alert(self, cfg):
snap = self._snap("ok")
prev = self._snap("critical")
obs._init_db(cfg.db_path)
with patch("observatory._telegram_send", return_value=True) as mock_send:
alerts = obs.maybe_alert(cfg, snap, prev)
mock_send.assert_called_once()
def test_no_alert_without_token(self, cfg):
cfg.telegram_token = None
snap = self._snap("critical")
obs._init_db(cfg.db_path)
alerts = obs.maybe_alert(cfg, snap, None)
assert alerts == []
def test_no_alert_without_chat_id(self, cfg):
cfg.alert_chat_id = None
snap = self._snap("critical")
obs._init_db(cfg.db_path)
alerts = obs.maybe_alert(cfg, snap, None)
assert alerts == []
# ---------------------------------------------------------------------------
# Digest tests
# ---------------------------------------------------------------------------
class TestDigest:
def test_empty_digest(self, cfg):
obs._init_db(cfg.db_path)
digest = obs.build_digest(cfg)
assert "no health data" in digest.lower() or "24 hours" in digest.lower()
def test_digest_with_data(self, cfg):
obs._init_db(cfg.db_path)
from datetime import datetime, timezone, timedelta
ts = (datetime.now(timezone.utc) - timedelta(hours=1)).isoformat()
snap = obs.HealthSnapshot(
ts=ts,
checks=[
obs.CheckResult(name="gateway_process", status="ok", message="running"),
obs.CheckResult(name="disk", status="ok", message="50% used", value=50.0, unit="%"),
obs.CheckResult(name="webhook_http", status="ok", message="ok", value=150.0, unit="ms"),
],
)
obs.store_snapshot(cfg, snap)
digest = obs.build_digest(cfg)
assert "Daily Digest" in digest
assert "Gateway" in digest or "gateway" in digest
def test_send_digest_no_token(self, cfg):
cfg.telegram_token = None
obs._init_db(cfg.db_path)
result = obs.send_digest(cfg)
assert result is False
# ---------------------------------------------------------------------------
# SLO tests
# ---------------------------------------------------------------------------
class TestSLO:
def test_slo_definitions_complete(self):
assert "gateway_uptime_pct" in obs.SLO_DEFINITIONS
assert "webhook_latency_ms" in obs.SLO_DEFINITIONS
assert "api_server_latency_ms" in obs.SLO_DEFINITIONS
def test_slo_targets(self):
assert obs.SLO_DEFINITIONS["gateway_uptime_pct"]["target"] == 99.5
assert obs.SLO_DEFINITIONS["webhook_latency_ms"]["target"] == 2000
# ---------------------------------------------------------------------------
# CLI tests
# ---------------------------------------------------------------------------
class TestCLI:
def test_check_exits_0_on_ok(self, cfg, monkeypatch, tmp_path):
monkeypatch.setenv("OBSERVATORY_DB_PATH", str(tmp_path / "obs.db"))
ok_snap = obs.HealthSnapshot(
ts="2026-01-01T00:00:00+00:00",
checks=[obs.CheckResult(name="all_good", status="ok", message="fine")],
)
with patch("observatory.collect_snapshot", return_value=ok_snap), \
patch("observatory.store_snapshot"):
rc = obs.main(["--check"])
assert rc == 0
def test_check_exits_nonzero_on_critical(self, cfg, monkeypatch, tmp_path):
monkeypatch.setenv("OBSERVATORY_DB_PATH", str(tmp_path / "obs.db"))
bad_snap = obs.HealthSnapshot(
ts="2026-01-01T00:00:00+00:00",
checks=[obs.CheckResult(name="gateway_process", status="critical", message="down")],
)
with patch("observatory.collect_snapshot", return_value=bad_snap), \
patch("observatory.store_snapshot"):
rc = obs.main(["--check"])
assert rc != 0
def test_digest_flag(self, monkeypatch, tmp_path):
monkeypatch.setenv("OBSERVATORY_DB_PATH", str(tmp_path / "obs.db"))
rc = obs.main(["--digest"])
assert rc == 0
def test_slo_flag(self, monkeypatch, tmp_path):
monkeypatch.setenv("OBSERVATORY_DB_PATH", str(tmp_path / "obs.db"))
rc = obs.main(["--slo"])
assert rc == 0
def test_history_flag(self, monkeypatch, tmp_path):
monkeypatch.setenv("OBSERVATORY_DB_PATH", str(tmp_path / "obs.db"))
rc = obs.main(["--history", "5"])
assert rc == 0