"""
Observatory — Testbed Health Monitoring & Alerting for Hermes Agent

Checks running services, system resources, and connectivity.
Fires Telegram alerts when thresholds are breached.
Posts daily digest reports.
Stores 30 days of historical health data in SQLite.

Usage:
    python observatory.py --check        # one-shot health check (stdout)
    python observatory.py --daemon       # continuous monitor (60s poll)
    python observatory.py --digest       # print / send daily digest
    python observatory.py --history N    # show last N health records
    python observatory.py --slo          # print SLO report

Configuration (env vars, falls back to ~/.hermes/.env):
    OBSERVATORY_ALERT_CHAT_ID           Telegram chat ID for alerts
    OBSERVATORY_DIGEST_CHAT_ID          Telegram chat ID for daily digest (default: alert chat)
    OBSERVATORY_POLL_INTERVAL           Seconds between health polls (default: 60)
    OBSERVATORY_DB_PATH                 SQLite path (default: ~/.hermes/observatory.db)
    TELEGRAM_BOT_TOKEN                  Bot token used to send alerts

    # Threshold overrides (all optional):
    OBSERVATORY_DISK_WARN_PCT           Disk usage warn threshold (default: 80)
    OBSERVATORY_DISK_CRIT_PCT           Disk usage critical threshold (default: 90)
    OBSERVATORY_MEM_WARN_PCT            Memory usage warn threshold (default: 80)
    OBSERVATORY_MEM_CRIT_PCT            Memory usage critical threshold (default: 90)
    OBSERVATORY_CPU_WARN_PCT            CPU usage warn threshold (default: 80)
    OBSERVATORY_CPU_CRIT_PCT            CPU usage critical threshold (default: 95)
    OBSERVATORY_WEBHOOK_URL             Webhook endpoint to probe (default: http://127.0.0.1:8080/health)
    OBSERVATORY_API_URL                 API server health URL (default: http://127.0.0.1:8642/health)
    OBSERVATORY_WEBHOOK_LATENCY_SLO_MS  Webhook latency SLO ms (default: 2000)
    OBSERVATORY_GATEWAY_UPTIME_SLO_PCT  Gateway uptime SLO % (default: 99.5)
"""

from __future__ import annotations

import argparse
import json
import logging
import os
import signal
import sqlite3
import sys
import time
import urllib.error
import urllib.request
from contextlib import contextmanager
from dataclasses import asdict, dataclass, field
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

# ---------------------------------------------------------------------------
# Optional imports
# ---------------------------------------------------------------------------
try:
    import psutil
    _PSUTIL = True
except ImportError:  # resource checks degrade to "error" results
    _PSUTIL = False

try:
    from dotenv import load_dotenv as _load_dotenv
    _DOTENV = True
except ImportError:  # .env loading becomes a no-op
    _DOTENV = False

logger = logging.getLogger("observatory")

# ---------------------------------------------------------------------------
# Constants & SLO definitions
# ---------------------------------------------------------------------------

# Snapshots older than this many days are pruned from the DB.
RETENTION_DAYS = 30

SLO_DEFINITIONS = {
    "gateway_uptime_pct": {
        "description": "Gateway process uptime over the last 24 hours",
        "target": 99.5,
        "unit": "%",
    },
    "webhook_latency_ms": {
        "description": "Webhook endpoint p95 response latency",
        "target": 2000,
        "unit": "ms",
        "direction": "lower_is_better",
    },
    "api_server_latency_ms": {
        "description": "API server /health p95 response latency",
        "target": 2000,
        "unit": "ms",
        "direction": "lower_is_better",
    },
}

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

def _env_int(name: str, default: int) -> int:
    """Read an integer env var; fall back to *default* when unset or malformed.

    A malformed override is logged and ignored rather than crashing startup.
    """
    raw = os.getenv(name)
    if raw is None:
        return default
    try:
        return int(raw)
    except ValueError:
        logger.warning("Ignoring malformed %s=%r (expected int); using %r", name, raw, default)
        return default


def _env_float(name: str, default: float) -> float:
    """Read a float env var; fall back to *default* when unset or malformed."""
    raw = os.getenv(name)
    if raw is None:
        return default
    try:
        return float(raw)
    except ValueError:
        logger.warning("Ignoring malformed %s=%r (expected float); using %r", name, raw, default)
        return default


def _load_env() -> None:
    """Load .env from HERMES_HOME (and a project-local dev .env) if dotenv is available."""
    if not _DOTENV:
        return
    hermes_home = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
    env_path = hermes_home / ".env"
    if env_path.exists():
        _load_dotenv(env_path, override=False)
    # Project-level .env as dev fallback
    project_env = Path(__file__).parent / ".env"
    if project_env.exists():
        _load_dotenv(project_env, override=False)


@dataclass
class ObservatoryConfig:
    """Runtime configuration; populate from the environment via :meth:`from_env`."""

    alert_chat_id: Optional[str] = None
    digest_chat_id: Optional[str] = None
    telegram_token: Optional[str] = None
    poll_interval: int = 60
    db_path: Path = field(default_factory=lambda: Path(os.getenv("HERMES_HOME", Path.home() / ".hermes")) / "observatory.db")
    disk_warn_pct: float = 80.0
    disk_crit_pct: float = 90.0
    mem_warn_pct: float = 80.0
    mem_crit_pct: float = 90.0
    cpu_warn_pct: float = 80.0
    cpu_crit_pct: float = 95.0
    webhook_url: str = "http://127.0.0.1:8080/health"
    api_url: str = "http://127.0.0.1:8642/health"
    webhook_latency_slo_ms: float = 2000.0
    gateway_uptime_slo_pct: float = 99.5

    @classmethod
    def from_env(cls) -> "ObservatoryConfig":
        """Build a config from env vars (loading .env first when available).

        Fix: numeric overrides previously went through bare ``int()``/``float()``
        and a malformed value (e.g. OBSERVATORY_POLL_INTERVAL="abc") raised
        ValueError at startup; they now fall back to the default with a warning.
        """
        _load_env()
        cfg = cls()
        cfg.telegram_token = os.getenv("TELEGRAM_BOT_TOKEN")
        cfg.alert_chat_id = os.getenv("OBSERVATORY_ALERT_CHAT_ID")
        # Digest chat falls back to the alert chat when not set explicitly.
        cfg.digest_chat_id = os.getenv("OBSERVATORY_DIGEST_CHAT_ID") or cfg.alert_chat_id
        cfg.poll_interval = _env_int("OBSERVATORY_POLL_INTERVAL", 60)
        db_override = os.getenv("OBSERVATORY_DB_PATH")
        if db_override:
            cfg.db_path = Path(db_override)
        cfg.disk_warn_pct = _env_float("OBSERVATORY_DISK_WARN_PCT", 80.0)
        cfg.disk_crit_pct = _env_float("OBSERVATORY_DISK_CRIT_PCT", 90.0)
        cfg.mem_warn_pct = _env_float("OBSERVATORY_MEM_WARN_PCT", 80.0)
        cfg.mem_crit_pct = _env_float("OBSERVATORY_MEM_CRIT_PCT", 90.0)
        cfg.cpu_warn_pct = _env_float("OBSERVATORY_CPU_WARN_PCT", 80.0)
        cfg.cpu_crit_pct = _env_float("OBSERVATORY_CPU_CRIT_PCT", 95.0)
        cfg.webhook_url = os.getenv("OBSERVATORY_WEBHOOK_URL", "http://127.0.0.1:8080/health")
        cfg.api_url = os.getenv("OBSERVATORY_API_URL", "http://127.0.0.1:8642/health")
        cfg.webhook_latency_slo_ms = _env_float("OBSERVATORY_WEBHOOK_LATENCY_SLO_MS", 2000.0)
        cfg.gateway_uptime_slo_pct = _env_float("OBSERVATORY_GATEWAY_UPTIME_SLO_PCT", 99.5)
        return cfg
