"""
Observatory — Testbed Health Monitoring & Alerting for Hermes Agent

Checks running services, system resources, and connectivity.
Fires Telegram alerts when thresholds are breached.
Posts daily digest reports.
Stores 30 days of historical health data in SQLite.

Usage:
    python observatory.py --check          # one-shot health check (stdout)
    python observatory.py --daemon         # continuous monitor (60s poll)
    python observatory.py --digest         # print daily digest
    python observatory.py --send-digest    # print daily digest and send it via Telegram
    python observatory.py --history N      # show last N health records
    python observatory.py --slo            # print SLO report

Configuration (env vars, falls back to ~/.hermes/.env):
    OBSERVATORY_ALERT_CHAT_ID    Telegram chat ID for alerts
    OBSERVATORY_DIGEST_CHAT_ID   Telegram chat ID for daily digest (default: alert chat)
    OBSERVATORY_POLL_INTERVAL    Seconds between health polls (default: 60)
    OBSERVATORY_DB_PATH          SQLite path (default: ~/.hermes/observatory.db)
    TELEGRAM_BOT_TOKEN           Bot token used to send alerts

    # Threshold overrides (all optional):
    OBSERVATORY_DISK_WARN_PCT    Disk usage warn threshold (default: 80)
    OBSERVATORY_DISK_CRIT_PCT    Disk usage critical threshold (default: 90)
    OBSERVATORY_MEM_WARN_PCT     Memory usage warn threshold (default: 80)
    OBSERVATORY_MEM_CRIT_PCT     Memory usage critical threshold (default: 90)
    OBSERVATORY_CPU_WARN_PCT     CPU usage warn threshold (default: 80)
    OBSERVATORY_CPU_CRIT_PCT     CPU usage critical threshold (default: 95)
    OBSERVATORY_WEBHOOK_URL      Webhook endpoint to probe (default: http://127.0.0.1:8080/health)
    OBSERVATORY_API_URL          API server health URL (default: http://127.0.0.1:8642/health)
    OBSERVATORY_WEBHOOK_LATENCY_SLO_MS   Webhook latency SLO ms (default: 2000)
    OBSERVATORY_GATEWAY_UPTIME_SLO_PCT   Gateway uptime SLO % (default: 99.5)
"""

from __future__ import annotations

import argparse
import html
import json
import logging
import os
import signal
import sqlite3
import sys
import time
import urllib.request
import urllib.error
from contextlib import closing, contextmanager
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Tuple

# ---------------------------------------------------------------------------
# Optional imports
# ---------------------------------------------------------------------------

try:
    import psutil
    _PSUTIL = True
except ImportError:
    _PSUTIL = False

try:
    from dotenv import load_dotenv as _load_dotenv
    _DOTENV = True
except ImportError:
    _DOTENV = False

logger = logging.getLogger("observatory")

# ---------------------------------------------------------------------------
# Constants & SLO definitions
# ---------------------------------------------------------------------------

RETENTION_DAYS = 30

SLO_DEFINITIONS = {
    "gateway_uptime_pct": {
        "description": "Gateway process uptime over the last 24 hours",
        "target": 99.5,
        "unit": "%",
    },
    "webhook_latency_ms": {
        "description": "Webhook endpoint p95 response latency",
        "target": 2000,
        "unit": "ms",
        "direction": "lower_is_better",
    },
    "api_server_latency_ms": {
        "description": "API server /health p95 response latency",
        "target": 2000,
        "unit": "ms",
        "direction": "lower_is_better",
    },
}

# Maps each latency SLO key in SLO_DEFINITIONS to the check whose recorded
# value (in ms) feeds it.
_SLO_LATENCY_CHECKS = {
    "webhook_latency_ms": "webhook_http",
    "api_server_latency_ms": "api_server_http",
}


# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

def _hermes_home() -> Path:
    """Return the Hermes home directory ($HERMES_HOME, default ~/.hermes)."""
    return Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))


def _load_env() -> None:
    """Load .env from HERMES_HOME if dotenv is available."""
    if not _DOTENV:
        return
    env_path = _hermes_home() / ".env"
    if env_path.exists():
        _load_dotenv(env_path, override=False)
    # Project-level .env as dev fallback
    project_env = Path(__file__).parent / ".env"
    if project_env.exists():
        _load_dotenv(project_env, override=False)


@dataclass
class ObservatoryConfig:
    """Runtime settings for the observatory; see the module docstring for env vars."""

    alert_chat_id: Optional[str] = None
    digest_chat_id: Optional[str] = None
    telegram_token: Optional[str] = None
    poll_interval: int = 60
    db_path: Path = field(default_factory=lambda: _hermes_home() / "observatory.db")
    disk_warn_pct: float = 80.0
    disk_crit_pct: float = 90.0
    mem_warn_pct: float = 80.0
    mem_crit_pct: float = 90.0
    cpu_warn_pct: float = 80.0
    cpu_crit_pct: float = 95.0
    webhook_url: str = "http://127.0.0.1:8080/health"
    api_url: str = "http://127.0.0.1:8642/health"
    webhook_latency_slo_ms: float = 2000.0
    gateway_uptime_slo_pct: float = 99.5

    @classmethod
    def from_env(cls) -> "ObservatoryConfig":
        """Build a config from environment variables (after loading .env files).

        Unset variables keep the dataclass defaults. A non-numeric value for
        a numeric setting raises ValueError.
        """
        _load_env()
        cfg = cls()
        cfg.telegram_token = os.getenv("TELEGRAM_BOT_TOKEN")
        cfg.alert_chat_id = os.getenv("OBSERVATORY_ALERT_CHAT_ID")
        # Digest goes to the alert chat unless a dedicated chat is configured.
        cfg.digest_chat_id = os.getenv("OBSERVATORY_DIGEST_CHAT_ID") or cfg.alert_chat_id
        cfg.poll_interval = int(os.getenv("OBSERVATORY_POLL_INTERVAL", cfg.poll_interval))
        db_override = os.getenv("OBSERVATORY_DB_PATH")
        if db_override:
            cfg.db_path = Path(db_override)
        cfg.disk_warn_pct = float(os.getenv("OBSERVATORY_DISK_WARN_PCT", cfg.disk_warn_pct))
        cfg.disk_crit_pct = float(os.getenv("OBSERVATORY_DISK_CRIT_PCT", cfg.disk_crit_pct))
        cfg.mem_warn_pct = float(os.getenv("OBSERVATORY_MEM_WARN_PCT", cfg.mem_warn_pct))
        cfg.mem_crit_pct = float(os.getenv("OBSERVATORY_MEM_CRIT_PCT", cfg.mem_crit_pct))
        cfg.cpu_warn_pct = float(os.getenv("OBSERVATORY_CPU_WARN_PCT", cfg.cpu_warn_pct))
        cfg.cpu_crit_pct = float(os.getenv("OBSERVATORY_CPU_CRIT_PCT", cfg.cpu_crit_pct))
        cfg.webhook_url = os.getenv("OBSERVATORY_WEBHOOK_URL", cfg.webhook_url)
        cfg.api_url = os.getenv("OBSERVATORY_API_URL", cfg.api_url)
        cfg.webhook_latency_slo_ms = float(
            os.getenv("OBSERVATORY_WEBHOOK_LATENCY_SLO_MS", cfg.webhook_latency_slo_ms)
        )
        cfg.gateway_uptime_slo_pct = float(
            os.getenv("OBSERVATORY_GATEWAY_UPTIME_SLO_PCT", cfg.gateway_uptime_slo_pct)
        )
        return cfg


# ---------------------------------------------------------------------------
# Health check models
# ---------------------------------------------------------------------------

@dataclass
class CheckResult:
    """Outcome of a single health check."""

    name: str
    status: str  # "ok" | "warn" | "critical" | "error"
    message: str
    value: Optional[float] = None
    unit: Optional[str] = None
    extra: Dict[str, Any] = field(default_factory=dict)