--------------------------------------------------------------------------- +# Health check models +# --------------------------------------------------------------------------- + +@dataclass +class CheckResult: + name: str + status: str # "ok" | "warn" | "critical" | "error" + message: str + value: Optional[float] = None + unit: Optional[str] = None + extra: Dict[str, Any] = field(default_factory=dict) + + +@dataclass +class HealthSnapshot: + ts: str # ISO8601 UTC + checks: List[CheckResult] = field(default_factory=list) + + @property + def overall_status(self) -> str: + statuses = {c.status for c in self.checks} + if "critical" in statuses or "error" in statuses: + return "critical" + if "warn" in statuses: + return "warn" + return "ok" + + def to_dict(self) -> Dict[str, Any]: + return { + "ts": self.ts, + "overall": self.overall_status, + "checks": [asdict(c) for c in self.checks], + } + + +# --------------------------------------------------------------------------- +# Individual health checks +# --------------------------------------------------------------------------- + +def check_gateway_liveness() -> CheckResult: + """Check whether the Hermes gateway process is running.""" + try: + from gateway.status import is_gateway_running, get_running_pid + running = is_gateway_running() + pid = get_running_pid() + if running: + return CheckResult( + name="gateway_process", + status="ok", + message=f"Gateway running (pid={pid})", + value=float(pid) if pid else None, + ) + return CheckResult( + name="gateway_process", + status="critical", + message="Gateway process is NOT running", + ) + except Exception as exc: + return CheckResult( + name="gateway_process", + status="error", + message=f"Could not determine gateway status: {exc}", + ) + + +def check_api_server_http(cfg: ObservatoryConfig) -> CheckResult: + """Check API server /health endpoint responsiveness.""" + url = cfg.api_url + start = time.monotonic() + try: + req = urllib.request.Request(url, method="GET") + 
req.add_header("User-Agent", "hermes-observatory/1.0") + with urllib.request.urlopen(req, timeout=10) as resp: + latency_ms = (time.monotonic() - start) * 1000 + body = resp.read(512).decode("utf-8", errors="replace") + status_code = resp.status + if status_code < 400: + slo_ok = latency_ms <= cfg.webhook_latency_slo_ms + return CheckResult( + name="api_server_http", + status="ok" if slo_ok else "warn", + message=f"API server OK ({latency_ms:.0f}ms){'' if slo_ok else ' — exceeds latency SLO'}", + value=latency_ms, + unit="ms", + extra={"status_code": status_code, "body_preview": body[:100]}, + ) + return CheckResult( + name="api_server_http", + status="critical", + message=f"API server returned HTTP {status_code}", + value=latency_ms, + unit="ms", + ) + except urllib.error.URLError as exc: + latency_ms = (time.monotonic() - start) * 1000 + # Not running is acceptable if gateway is not configured for API + reason = str(exc.reason) if hasattr(exc, "reason") else str(exc) + if "Connection refused" in reason or "Connection reset" in reason: + return CheckResult( + name="api_server_http", + status="warn", + message=f"API server not reachable at {url} (not started?)", + value=latency_ms, + unit="ms", + ) + return CheckResult( + name="api_server_http", + status="error", + message=f"API server probe error: {exc}", + value=latency_ms, + unit="ms", + ) + except Exception as exc: + latency_ms = (time.monotonic() - start) * 1000 + return CheckResult( + name="api_server_http", + status="error", + message=f"API server probe exception: {exc}", + value=latency_ms, + unit="ms", + ) + + +def check_webhook_http(cfg: ObservatoryConfig) -> CheckResult: + """Check webhook endpoint responsiveness.""" + url = cfg.webhook_url + start = time.monotonic() + try: + req = urllib.request.Request(url, method="GET") + req.add_header("User-Agent", "hermes-observatory/1.0") + with urllib.request.urlopen(req, timeout=10) as resp: + latency_ms = (time.monotonic() - start) * 1000 + status_code = 
resp.status + slo_ok = latency_ms <= cfg.webhook_latency_slo_ms + if status_code < 400: + return CheckResult( + name="webhook_http", + status="ok" if slo_ok else "warn", + message=f"Webhook OK ({latency_ms:.0f}ms){'' if slo_ok else ' — exceeds latency SLO'}", + value=latency_ms, + unit="ms", + extra={"status_code": status_code}, + ) + return CheckResult( + name="webhook_http", + status="critical", + message=f"Webhook returned HTTP {status_code}", + value=latency_ms, + unit="ms", + ) + except urllib.error.URLError as exc: + latency_ms = (time.monotonic() - start) * 1000 + reason = str(exc.reason) if hasattr(exc, "reason") else str(exc) + if "Connection refused" in reason or "Connection reset" in reason: + return CheckResult( + name="webhook_http", + status="warn", + message=f"Webhook not reachable at {url} (not started?)", + value=latency_ms, + unit="ms", + ) + return CheckResult( + name="webhook_http", + status="error", + message=f"Webhook probe error: {exc}", + value=latency_ms, + unit="ms", + ) + except Exception as exc: + latency_ms = (time.monotonic() - start) * 1000 + return CheckResult( + name="webhook_http", + status="error", + message=f"Webhook probe exception: {exc}", + value=latency_ms, + unit="ms", + ) + + +def check_disk(cfg: ObservatoryConfig) -> CheckResult: + """Check disk usage on the HERMES_HOME filesystem.""" + if not _PSUTIL: + return CheckResult(name="disk", status="error", message="psutil not installed") + try: + hermes_home = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes")) + path = str(hermes_home) if hermes_home.exists() else "/" + usage = psutil.disk_usage(path) + pct = usage.percent + free_gb = usage.free / (1024 ** 3) + if pct >= cfg.disk_crit_pct: + status = "critical" + elif pct >= cfg.disk_warn_pct: + status = "warn" + else: + status = "ok" + return CheckResult( + name="disk", + status=status, + message=f"Disk {pct:.1f}% used ({free_gb:.1f}GB free)", + value=pct, + unit="%", + extra={"free_bytes": usage.free, "total_bytes": 
usage.total}, + ) + except Exception as exc: + return CheckResult(name="disk", status="error", message=f"Disk check error: {exc}") + + +def check_memory(cfg: ObservatoryConfig) -> CheckResult: + """Check system memory usage.""" + if not _PSUTIL: + return CheckResult(name="memory", status="error", message="psutil not installed") + try: + mem = psutil.virtual_memory() + pct = mem.percent + available_gb = mem.available / (1024 ** 3) + if pct >= cfg.mem_crit_pct: + status = "critical" + elif pct >= cfg.mem_warn_pct: + status = "warn" + else: + status = "ok" + return CheckResult( + name="memory", + status=status, + message=f"Memory {pct:.1f}% used ({available_gb:.1f}GB available)", + value=pct, + unit="%", + extra={"available_bytes": mem.available, "total_bytes": mem.total}, + ) + except Exception as exc: + return CheckResult(name="memory", status="error", message=f"Memory check error: {exc}") + + +def check_cpu(cfg: ObservatoryConfig) -> CheckResult: + """Check CPU usage (1-second sample).""" + if not _PSUTIL: + return CheckResult(name="cpu", status="error", message="psutil not installed") + try: + pct = psutil.cpu_percent(interval=1) + if pct >= cfg.cpu_crit_pct: + status = "critical" + elif pct >= cfg.cpu_warn_pct: + status = "warn" + else: + status = "ok" + return CheckResult( + name="cpu", + status=status, + message=f"CPU {pct:.1f}%", + value=pct, + unit="%", + ) + except Exception as exc: + return CheckResult(name="cpu", status="error", message=f"CPU check error: {exc}") + + +def check_database(cfg: ObservatoryConfig) -> CheckResult: + """Check observatory SQLite DB connectivity and size.""" + db_path = cfg.db_path + try: + if not db_path.exists(): + return CheckResult( + name="database", + status="warn", + message=f"Observatory DB not yet created at {db_path}", + ) + size_kb = db_path.stat().st_size / 1024 + conn = sqlite3.connect(str(db_path), timeout=5) + conn.execute("SELECT count(*) FROM health_snapshots").fetchone() + conn.close() + return CheckResult( + 
name="database", + status="ok", + message=f"Observatory DB OK ({size_kb:.1f}KB)", + value=size_kb, + unit="KB", + extra={"path": str(db_path)}, + ) + except Exception as exc: + return CheckResult( + name="database", + status="error", + message=f"DB check error: {exc}", + ) + + +def check_response_store_db() -> CheckResult: + """Check the API server's SQLite response store DB if it exists.""" + try: + hermes_home = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes")) + db_path = hermes_home / "response_store.db" + if not db_path.exists(): + return CheckResult( + name="response_store_db", + status="ok", + message="Response store DB not present (API server not yet used)", + ) + size_kb = db_path.stat().st_size / 1024 + conn = sqlite3.connect(str(db_path), timeout=5) + count = conn.execute("SELECT count(*) FROM responses").fetchone()[0] + conn.close() + return CheckResult( + name="response_store_db", + status="ok", + message=f"Response store DB OK ({count} responses, {size_kb:.1f}KB)", + value=size_kb, + unit="KB", + ) + except Exception as exc: + return CheckResult( + name="response_store_db", + status="error", + message=f"Response store DB error: {exc}", + ) + + +# --------------------------------------------------------------------------- +# Snapshot collector +# --------------------------------------------------------------------------- + +def collect_snapshot(cfg: ObservatoryConfig) -> HealthSnapshot: + """Run all checks and return a HealthSnapshot.""" + ts = datetime.now(timezone.utc).isoformat() + checks = [ + check_gateway_liveness(), + check_api_server_http(cfg), + check_webhook_http(cfg), + check_disk(cfg), + check_memory(cfg), + check_cpu(cfg), + check_database(cfg), + check_response_store_db(), + ] + return HealthSnapshot(ts=ts, checks=checks) + + +# --------------------------------------------------------------------------- +# SQLite persistence +# --------------------------------------------------------------------------- + +@contextmanager +def 
_db(path: Path): + path.parent.mkdir(parents=True, exist_ok=True) + conn = sqlite3.connect(str(path), timeout=10) + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA foreign_keys=ON") + try: + yield conn + conn.commit() + finally: + conn.close() + + +def _init_db(path: Path) -> None: + """Create tables if they don't exist.""" + with _db(path) as conn: + conn.execute(""" + CREATE TABLE IF NOT EXISTS health_snapshots ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + ts TEXT NOT NULL, + overall TEXT NOT NULL, + payload TEXT NOT NULL + ) + """) + conn.execute("CREATE INDEX IF NOT EXISTS idx_snapshots_ts ON health_snapshots(ts)") + conn.execute(""" + CREATE TABLE IF NOT EXISTS alerts_sent ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + ts TEXT NOT NULL, + check_name TEXT NOT NULL, + status TEXT NOT NULL, + message TEXT NOT NULL + ) + """) + conn.execute("CREATE INDEX IF NOT EXISTS idx_alerts_ts ON alerts_sent(ts)") + + +def store_snapshot(cfg: ObservatoryConfig, snapshot: HealthSnapshot) -> None: + """Persist snapshot to SQLite.""" + _init_db(cfg.db_path) + payload = json.dumps(snapshot.to_dict()) + with _db(cfg.db_path) as conn: + conn.execute( + "INSERT INTO health_snapshots (ts, overall, payload) VALUES (?, ?, ?)", + (snapshot.ts, snapshot.overall_status, payload), + ) + # Prune records older than RETENTION_DAYS + cutoff = (datetime.now(timezone.utc) - timedelta(days=RETENTION_DAYS)).isoformat() + conn.execute("DELETE FROM health_snapshots WHERE ts < ?", (cutoff,)) + + +def record_alert_sent(cfg: ObservatoryConfig, check_name: str, status: str, message: str) -> None: + """Record that an alert was dispatched.""" + _init_db(cfg.db_path) + with _db(cfg.db_path) as conn: + conn.execute( + "INSERT INTO alerts_sent (ts, check_name, status, message) VALUES (?, ?, ?, ?)", + (datetime.now(timezone.utc).isoformat(), check_name, status, message), + ) + + +def load_snapshots(cfg: ObservatoryConfig, days: int = RETENTION_DAYS) -> List[Dict[str, Any]]: + """Load snapshots from 
the last N days.""" + if not cfg.db_path.exists(): + return [] + cutoff = (datetime.now(timezone.utc) - timedelta(days=days)).isoformat() + with _db(cfg.db_path) as conn: + rows = conn.execute( + "SELECT ts, overall, payload FROM health_snapshots WHERE ts >= ? ORDER BY ts DESC", + (cutoff,), + ).fetchall() + return [json.loads(row[2]) for row in rows] + + +# --------------------------------------------------------------------------- +# Alerting +# --------------------------------------------------------------------------- + +def _telegram_send(token: str, chat_id: str, text: str) -> bool: + """Send a Telegram message via the Bot API. Returns True on success.""" + url = f"https://api.telegram.org/bot{token}/sendMessage" + payload = json.dumps({ + "chat_id": chat_id, + "text": text, + "parse_mode": "HTML", + "disable_web_page_preview": True, + }).encode("utf-8") + req = urllib.request.Request(url, data=payload, method="POST") + req.add_header("Content-Type", "application/json") + req.add_header("User-Agent", "hermes-observatory/1.0") + try: + with urllib.request.urlopen(req, timeout=15) as resp: + body = json.loads(resp.read()) + return bool(body.get("ok")) + except Exception as exc: + logger.warning("Telegram send failed: %s", exc) + return False + + +def _status_emoji(status: str) -> str: + return {"ok": "✅", "warn": "⚠️", "critical": "🔴", "error": "❌"}.get(status, "❓") + + +def maybe_alert(cfg: ObservatoryConfig, snapshot: HealthSnapshot, prev_snapshot: Optional[HealthSnapshot]) -> List[str]: + """ + Fire Telegram alerts for newly degraded checks. + Returns list of alert messages sent. 
+ """ + if not cfg.telegram_token or not cfg.alert_chat_id: + return [] + + alerts_sent = [] + prev_statuses: Dict[str, str] = {} + if prev_snapshot: + for c in prev_snapshot.checks: + prev_statuses[c.name] = c.status + + for check in snapshot.checks: + if check.status in ("critical", "error"): + prev = prev_statuses.get(check.name, "ok") + if prev not in ("critical", "error"): + # Newly degraded — alert + emoji = _status_emoji(check.status) + msg = ( + f"{emoji} Hermes Observatory Alert\n\n" + f"Check: {check.name}\n" + f"Status: {check.status.upper()}\n" + f"Message: {check.message}\n" + f"Time: {snapshot.ts}" + ) + if _telegram_send(cfg.telegram_token, cfg.alert_chat_id, msg): + alerts_sent.append(msg) + record_alert_sent(cfg, check.name, check.status, check.message) + logger.info("Alert sent for %s (%s)", check.name, check.status) + elif check.status == "ok": + prev = prev_statuses.get(check.name) + if prev in ("critical", "error"): + # Recovery alert + msg = ( + f"✅ Hermes Observatory — Recovery\n\n" + f"Check: {check.name} has recovered\n" + f"Message: {check.message}\n" + f"Time: {snapshot.ts}" + ) + if _telegram_send(cfg.telegram_token, cfg.alert_chat_id, msg): + alerts_sent.append(msg) + record_alert_sent(cfg, check.name, "recovery", check.message) + + return alerts_sent + + +# --------------------------------------------------------------------------- +# Daily digest +# --------------------------------------------------------------------------- + +def build_digest(cfg: ObservatoryConfig) -> str: + """Build a daily health digest from stored snapshots.""" + snapshots = load_snapshots(cfg, days=1) + total = len(snapshots) + if total == 0: + return "No health data available for the last 24 hours." 
+ + # Count by overall status + status_counts: Dict[str, int] = {"ok": 0, "warn": 0, "critical": 0, "error": 0} + check_degraded_counts: Dict[str, int] = {} + latencies: Dict[str, List[float]] = {} + + for snap in snapshots: + overall = snap.get("overall", "ok") + status_counts[overall] = status_counts.get(overall, 0) + 1 + for check in snap.get("checks", []): + name = check["name"] + status = check["status"] + if status in ("critical", "error", "warn"): + check_degraded_counts[name] = check_degraded_counts.get(name, 0) + 1 + value = check.get("value") + unit = check.get("unit") + if value is not None and unit == "ms": + if name not in latencies: + latencies[name] = [] + latencies[name].append(float(value)) + + uptime_pct = 100.0 * status_counts["ok"] / total if total else 0.0 + now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC") + + lines = [ + f"📊 Hermes Observatory — Daily Digest", + f"Generated: {now}", + f"", + f"Last 24h Summary ({total} samples)", + f" Healthy: {status_counts['ok']} ({100*status_counts['ok']//total if total else 0}%)", + f" Warning: {status_counts.get('warn', 0)}", + f" Critical: {status_counts.get('critical', 0)}", + f" Error: {status_counts.get('error', 0)}", + f"", + ] + + # SLO status + lines.append("SLO Status") + gw_uptime_target = cfg.gateway_uptime_slo_pct + gw_snapshots = [ + s for s in snapshots + if any(c["name"] == "gateway_process" and c["status"] == "ok" for c in s.get("checks", [])) + ] + gw_uptime = 100.0 * len(gw_snapshots) / total if total else 0.0 + gw_ok = gw_uptime >= gw_uptime_target + lines.append( + f" {'✅' if gw_ok else '❌'} Gateway uptime: {gw_uptime:.1f}% (target: ≥{gw_uptime_target}%)" + ) + + wh_latency_target = cfg.webhook_latency_slo_ms + if "webhook_http" in latencies and latencies["webhook_http"]: + wh_vals = sorted(latencies["webhook_http"]) + p95_idx = int(len(wh_vals) * 0.95) + p95 = wh_vals[min(p95_idx, len(wh_vals) - 1)] + wh_ok = p95 <= wh_latency_target + lines.append( + f" {'✅' if wh_ok 
else '❌'} Webhook p95 latency: {p95:.0f}ms (target: ≤{wh_latency_target:.0f}ms)" + ) + else: + lines.append(f" ⚫ Webhook latency: no data") + + if "api_server_http" in latencies and latencies["api_server_http"]: + api_vals = sorted(latencies["api_server_http"]) + p95_idx = int(len(api_vals) * 0.95) + p95 = api_vals[min(p95_idx, len(api_vals) - 1)] + api_ok = p95 <= wh_latency_target + lines.append( + f" {'✅' if api_ok else '❌'} API server p95 latency: {p95:.0f}ms (target: ≤{wh_latency_target:.0f}ms)" + ) + + # Top degraded checks + if check_degraded_counts: + lines.append("") + lines.append("Degraded Checks (24h)") + for name, count in sorted(check_degraded_counts.items(), key=lambda x: -x[1]): + pct = 100 * count // total if total else 0 + lines.append(f" • {name}: {count} incidents ({pct}%)") + + lines.append("") + lines.append(f"Observatory DB: {cfg.db_path}") + + return "\n".join(lines) + + +def send_digest(cfg: ObservatoryConfig) -> bool: + """Build and send the daily digest to Telegram. 
Returns True on success.""" + digest = build_digest(cfg) + if cfg.telegram_token and cfg.digest_chat_id: + return _telegram_send(cfg.telegram_token, cfg.digest_chat_id, digest) + return False + + +# --------------------------------------------------------------------------- +# Display helpers +# --------------------------------------------------------------------------- + +_STATUS_COLORS = { + "ok": "\033[32m", # green + "warn": "\033[33m", # yellow + "critical": "\033[31m", # red + "error": "\033[91m", # bright red +} +_RESET = "\033[0m" + + +def _color_status(status: str) -> str: + c = _STATUS_COLORS.get(status, "") + return f"{c}{status.upper()}{_RESET}" + + +def print_snapshot(snapshot: HealthSnapshot) -> None: + overall_color = _STATUS_COLORS.get(snapshot.overall_status, "") + print(f"\n{'='*60}") + print(f" Hermes Observatory — {snapshot.ts}") + print(f" Overall: {overall_color}{snapshot.overall_status.upper()}{_RESET}") + print(f"{'='*60}") + for check in snapshot.checks: + emoji = _status_emoji(check.status) + val_str = f" [{check.value:.1f}{check.unit}]" if check.value is not None and check.unit else "" + print(f" {emoji} {check.name:<25} {_color_status(check.status):<15} {check.message}{val_str}") + print() + + +def print_slo_report(cfg: ObservatoryConfig) -> None: + """Print current SLO definitions and targets.""" + snapshots = load_snapshots(cfg, days=30) + total = len(snapshots) + print(f"\n{'='*60}") + print(" Hermes Observatory — SLO Report (last 30 days)") + print(f"{'='*60}") + for slo_key, slo in SLO_DEFINITIONS.items(): + print(f"\n {slo['description']}") + print(f" Target: {slo['target']}{slo['unit']}") + if total == 0: + print(f" Status: no data") + continue + if slo_key == "gateway_uptime_pct": + ok_count = sum( + 1 for s in snapshots + if any(c["name"] == "gateway_process" and c["status"] == "ok" + for c in s.get("checks", [])) + ) + actual = 100.0 * ok_count / total + met = actual >= slo["target"] + print(f" Actual: {actual:.2f}% {'✅ MET' if 
met else '❌ MISSED'}") + elif slo_key in ("webhook_latency_ms", "api_server_http_latency_ms"): + check_name = "webhook_http" if "webhook" in slo_key else "api_server_http" + vals = [ + float(c["value"]) + for s in snapshots + for c in s.get("checks", []) + if c["name"] == check_name and c.get("value") is not None + ] + if vals: + vals.sort() + p95_idx = int(len(vals) * 0.95) + p95 = vals[min(p95_idx, len(vals) - 1)] + met = p95 <= slo["target"] + print(f" p95: {p95:.0f}ms {'✅ MET' if met else '❌ MISSED'}") + else: + print(f" Status: no latency data") + print() + + +def print_history(cfg: ObservatoryConfig, count: int = 20) -> None: + """Print recent health records.""" + snapshots = load_snapshots(cfg, days=RETENTION_DAYS)[:count] + if not snapshots: + print("No history available.") + return + print(f"\n{'='*60}") + print(f" Last {min(count, len(snapshots))} health records") + print(f"{'='*60}") + for snap in snapshots: + ts = snap.get("ts", "?") + overall = snap.get("overall", "?") + emoji = _status_emoji(overall) + degraded = [c["name"] for c in snap.get("checks", []) if c["status"] != "ok"] + degraded_str = f" — issues: {', '.join(degraded)}" if degraded else "" + print(f" {emoji} {ts} {overall.upper()}{degraded_str}") + print() + + +# --------------------------------------------------------------------------- +# Daemon mode +# --------------------------------------------------------------------------- + +class Observatory: + """Continuous monitoring daemon.""" + + def __init__(self, cfg: ObservatoryConfig): + self.cfg = cfg + self._running = False + self._prev_snapshot: Optional[HealthSnapshot] = None + + def _handle_signal(self, signum: int, frame: Any) -> None: + logger.info("Received signal %d, shutting down...", signum) + self._running = False + + def run_once(self) -> HealthSnapshot: + snapshot = collect_snapshot(self.cfg) + store_snapshot(self.cfg, snapshot) + alerts = maybe_alert(self.cfg, snapshot, self._prev_snapshot) + if alerts: + logger.info("Sent %d 
alert(s)", len(alerts)) + self._prev_snapshot = snapshot + return snapshot + + def run(self) -> None: + _init_db(self.cfg.db_path) + logger.info( + "Observatory starting — poll_interval=%ds db=%s", + self.cfg.poll_interval, + self.cfg.db_path, + ) + self._running = True + signal.signal(signal.SIGINT, self._handle_signal) + signal.signal(signal.SIGTERM, self._handle_signal) + + while self._running: + try: + snapshot = self.run_once() + logger.info("Health check: %s", snapshot.overall_status) + except Exception as exc: + logger.error("Health check failed: %s", exc, exc_info=True) + if self._running: + time.sleep(self.cfg.poll_interval) + + logger.info("Observatory stopped.") + + +# --------------------------------------------------------------------------- +# CLI entry point +# --------------------------------------------------------------------------- + +def main(argv: Optional[List[str]] = None) -> int: + parser = argparse.ArgumentParser( + description="Hermes Observatory — health monitoring & alerting", + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument("--check", action="store_true", help="Run one health check and print results") + parser.add_argument("--daemon", action="store_true", help="Run as continuous monitoring daemon") + parser.add_argument("--digest", action="store_true", help="Print (and optionally send) daily digest") + parser.add_argument("--history", type=int, metavar="N", help="Show last N health records") + parser.add_argument("--slo", action="store_true", help="Print SLO report") + parser.add_argument("--send-digest", action="store_true", help="Send daily digest via Telegram") + parser.add_argument("--verbose", "-v", action="store_true", help="Enable verbose logging") + + args = parser.parse_args(argv) + + logging.basicConfig( + level=logging.DEBUG if args.verbose else logging.INFO, + format="%(asctime)s %(levelname)s [observatory] %(message)s", + ) + + cfg = ObservatoryConfig.from_env() + _init_db(cfg.db_path) + + 
if args.check: + snapshot = collect_snapshot(cfg) + store_snapshot(cfg, snapshot) + print_snapshot(snapshot) + return 0 if snapshot.overall_status == "ok" else 1 + + if args.daemon: + obs = Observatory(cfg) + obs.run() + return 0 + + if args.digest or args.send_digest: + digest = build_digest(cfg) + print(digest) + if args.send_digest: + ok = send_digest(cfg) + if ok: + print("\n[Digest sent to Telegram]") + else: + print("\n[Telegram send skipped — token/chat_id not configured]") + return 0 + + if args.history is not None: + print_history(cfg, args.history) + return 0 + + if args.slo: + print_slo_report(cfg) + return 0 + + # Default: one-shot check + snapshot = collect_snapshot(cfg) + store_snapshot(cfg, snapshot) + print_snapshot(snapshot) + return 0 if snapshot.overall_status == "ok" else 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/pyproject.toml b/pyproject.toml index 40abd50ff..ef57f5663 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,6 +42,7 @@ dependencies = [ modal = ["modal>=1.0.0,<2"] daytona = ["daytona>=0.148.0,<1"] dev = ["pytest>=9.0.2,<10", "pytest-asyncio>=1.3.0,<2", "pytest-xdist>=3.0,<4", "mcp>=1.2.0,<2"] +observatory = ["psutil>=5.9.0,<7"] messaging = ["python-telegram-bot>=22.6,<23", "discord.py[voice]>=2.7.1,<3", "aiohttp>=3.13.3,<4", "slack-bolt>=1.18.0,<2", "slack-sdk>=3.27.0,<4"] cron = ["croniter>=6.0.0,<7"] slack = ["slack-bolt>=1.18.0,<2", "slack-sdk>=3.27.0,<4"] diff --git a/tests/test_observatory.py b/tests/test_observatory.py new file mode 100644 index 000000000..1290bc720 --- /dev/null +++ b/tests/test_observatory.py @@ -0,0 +1,455 @@ +""" +Tests for observatory.py — health monitoring & alerting. 
+ +Refs #147 +""" + +from __future__ import annotations + +import json +import os +import sqlite3 +import sys +import tempfile +import time +from pathlib import Path +from typing import Any +from unittest.mock import MagicMock, patch + +import pytest + +PROJECT_ROOT = Path(__file__).parent.parent +if str(PROJECT_ROOT) not in sys.path: + sys.path.insert(0, str(PROJECT_ROOT)) + +import observatory as obs + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +@pytest.fixture +def cfg(tmp_path): + """Return an ObservatoryConfig pointing at a temp directory.""" + cfg = obs.ObservatoryConfig() + cfg.db_path = tmp_path / "observatory.db" + cfg.alert_chat_id = "99999" + cfg.digest_chat_id = "99999" + cfg.telegram_token = "fake-token" + cfg.webhook_url = "http://127.0.0.1:19999/health" # port never bound + cfg.api_url = "http://127.0.0.1:19998/health" + return cfg + + +# --------------------------------------------------------------------------- +# Config tests +# --------------------------------------------------------------------------- + +class TestObservatoryConfig: + def test_defaults(self): + c = obs.ObservatoryConfig() + assert c.disk_warn_pct == 80.0 + assert c.disk_crit_pct == 90.0 + assert c.mem_warn_pct == 80.0 + assert c.mem_crit_pct == 90.0 + assert c.cpu_warn_pct == 80.0 + assert c.cpu_crit_pct == 95.0 + assert c.poll_interval == 60 + assert c.webhook_latency_slo_ms == 2000.0 + assert c.gateway_uptime_slo_pct == 99.5 + + def test_from_env_overrides(self, monkeypatch): + monkeypatch.setenv("OBSERVATORY_DISK_WARN_PCT", "70") + monkeypatch.setenv("OBSERVATORY_POLL_INTERVAL", "30") + monkeypatch.setenv("OBSERVATORY_ALERT_CHAT_ID", "12345") + monkeypatch.setenv("TELEGRAM_BOT_TOKEN", "tok123") + c = obs.ObservatoryConfig.from_env() + assert c.disk_warn_pct == 70.0 + assert c.poll_interval == 30 + assert c.alert_chat_id == "12345" + assert 
c.telegram_token == "tok123" + + def test_digest_chat_falls_back_to_alert(self, monkeypatch): + monkeypatch.setenv("OBSERVATORY_ALERT_CHAT_ID", "abc") + monkeypatch.delenv("OBSERVATORY_DIGEST_CHAT_ID", raising=False) + c = obs.ObservatoryConfig.from_env() + assert c.digest_chat_id == "abc" + + +# --------------------------------------------------------------------------- +# CheckResult / HealthSnapshot tests +# --------------------------------------------------------------------------- + +class TestHealthSnapshot: + def _make_snapshot(self, statuses): + checks = [obs.CheckResult(name=f"c{i}", status=s, message="") for i, s in enumerate(statuses)] + return obs.HealthSnapshot(ts="2026-01-01T00:00:00+00:00", checks=checks) + + def test_overall_ok(self): + snap = self._make_snapshot(["ok", "ok"]) + assert snap.overall_status == "ok" + + def test_overall_warn(self): + snap = self._make_snapshot(["ok", "warn"]) + assert snap.overall_status == "warn" + + def test_overall_critical(self): + snap = self._make_snapshot(["ok", "warn", "critical"]) + assert snap.overall_status == "critical" + + def test_overall_error(self): + snap = self._make_snapshot(["ok", "error"]) + assert snap.overall_status == "critical" + + def test_to_dict(self): + snap = self._make_snapshot(["ok"]) + d = snap.to_dict() + assert d["overall"] == "ok" + assert isinstance(d["checks"], list) + assert d["checks"][0]["name"] == "c0" + + +# --------------------------------------------------------------------------- +# Individual check tests +# --------------------------------------------------------------------------- + +class TestCheckGatewayLiveness: + def test_running(self): + with patch("gateway.status.is_gateway_running", return_value=True), \ + patch("gateway.status.get_running_pid", return_value=12345): + result = obs.check_gateway_liveness() + assert result.status == "ok" + assert "12345" in result.message + + def test_not_running(self): + with patch("gateway.status.is_gateway_running", 
return_value=False), \ + patch("gateway.status.get_running_pid", return_value=None): + result = obs.check_gateway_liveness() + assert result.status == "critical" + + def test_import_error(self): + import builtins + real_import = builtins.__import__ + + def mock_import(name, *args, **kwargs): + if name == "gateway.status": + raise ImportError("no module") + return real_import(name, *args, **kwargs) + + with patch("builtins.__import__", side_effect=mock_import): + result = obs.check_gateway_liveness() + assert result.status in ("error", "critical", "ok") # graceful + + +class TestCheckDisk: + def test_ok(self, cfg): + mock_usage = MagicMock() + mock_usage.percent = 50.0 + mock_usage.free = 10 * 1024 ** 3 + mock_usage.total = 20 * 1024 ** 3 + with patch("psutil.disk_usage", return_value=mock_usage): + result = obs.check_disk(cfg) + assert result.status == "ok" + assert result.value == 50.0 + + def test_warn(self, cfg): + mock_usage = MagicMock() + mock_usage.percent = 85.0 + mock_usage.free = 3 * 1024 ** 3 + mock_usage.total = 20 * 1024 ** 3 + with patch("psutil.disk_usage", return_value=mock_usage): + result = obs.check_disk(cfg) + assert result.status == "warn" + + def test_critical(self, cfg): + mock_usage = MagicMock() + mock_usage.percent = 92.0 + mock_usage.free = 1 * 1024 ** 3 + mock_usage.total = 20 * 1024 ** 3 + with patch("psutil.disk_usage", return_value=mock_usage): + result = obs.check_disk(cfg) + assert result.status == "critical" + + def test_no_psutil(self, cfg, monkeypatch): + monkeypatch.setattr(obs, "_PSUTIL", False) + result = obs.check_disk(cfg) + assert result.status == "error" + + +class TestCheckMemory: + def test_ok(self, cfg): + mock_mem = MagicMock() + mock_mem.percent = 60.0 + mock_mem.available = 4 * 1024 ** 3 + mock_mem.total = 16 * 1024 ** 3 + with patch("psutil.virtual_memory", return_value=mock_mem): + result = obs.check_memory(cfg) + assert result.status == "ok" + + def test_critical(self, cfg): + mock_mem = MagicMock() + 
mock_mem.percent = 95.0 + mock_mem.available = 512 * 1024 ** 2 + mock_mem.total = 16 * 1024 ** 3 + with patch("psutil.virtual_memory", return_value=mock_mem): + result = obs.check_memory(cfg) + assert result.status == "critical" + + +class TestCheckCPU: + def test_ok(self, cfg): + with patch("psutil.cpu_percent", return_value=40.0): + result = obs.check_cpu(cfg) + assert result.status == "ok" + + def test_warn(self, cfg): + with patch("psutil.cpu_percent", return_value=85.0): + result = obs.check_cpu(cfg) + assert result.status == "warn" + + def test_critical(self, cfg): + with patch("psutil.cpu_percent", return_value=98.0): + result = obs.check_cpu(cfg) + assert result.status == "critical" + + +class TestCheckDatabase: + def test_ok(self, cfg): + obs._init_db(cfg.db_path) + result = obs.check_database(cfg) + assert result.status == "ok" + + def test_not_yet_created(self, cfg): + # db_path does not exist + result = obs.check_database(cfg) + assert result.status == "warn" + + +class TestCheckHTTP: + def test_webhook_connection_refused(self, cfg): + result = obs.check_webhook_http(cfg) + # Port 19999 is not bound — should get a "not reachable" warn + assert result.status in ("warn", "error") + + def test_api_server_connection_refused(self, cfg): + result = obs.check_api_server_http(cfg) + assert result.status in ("warn", "error") + + def test_webhook_ok(self, cfg): + import urllib.error + from unittest.mock import patch, MagicMock + mock_resp = MagicMock() + mock_resp.__enter__ = lambda s: s + mock_resp.__exit__ = MagicMock(return_value=False) + mock_resp.status = 200 + mock_resp.read.return_value = b'{"status":"ok"}' + with patch("urllib.request.urlopen", return_value=mock_resp): + result = obs.check_webhook_http(cfg) + assert result.status in ("ok", "warn") + + def test_webhook_http_error(self, cfg): + mock_resp = MagicMock() + mock_resp.__enter__ = lambda s: s + mock_resp.__exit__ = MagicMock(return_value=False) + mock_resp.status = 503 + with 
patch("urllib.request.urlopen", return_value=mock_resp): + result = obs.check_webhook_http(cfg) + assert result.status == "critical" + + +# --------------------------------------------------------------------------- +# Persistence tests +# --------------------------------------------------------------------------- + +class TestPersistence: + def test_store_and_load(self, cfg): + obs._init_db(cfg.db_path) + from datetime import datetime, timezone + ts = datetime.now(timezone.utc).isoformat() + snap = obs.HealthSnapshot( + ts=ts, + checks=[obs.CheckResult(name="test", status="ok", message="fine")], + ) + obs.store_snapshot(cfg, snap) + loaded = obs.load_snapshots(cfg, days=30) + assert len(loaded) == 1 + assert loaded[0]["overall"] == "ok" + + def test_retention_pruning(self, cfg): + obs._init_db(cfg.db_path) + # Insert an old record directly + with obs._db(cfg.db_path) as conn: + conn.execute( + "INSERT INTO health_snapshots (ts, overall, payload) VALUES (?, ?, ?)", + ("2000-01-01T00:00:00+00:00", "ok", '{"ts":"2000-01-01T00:00:00+00:00","overall":"ok","checks":[]}'), + ) + snap = obs.HealthSnapshot( + ts="2026-01-01T00:00:00+00:00", + checks=[], + ) + obs.store_snapshot(cfg, snap) + # Old record should have been pruned + with obs._db(cfg.db_path) as conn: + count = conn.execute("SELECT count(*) FROM health_snapshots WHERE ts < '2001-01-01'").fetchone()[0] + assert count == 0 + + def test_record_alert_sent(self, cfg): + obs._init_db(cfg.db_path) + obs.record_alert_sent(cfg, "gateway_process", "critical", "not running") + with obs._db(cfg.db_path) as conn: + count = conn.execute("SELECT count(*) FROM alerts_sent").fetchone()[0] + assert count == 1 + + +# --------------------------------------------------------------------------- +# Alerting tests +# --------------------------------------------------------------------------- + +class TestAlerting: + def _snap(self, status): + return obs.HealthSnapshot( + ts="2026-01-01T00:00:00+00:00", + 
checks=[obs.CheckResult(name="gateway_process", status=status, message="test")], + ) + + def test_no_alert_when_ok(self, cfg): + snap = self._snap("ok") + prev = self._snap("ok") + obs._init_db(cfg.db_path) + with patch("observatory._telegram_send", return_value=True) as mock_send: + alerts = obs.maybe_alert(cfg, snap, prev) + mock_send.assert_not_called() + assert alerts == [] + + def test_alert_on_new_critical(self, cfg): + snap = self._snap("critical") + prev = self._snap("ok") + obs._init_db(cfg.db_path) + with patch("observatory._telegram_send", return_value=True) as mock_send: + alerts = obs.maybe_alert(cfg, snap, prev) + mock_send.assert_called_once() + assert len(alerts) == 1 + + def test_no_duplicate_alert(self, cfg): + snap = self._snap("critical") + prev = self._snap("critical") # already critical + obs._init_db(cfg.db_path) + with patch("observatory._telegram_send", return_value=True) as mock_send: + alerts = obs.maybe_alert(cfg, snap, prev) + mock_send.assert_not_called() + assert alerts == [] + + def test_recovery_alert(self, cfg): + snap = self._snap("ok") + prev = self._snap("critical") + obs._init_db(cfg.db_path) + with patch("observatory._telegram_send", return_value=True) as mock_send: + alerts = obs.maybe_alert(cfg, snap, prev) + mock_send.assert_called_once() + + def test_no_alert_without_token(self, cfg): + cfg.telegram_token = None + snap = self._snap("critical") + obs._init_db(cfg.db_path) + alerts = obs.maybe_alert(cfg, snap, None) + assert alerts == [] + + def test_no_alert_without_chat_id(self, cfg): + cfg.alert_chat_id = None + snap = self._snap("critical") + obs._init_db(cfg.db_path) + alerts = obs.maybe_alert(cfg, snap, None) + assert alerts == [] + + +# --------------------------------------------------------------------------- +# Digest tests +# --------------------------------------------------------------------------- + +class TestDigest: + def test_empty_digest(self, cfg): + obs._init_db(cfg.db_path) + digest = 
obs.build_digest(cfg) + assert "no health data" in digest.lower() or "24 hours" in digest.lower() + + def test_digest_with_data(self, cfg): + obs._init_db(cfg.db_path) + from datetime import datetime, timezone, timedelta + ts = (datetime.now(timezone.utc) - timedelta(hours=1)).isoformat() + snap = obs.HealthSnapshot( + ts=ts, + checks=[ + obs.CheckResult(name="gateway_process", status="ok", message="running"), + obs.CheckResult(name="disk", status="ok", message="50% used", value=50.0, unit="%"), + obs.CheckResult(name="webhook_http", status="ok", message="ok", value=150.0, unit="ms"), + ], + ) + obs.store_snapshot(cfg, snap) + digest = obs.build_digest(cfg) + assert "Daily Digest" in digest + assert "Gateway" in digest or "gateway" in digest + + def test_send_digest_no_token(self, cfg): + cfg.telegram_token = None + obs._init_db(cfg.db_path) + result = obs.send_digest(cfg) + assert result is False + + +# --------------------------------------------------------------------------- +# SLO tests +# --------------------------------------------------------------------------- + +class TestSLO: + def test_slo_definitions_complete(self): + assert "gateway_uptime_pct" in obs.SLO_DEFINITIONS + assert "webhook_latency_ms" in obs.SLO_DEFINITIONS + assert "api_server_latency_ms" in obs.SLO_DEFINITIONS + + def test_slo_targets(self): + assert obs.SLO_DEFINITIONS["gateway_uptime_pct"]["target"] == 99.5 + assert obs.SLO_DEFINITIONS["webhook_latency_ms"]["target"] == 2000 + + +# --------------------------------------------------------------------------- +# CLI tests +# --------------------------------------------------------------------------- + +class TestCLI: + def test_check_exits_0_on_ok(self, cfg, monkeypatch, tmp_path): + monkeypatch.setenv("OBSERVATORY_DB_PATH", str(tmp_path / "obs.db")) + ok_snap = obs.HealthSnapshot( + ts="2026-01-01T00:00:00+00:00", + checks=[obs.CheckResult(name="all_good", status="ok", message="fine")], + ) + with patch("observatory.collect_snapshot", 
return_value=ok_snap), \ + patch("observatory.store_snapshot"): + rc = obs.main(["--check"]) + assert rc == 0 + + def test_check_exits_nonzero_on_critical(self, cfg, monkeypatch, tmp_path): + monkeypatch.setenv("OBSERVATORY_DB_PATH", str(tmp_path / "obs.db")) + bad_snap = obs.HealthSnapshot( + ts="2026-01-01T00:00:00+00:00", + checks=[obs.CheckResult(name="gateway_process", status="critical", message="down")], + ) + with patch("observatory.collect_snapshot", return_value=bad_snap), \ + patch("observatory.store_snapshot"): + rc = obs.main(["--check"]) + assert rc != 0 + + def test_digest_flag(self, monkeypatch, tmp_path): + monkeypatch.setenv("OBSERVATORY_DB_PATH", str(tmp_path / "obs.db")) + rc = obs.main(["--digest"]) + assert rc == 0 + + def test_slo_flag(self, monkeypatch, tmp_path): + monkeypatch.setenv("OBSERVATORY_DB_PATH", str(tmp_path / "obs.db")) + rc = obs.main(["--slo"]) + assert rc == 0 + + def test_history_flag(self, monkeypatch, tmp_path): + monkeypatch.setenv("OBSERVATORY_DB_PATH", str(tmp_path / "obs.db")) + rc = obs.main(["--history", "5"]) + assert rc == 0