@dataclass
class HealthSnapshot:
    """All check results collected at one point in time."""

    ts: str  # ISO8601 UTC
    checks: List[CheckResult] = field(default_factory=list)

    @property
    def overall_status(self) -> str:
        """Worst status across checks; "error" is folded into "critical"."""
        statuses = {c.status for c in self.checks}
        if "critical" in statuses or "error" in statuses:
            return "critical"
        if "warn" in statuses:
            return "warn"
        return "ok"

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serialisable dict (the payload stored in SQLite)."""
        return {
            "ts": self.ts,
            "overall": self.overall_status,
            "checks": [asdict(c) for c in self.checks],
        }


# ---------------------------------------------------------------------------
# Individual health checks
# ---------------------------------------------------------------------------

def check_gateway_liveness() -> CheckResult:
    """Check whether the Hermes gateway process is running."""
    try:
        from gateway.status import is_gateway_running, get_running_pid
        running = is_gateway_running()
        pid = get_running_pid()
        if running:
            return CheckResult(
                name="gateway_process",
                status="ok",
                message=f"Gateway running (pid={pid})",
                value=float(pid) if pid else None,
            )
        return CheckResult(
            name="gateway_process",
            status="critical",
            message="Gateway process is NOT running",
        )
    except Exception as exc:
        # Includes ImportError when the gateway package is not importable.
        return CheckResult(
            name="gateway_process",
            status="error",
            message=f"Could not determine gateway status: {exc}",
        )


def _probe_http(
    name: str,
    label: str,
    url: str,
    latency_slo_ms: float,
    *,
    capture_body: bool = False,
) -> CheckResult:
    """GET *url* and classify the outcome as a CheckResult named *name*.

    *label* is the human-readable prefix used in messages. The result value is
    always the elapsed time in ms. Status mapping:
      - HTTP < 400 within the SLO        -> "ok"
      - HTTP < 400 but slower than SLO   -> "warn"
      - HTTP >= 400                      -> "critical"
      - connection refused / reset       -> "warn" (service probably not started)
      - anything else                    -> "error"
    """
    start = time.monotonic()

    def elapsed_ms() -> float:
        return (time.monotonic() - start) * 1000

    try:
        req = urllib.request.Request(url, method="GET")
        req.add_header("User-Agent", "hermes-observatory/1.0")
        with urllib.request.urlopen(req, timeout=10) as resp:
            # Latency is time-to-response-headers; body read is not included.
            latency_ms = elapsed_ms()
            status_code = resp.status
            body = resp.read(512).decode("utf-8", errors="replace") if capture_body else None
    except urllib.error.HTTPError as exc:
        # urlopen raises HTTPError for 4xx/5xx. It is a URLError subclass, so
        # it must be caught first or it is misreported as a generic probe error.
        latency_ms = elapsed_ms()
        code = exc.code
        exc.close()
        return CheckResult(
            name=name,
            status="critical",
            message=f"{label} returned HTTP {code}",
            value=latency_ms,
            unit="ms",
        )
    except urllib.error.URLError as exc:
        latency_ms = elapsed_ms()
        # Not running is acceptable if the service is simply not configured.
        cause = getattr(exc, "reason", None)
        reason = str(cause) if cause is not None else str(exc)
        not_started = (
            isinstance(cause, (ConnectionRefusedError, ConnectionResetError))
            or "Connection refused" in reason
            or "Connection reset" in reason
        )
        if not_started:
            return CheckResult(
                name=name,
                status="warn",
                message=f"{label} not reachable at {url} (not started?)",
                value=latency_ms,
                unit="ms",
            )
        return CheckResult(
            name=name,
            status="error",
            message=f"{label} probe error: {exc}",
            value=latency_ms,
            unit="ms",
        )
    except Exception as exc:
        return CheckResult(
            name=name,
            status="error",
            message=f"{label} probe exception: {exc}",
            value=elapsed_ms(),
            unit="ms",
        )

    if status_code >= 400:
        # Only reachable with an opener that does not raise on error statuses.
        return CheckResult(
            name=name,
            status="critical",
            message=f"{label} returned HTTP {status_code}",
            value=latency_ms,
            unit="ms",
        )

    slo_ok = latency_ms <= latency_slo_ms
    extra: Dict[str, Any] = {"status_code": status_code}
    if body is not None:
        extra["body_preview"] = body[:100]
    return CheckResult(
        name=name,
        status="ok" if slo_ok else "warn",
        message=f"{label} OK ({latency_ms:.0f}ms){'' if slo_ok else ' — exceeds latency SLO'}",
        value=latency_ms,
        unit="ms",
        extra=extra,
    )


def check_api_server_http(cfg: ObservatoryConfig) -> CheckResult:
    """Check API server /health endpoint responsiveness.

    There is no dedicated API latency setting; the webhook latency SLO is
    applied to the API server as well.
    """
    return _probe_http(
        "api_server_http",
        "API server",
        cfg.api_url,
        cfg.webhook_latency_slo_ms,
        capture_body=True,
    )


def check_webhook_http(cfg: ObservatoryConfig) -> CheckResult:
    """Check webhook endpoint responsiveness."""
    return _probe_http("webhook_http", "Webhook", cfg.webhook_url, cfg.webhook_latency_slo_ms)


def _threshold_status(pct: float, warn_pct: float, crit_pct: float) -> str:
    """Map a usage percentage to "ok" / "warn" / "critical" (thresholds inclusive)."""
    if pct >= crit_pct:
        return "critical"
    if pct >= warn_pct:
        return "warn"
    return "ok"


def check_disk(cfg: ObservatoryConfig) -> CheckResult:
    """Check disk usage on the HERMES_HOME filesystem."""
    if not _PSUTIL:
        return CheckResult(name="disk", status="error", message="psutil not installed")
    try:
        hermes_home = _hermes_home()
        # Fall back to the root filesystem until HERMES_HOME has been created.
        path = str(hermes_home) if hermes_home.exists() else "/"
        usage = psutil.disk_usage(path)
        pct = usage.percent
        free_gb = usage.free / (1024 ** 3)
        return CheckResult(
            name="disk",
            status=_threshold_status(pct, cfg.disk_warn_pct, cfg.disk_crit_pct),
            message=f"Disk {pct:.1f}% used ({free_gb:.1f}GB free)",
            value=pct,
            unit="%",
            extra={"free_bytes": usage.free, "total_bytes": usage.total},
        )
    except Exception as exc:
        return CheckResult(name="disk", status="error", message=f"Disk check error: {exc}")


def check_memory(cfg: ObservatoryConfig) -> CheckResult:
    """Check system memory usage."""
    if not _PSUTIL:
        return CheckResult(name="memory", status="error", message="psutil not installed")
    try:
        mem = psutil.virtual_memory()
        pct = mem.percent
        available_gb = mem.available / (1024 ** 3)
        return CheckResult(
            name="memory",
            status=_threshold_status(pct, cfg.mem_warn_pct, cfg.mem_crit_pct),
            message=f"Memory {pct:.1f}% used ({available_gb:.1f}GB available)",
            value=pct,
            unit="%",
            extra={"available_bytes": mem.available, "total_bytes": mem.total},
        )
    except Exception as exc:
        return CheckResult(name="memory", status="error", message=f"Memory check error: {exc}")


def check_cpu(cfg: ObservatoryConfig) -> CheckResult:
    """Check CPU usage (1-second sample)."""
    if not _PSUTIL:
        return CheckResult(name="cpu", status="error", message="psutil not installed")
    try:
        # interval=1 blocks for one second to take the sample.
        pct = psutil.cpu_percent(interval=1)
        return CheckResult(
            name="cpu",
            status=_threshold_status(pct, cfg.cpu_warn_pct, cfg.cpu_crit_pct),
            message=f"CPU {pct:.1f}%",
            value=pct,
            unit="%",
        )
    except Exception as exc:
        return CheckResult(name="cpu", status="error", message=f"CPU check error: {exc}")


def check_database(cfg: ObservatoryConfig) -> CheckResult:
    """Check observatory SQLite DB connectivity and size."""
    db_path = cfg.db_path
    try:
        if not db_path.exists():
            return CheckResult(
                name="database",
                status="warn",
                message=f"Observatory DB not yet created at {db_path}",
            )
        size_kb = db_path.stat().st_size / 1024
        # closing() guarantees the handle is released even if the query raises
        # (e.g. the table does not exist yet).
        with closing(sqlite3.connect(str(db_path), timeout=5)) as conn:
            conn.execute("SELECT count(*) FROM health_snapshots").fetchone()
        return CheckResult(
            name="database",
            status="ok",
            message=f"Observatory DB OK ({size_kb:.1f}KB)",
            value=size_kb,
            unit="KB",
            extra={"path": str(db_path)},
        )
    except Exception as exc:
        return CheckResult(
            name="database",
            status="error",
            message=f"DB check error: {exc}",
        )


def check_response_store_db() -> CheckResult:
    """Check the API server's SQLite response store DB if it exists."""
    try:
        db_path = _hermes_home() / "response_store.db"
        if not db_path.exists():
            return CheckResult(
                name="response_store_db",
                status="ok",
                message="Response store DB not present (API server not yet used)",
            )
        size_kb = db_path.stat().st_size / 1024
        with closing(sqlite3.connect(str(db_path), timeout=5)) as conn:
            count = conn.execute("SELECT count(*) FROM responses").fetchone()[0]
        return CheckResult(
            name="response_store_db",
            status="ok",
            message=f"Response store DB OK ({count} responses, {size_kb:.1f}KB)",
            value=size_kb,
            unit="KB",
        )
    except Exception as exc:
        return CheckResult(
            name="response_store_db",
            status="error",
            message=f"Response store DB error: {exc}",
        )


# ---------------------------------------------------------------------------
# Snapshot collector
# ---------------------------------------------------------------------------

def collect_snapshot(cfg: ObservatoryConfig) -> HealthSnapshot:
    """Run all checks and return a HealthSnapshot."""
    ts = datetime.now(timezone.utc).isoformat()
    checks = [
        check_gateway_liveness(),
        check_api_server_http(cfg),
        check_webhook_http(cfg),
        check_disk(cfg),
        check_memory(cfg),
        check_cpu(cfg),
        check_database(cfg),
        check_response_store_db(),
    ]
    return HealthSnapshot(ts=ts, checks=checks)


# ---------------------------------------------------------------------------
# SQLite persistence
# ---------------------------------------------------------------------------

@contextmanager
def _db(path: Path) -> Iterator[sqlite3.Connection]:
    """Yield a connection to *path*; commit on clean exit, always close.

    The parent directory is created if missing. If the body raises, nothing
    is committed and the connection is closed.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(str(path), timeout=10)
    try:
        # PRAGMAs run inside the try so a failure here cannot leak the handle.
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA foreign_keys=ON")
        yield conn
        conn.commit()
    finally:
        conn.close()


def _init_db(path: Path) -> None:
    """Create tables if they don't exist."""
    with _db(path) as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS health_snapshots (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                ts TEXT NOT NULL,
                overall TEXT NOT NULL,
                payload TEXT NOT NULL
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_snapshots_ts ON health_snapshots(ts)")
        conn.execute("""
            CREATE TABLE IF NOT EXISTS alerts_sent (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                ts TEXT NOT NULL,
                check_name TEXT NOT NULL,
                status TEXT NOT NULL,
                message TEXT NOT NULL
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_alerts_ts ON alerts_sent(ts)")


def store_snapshot(cfg: ObservatoryConfig, snapshot: HealthSnapshot) -> None:
    """Persist snapshot to SQLite and prune rows older than RETENTION_DAYS."""
    _init_db(cfg.db_path)
    payload = json.dumps(snapshot.to_dict())
    with _db(cfg.db_path) as conn:
        conn.execute(
            "INSERT INTO health_snapshots (ts, overall, payload) VALUES (?, ?, ?)",
            (snapshot.ts, snapshot.overall_status, payload),
        )
        # Prune records older than RETENTION_DAYS. Timestamps are ISO8601 UTC
        # strings, compared as text.
        cutoff = (datetime.now(timezone.utc) - timedelta(days=RETENTION_DAYS)).isoformat()
        conn.execute("DELETE FROM health_snapshots WHERE ts < ?", (cutoff,))


def record_alert_sent(cfg: ObservatoryConfig, check_name: str, status: str, message: str) -> None:
    """Record that an alert was dispatched."""
    _init_db(cfg.db_path)
    with _db(cfg.db_path) as conn:
        conn.execute(
            "INSERT INTO alerts_sent (ts, check_name, status, message) VALUES (?, ?, ?, ?)",
            (datetime.now(timezone.utc).isoformat(), check_name, status, message),
        )


def load_snapshots(cfg: ObservatoryConfig, days: int = RETENTION_DAYS) -> List[Dict[str, Any]]:
    """Load snapshots from the last N days, newest first.

    Returns an empty list when the database file does not exist yet.
    """
    if not cfg.db_path.exists():
        return []
    cutoff = (datetime.now(timezone.utc) - timedelta(days=days)).isoformat()
    with _db(cfg.db_path) as conn:
        rows = conn.execute(
            "SELECT ts, overall, payload FROM health_snapshots WHERE ts >= ? ORDER BY ts DESC",
            (cutoff,),
        ).fetchall()
    return [json.loads(row[2]) for row in rows]


# ---------------------------------------------------------------------------
# Alerting
# ---------------------------------------------------------------------------

def _telegram_send(token: str, chat_id: str, text: str) -> bool:
    """Send a Telegram message via the Bot API. Returns True on success.

    *text* is sent with parse_mode=HTML, so callers must HTML-escape any
    dynamic content they interpolate into it.
    """
    url = f"https://api.telegram.org/bot{token}/sendMessage"
    payload = json.dumps({
        "chat_id": chat_id,
        "text": text,
        "parse_mode": "HTML",
        "disable_web_page_preview": True,
    }).encode("utf-8")
    req = urllib.request.Request(url, data=payload, method="POST")
    req.add_header("Content-Type", "application/json")
    req.add_header("User-Agent", "hermes-observatory/1.0")
    try:
        with urllib.request.urlopen(req, timeout=15) as resp:
            body = json.loads(resp.read())
            return bool(body.get("ok"))
    except Exception as exc:
        # Best effort: alert delivery must never crash the monitor.
        logger.warning("Telegram send failed: %s", exc)
        return False


def _status_emoji(status: str) -> str:
    """Return the emoji used to display *status* ("❓" for unknown values)."""
    return {"ok": "✅", "warn": "⚠️", "critical": "🔴", "error": "❌"}.get(status, "❓")


def maybe_alert(cfg: ObservatoryConfig, snapshot: HealthSnapshot,
                prev_snapshot: Optional[HealthSnapshot]) -> List[str]:
    """
    Fire Telegram alerts for newly degraded checks.
    Returns list of alert messages sent.

    An alert fires when a check is critical/error now but was not in the
    previous snapshot; a recovery notice fires when a check is ok now and was
    critical/error before. Nothing is sent unless both the bot token and the
    alert chat are configured.
    """
    if not cfg.telegram_token or not cfg.alert_chat_id:
        return []

    alerts_sent: List[str] = []
    prev_statuses: Dict[str, str] = {}
    if prev_snapshot:
        prev_statuses = {c.name: c.status for c in prev_snapshot.checks}

    for check in snapshot.checks:
        # Messages are sent with parse_mode=HTML. Check messages often embed
        # exception text such as "<urlopen error ...>", which Telegram rejects
        # as an unknown tag unless it is escaped.
        safe_name = html.escape(check.name, quote=False)
        safe_message = html.escape(check.message, quote=False)

        if check.status in ("critical", "error"):
            prev = prev_statuses.get(check.name, "ok")
            if prev not in ("critical", "error"):
                # Newly degraded — alert
                emoji = _status_emoji(check.status)
                msg = (
                    f"{emoji} Hermes Observatory Alert\n\n"
                    f"Check: {safe_name}\n"
                    f"Status: {check.status.upper()}\n"
                    f"Message: {safe_message}\n"
                    f"Time: {snapshot.ts}"
                )
                if _telegram_send(cfg.telegram_token, cfg.alert_chat_id, msg):
                    alerts_sent.append(msg)
                    # The raw (unescaped) message is what gets persisted.
                    record_alert_sent(cfg, check.name, check.status, check.message)
                    logger.info("Alert sent for %s (%s)", check.name, check.status)
        elif check.status == "ok":
            prev = prev_statuses.get(check.name)
            if prev in ("critical", "error"):
                # Recovery alert
                msg = (
                    f"✅ Hermes Observatory — Recovery\n\n"
                    f"Check: {safe_name} has recovered\n"
                    f"Message: {safe_message}\n"
                    f"Time: {snapshot.ts}"
                )
                if _telegram_send(cfg.telegram_token, cfg.alert_chat_id, msg):
                    alerts_sent.append(msg)
                    record_alert_sent(cfg, check.name, "recovery", check.message)

    return alerts_sent


# ---------------------------------------------------------------------------
# Daily digest
# ---------------------------------------------------------------------------

def _p95(values: List[float]) -> float:
    """Return the 95th-percentile element of a non-empty list of values."""
    ordered = sorted(values)
    idx = int(len(ordered) * 0.95)
    # Clamp so short lists yield their maximum instead of an IndexError.
    return ordered[min(idx, len(ordered) - 1)]


def build_digest(cfg: ObservatoryConfig) -> str:
    """Build a daily health digest (plain text) from stored snapshots."""
    snapshots = load_snapshots(cfg, days=1)
    total = len(snapshots)
    if total == 0:
        return "No health data available for the last 24 hours."

    # Count by overall status
    status_counts: Dict[str, int] = {"ok": 0, "warn": 0, "critical": 0, "error": 0}
    check_degraded_counts: Dict[str, int] = {}
    latencies: Dict[str, List[float]] = {}

    for snap in snapshots:
        overall = snap.get("overall", "ok")
        status_counts[overall] = status_counts.get(overall, 0) + 1
        for check in snap.get("checks", []):
            name = check["name"]
            if check["status"] in ("critical", "error", "warn"):
                check_degraded_counts[name] = check_degraded_counts.get(name, 0) + 1
            value = check.get("value")
            # Only millisecond-valued checks contribute to latency stats.
            if value is not None and check.get("unit") == "ms":
                latencies.setdefault(name, []).append(float(value))

    now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")

    lines = [
        "📊 Hermes Observatory — Daily Digest",
        f"Generated: {now}",
        "",
        f"Last 24h Summary ({total} samples)",
        f" Healthy: {status_counts['ok']} ({100 * status_counts['ok'] // total}%)",
        f" Warning: {status_counts.get('warn', 0)}",
        f" Critical: {status_counts.get('critical', 0)}",
        f" Error: {status_counts.get('error', 0)}",
        "",
    ]

    # SLO status
    lines.append("SLO Status")
    gw_uptime_target = cfg.gateway_uptime_slo_pct
    gw_up_count = sum(
        1 for s in snapshots
        if any(c["name"] == "gateway_process" and c["status"] == "ok"
               for c in s.get("checks", []))
    )
    gw_uptime = 100.0 * gw_up_count / total
    gw_ok = gw_uptime >= gw_uptime_target
    lines.append(
        f" {'✅' if gw_ok else '❌'} Gateway uptime: {gw_uptime:.1f}% (target: ≥{gw_uptime_target}%)"
    )

    wh_latency_target = cfg.webhook_latency_slo_ms
    if latencies.get("webhook_http"):
        p95 = _p95(latencies["webhook_http"])
        wh_ok = p95 <= wh_latency_target
        lines.append(
            f" {'✅' if wh_ok else '❌'} Webhook p95 latency: {p95:.0f}ms (target: ≤{wh_latency_target:.0f}ms)"
        )
    else:
        lines.append(" ⚫ Webhook latency: no data")

    if latencies.get("api_server_http"):
        p95 = _p95(latencies["api_server_http"])
        # The API server shares the webhook latency target (no separate setting).
        api_ok = p95 <= wh_latency_target
        lines.append(
            f" {'✅' if api_ok else '❌'} API server p95 latency: {p95:.0f}ms (target: ≤{wh_latency_target:.0f}ms)"
        )

    # Top degraded checks
    if check_degraded_counts:
        lines.append("")
        lines.append("Degraded Checks (24h)")
        for name, count in sorted(check_degraded_counts.items(), key=lambda x: -x[1]):
            pct = 100 * count // total
            lines.append(f" • {name}: {count} incidents ({pct}%)")

    lines.append("")
    lines.append(f"Observatory DB: {cfg.db_path}")
    return "\n".join(lines)


def send_digest(cfg: ObservatoryConfig) -> bool:
    """Build and send the daily digest to Telegram. Returns True on success.

    Returns False when the token or digest chat is not configured, or when
    the Telegram request fails.
    """
    if not (cfg.telegram_token and cfg.digest_chat_id):
        return False
    # The digest is plain text but is sent with parse_mode=HTML, so characters
    # such as "&" or "<" in check names or paths must be escaped.
    digest = html.escape(build_digest(cfg), quote=False)
    return _telegram_send(cfg.telegram_token, cfg.digest_chat_id, digest)


# ---------------------------------------------------------------------------
# Display helpers
# ---------------------------------------------------------------------------

_STATUS_COLORS = {
    "ok": "\033[32m",        # green
    "warn": "\033[33m",      # yellow
    "critical": "\033[31m",  # red
    "error": "\033[91m",     # bright red
}
_RESET = "\033[0m"


def _color_status(status: str, width: int = 0) -> str:
    """Return the upper-cased status wrapped in its ANSI colour.

    *width* pads the visible text before colouring, so that escape sequences
    do not count toward column alignment.
    """
    c = _STATUS_COLORS.get(status, "")
    return f"{c}{status.upper():<{width}}{_RESET}"


def print_snapshot(snapshot: HealthSnapshot) -> None:
    """Pretty-print a snapshot to stdout, one line per check."""
    overall_color = _STATUS_COLORS.get(snapshot.overall_status, "")
    print(f"\n{'='*60}")
    print(f" Hermes Observatory — {snapshot.ts}")
    print(f" Overall: {overall_color}{snapshot.overall_status.upper()}{_RESET}")
    print(f"{'='*60}")
    for check in snapshot.checks:
        emoji = _status_emoji(check.status)
        val_str = f" [{check.value:.1f}{check.unit}]" if check.value is not None and check.unit else ""
        # Width 8 fits the longest status ("CRITICAL").
        print(f" {emoji} {check.name:<25} {_color_status(check.status, 8)} {check.message}{val_str}")
    print()


def print_slo_report(cfg: ObservatoryConfig) -> None:
    """Print each SLO's target and the actual value over the last 30 days."""
    snapshots = load_snapshots(cfg, days=30)
    total = len(snapshots)

    print(f"\n{'='*60}")
    print(" Hermes Observatory — SLO Report (last 30 days)")
    print(f"{'='*60}")

    for slo_key, slo in SLO_DEFINITIONS.items():
        print(f"\n {slo['description']}")
        print(f" Target: {slo['target']}{slo['unit']}")

        if total == 0:
            print(" Status: no data")
            continue

        if slo_key == "gateway_uptime_pct":
            ok_count = sum(
                1 for s in snapshots
                if any(c["name"] == "gateway_process" and c["status"] == "ok"
                       for c in s.get("checks", []))
            )
            actual = 100.0 * ok_count / total
            met = actual >= slo["target"]
            print(f" Actual: {actual:.2f}% {'✅ MET' if met else '❌ MISSED'}")
        elif slo_key in _SLO_LATENCY_CHECKS:
            check_name = _SLO_LATENCY_CHECKS[slo_key]
            vals = [
                float(c["value"])
                for s in snapshots
                for c in s.get("checks", [])
                if c["name"] == check_name and c.get("value") is not None
            ]
            if vals:
                p95 = _p95(vals)
                met = p95 <= slo["target"]
                print(f" p95: {p95:.0f}ms {'✅ MET' if met else '❌ MISSED'}")
            else:
                print(" Status: no latency data")
    print()


def print_history(cfg: ObservatoryConfig, count: int = 20) -> None:
    """Print the most recent *count* health records, newest first."""
    snapshots = load_snapshots(cfg, days=RETENTION_DAYS)[:count]
    if not snapshots:
        print("No history available.")
        return
    print(f"\n{'='*60}")
    print(f" Last {min(count, len(snapshots))} health records")
    print(f"{'='*60}")
    for snap in snapshots:
        ts = snap.get("ts", "?")
        overall = snap.get("overall", "?")
        emoji = _status_emoji(overall)
        degraded = [c["name"] for c in snap.get("checks", []) if c["status"] != "ok"]
        degraded_str = f" — issues: {', '.join(degraded)}" if degraded else ""
        print(f" {emoji} {ts} {overall.upper()}{degraded_str}")
    print()


# ---------------------------------------------------------------------------
# Daemon mode
# ---------------------------------------------------------------------------

class Observatory:
    """Continuous monitoring daemon."""

    def __init__(self, cfg: ObservatoryConfig):
        self.cfg = cfg
        self._running = False
        # Last snapshot, used to detect status transitions for alerting.
        self._prev_snapshot: Optional[HealthSnapshot] = None

    def _handle_signal(self, signum: int, frame: Any) -> None:
        """Signal handler: request a graceful stop of the polling loop."""
        logger.info("Received signal %d, shutting down...", signum)
        self._running = False

    def run_once(self) -> HealthSnapshot:
        """Collect, persist and alert on a single snapshot; return it."""
        snapshot = collect_snapshot(self.cfg)
        store_snapshot(self.cfg, snapshot)
        alerts = maybe_alert(self.cfg, snapshot, self._prev_snapshot)
        if alerts:
            logger.info("Sent %d alert(s)", len(alerts))
        self._prev_snapshot = snapshot
        return snapshot

    def _sleep_until_next_poll(self) -> None:
        """Wait poll_interval seconds, waking early once a stop is requested.

        time.sleep() resumes after a signal handler returns, so one long
        sleep would delay shutdown by up to a full poll interval.
        """
        deadline = time.monotonic() + self.cfg.poll_interval
        while self._running:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(1.0, remaining))

    def run(self) -> None:
        """Poll until SIGINT/SIGTERM is received."""
        _init_db(self.cfg.db_path)
        logger.info(
            "Observatory starting — poll_interval=%ds db=%s",
            self.cfg.poll_interval,
            self.cfg.db_path,
        )
        self._running = True
        signal.signal(signal.SIGINT, self._handle_signal)
        signal.signal(signal.SIGTERM, self._handle_signal)

        while self._running:
            try:
                snapshot = self.run_once()
                logger.info("Health check: %s", snapshot.overall_status)
            except Exception as exc:
                # A failing poll must not take the daemon down.
                logger.error("Health check failed: %s", exc, exc_info=True)
            self._sleep_until_next_poll()

        logger.info("Observatory stopped.")


# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------

def main(argv: Optional[List[str]] = None) -> int:
    """CLI entry point.

    Returns the process exit code. For a health check (``--check`` or no mode
    flag) that is 0 when the overall status is "ok" and 1 otherwise; every
    other mode returns 0.
    """
    parser = argparse.ArgumentParser(
        description="Hermes Observatory — health monitoring & alerting",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--check", action="store_true", help="Run one health check and print results")
    parser.add_argument("--daemon", action="store_true", help="Run as continuous monitoring daemon")
    parser.add_argument("--digest", action="store_true", help="Print (and optionally send) daily digest")
    parser.add_argument("--history", type=int, metavar="N", help="Show last N health records")
    parser.add_argument("--slo", action="store_true", help="Print SLO report")
    parser.add_argument("--send-digest", action="store_true", help="Send daily digest via Telegram")
    parser.add_argument("--verbose", "-v", action="store_true", help="Enable verbose logging")
    args = parser.parse_args(argv)

    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s %(levelname)s [observatory] %(message)s",
    )

    cfg = ObservatoryConfig.from_env()
    _init_db(cfg.db_path)

    # --check takes precedence over every other mode; the same one-shot check
    # is also the default when no mode flag is given (end of function).
    if not args.check:
        if args.daemon:
            Observatory(cfg).run()
            return 0

        if args.digest or args.send_digest:
            print(build_digest(cfg))
            if args.send_digest:
                if not (cfg.telegram_token and cfg.digest_chat_id):
                    print("\n[Telegram send skipped — token/chat_id not configured]")
                elif send_digest(cfg):
                    print("\n[Digest sent to Telegram]")
                else:
                    print("\n[Telegram send failed — see log for details]")
            return 0

        if args.history is not None:
            print_history(cfg, args.history)
            return 0

        if args.slo:
            print_slo_report(cfg)
            return 0

    # One-shot check
    snapshot = collect_snapshot(cfg)
    store_snapshot(cfg, snapshot)
    print_snapshot(snapshot)
    return 0 if snapshot.overall_status == "ok" else 1


if __name__ == "__main__":
    sys.exit(main())