""" Tests for observatory.py — health monitoring & alerting. Refs #147 """ from __future__ import annotations import json import os import sqlite3 import sys import tempfile import time from pathlib import Path from typing import Any from unittest.mock import MagicMock, patch import pytest PROJECT_ROOT = Path(__file__).parent.parent if str(PROJECT_ROOT) not in sys.path: sys.path.insert(0, str(PROJECT_ROOT)) import observatory as obs # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest.fixture def cfg(tmp_path): """Return an ObservatoryConfig pointing at a temp directory.""" cfg = obs.ObservatoryConfig() cfg.db_path = tmp_path / "observatory.db" cfg.alert_chat_id = "99999" cfg.digest_chat_id = "99999" cfg.telegram_token = "fake-token" cfg.webhook_url = "http://127.0.0.1:19999/health" # port never bound cfg.api_url = "http://127.0.0.1:19998/health" return cfg # --------------------------------------------------------------------------- # Config tests # --------------------------------------------------------------------------- class TestObservatoryConfig: def test_defaults(self): c = obs.ObservatoryConfig() assert c.disk_warn_pct == 80.0 assert c.disk_crit_pct == 90.0 assert c.mem_warn_pct == 80.0 assert c.mem_crit_pct == 90.0 assert c.cpu_warn_pct == 80.0 assert c.cpu_crit_pct == 95.0 assert c.poll_interval == 60 assert c.webhook_latency_slo_ms == 2000.0 assert c.gateway_uptime_slo_pct == 99.5 def test_from_env_overrides(self, monkeypatch): monkeypatch.setenv("OBSERVATORY_DISK_WARN_PCT", "70") monkeypatch.setenv("OBSERVATORY_POLL_INTERVAL", "30") monkeypatch.setenv("OBSERVATORY_ALERT_CHAT_ID", "12345") monkeypatch.setenv("TELEGRAM_BOT_TOKEN", "tok123") c = obs.ObservatoryConfig.from_env() assert c.disk_warn_pct == 70.0 assert c.poll_interval == 30 assert c.alert_chat_id == "12345" assert c.telegram_token == "tok123" def test_digest_chat_falls_back_to_alert(self, monkeypatch): monkeypatch.setenv("OBSERVATORY_ALERT_CHAT_ID", "abc") monkeypatch.delenv("OBSERVATORY_DIGEST_CHAT_ID", raising=False) c = obs.ObservatoryConfig.from_env() assert c.digest_chat_id == "abc" # --------------------------------------------------------------------------- # CheckResult / HealthSnapshot tests # --------------------------------------------------------------------------- class TestHealthSnapshot: def _make_snapshot(self, statuses): checks = [obs.CheckResult(name=f"c{i}", status=s, message="") for i, s in enumerate(statuses)] return obs.HealthSnapshot(ts="2026-01-01T00:00:00+00:00", checks=checks) def test_overall_ok(self): snap = self._make_snapshot(["ok", "ok"]) assert snap.overall_status == "ok" def test_overall_warn(self): snap = self._make_snapshot(["ok", "warn"]) assert snap.overall_status == "warn" def test_overall_critical(self): snap = self._make_snapshot(["ok", "warn", "critical"]) assert snap.overall_status == "critical" def test_overall_error(self): snap = self._make_snapshot(["ok", "error"]) assert snap.overall_status == "critical" def test_to_dict(self): snap = self._make_snapshot(["ok"]) d = snap.to_dict() assert d["overall"] == "ok" assert isinstance(d["checks"], list) assert d["checks"][0]["name"] == "c0" # --------------------------------------------------------------------------- # Individual check tests # --------------------------------------------------------------------------- class TestCheckGatewayLiveness: def test_running(self): with patch("gateway.status.is_gateway_running", return_value=True), \ patch("gateway.status.get_running_pid", return_value=12345): result = obs.check_gateway_liveness() assert result.status == "ok" assert "12345" in result.message def test_not_running(self): with patch("gateway.status.is_gateway_running", return_value=False), \ patch("gateway.status.get_running_pid", return_value=None): result = obs.check_gateway_liveness() assert result.status == "critical" def test_import_error(self): import builtins real_import = builtins.__import__ def mock_import(name, *args, **kwargs): if name == "gateway.status": raise ImportError("no module") return real_import(name, *args, **kwargs) with patch("builtins.__import__", side_effect=mock_import): result = obs.check_gateway_liveness() assert result.status in ("error", "critical", "ok") # graceful class TestCheckDisk: def test_ok(self, cfg): mock_usage = MagicMock() mock_usage.percent = 50.0 mock_usage.free = 10 * 1024 ** 3 mock_usage.total = 20 * 1024 ** 3 with patch("psutil.disk_usage", return_value=mock_usage): result = obs.check_disk(cfg) assert result.status == "ok" assert result.value == 50.0 def test_warn(self, cfg): mock_usage = MagicMock() mock_usage.percent = 85.0 mock_usage.free = 3 * 1024 ** 3 mock_usage.total = 20 * 1024 ** 3 with patch("psutil.disk_usage", return_value=mock_usage): result = obs.check_disk(cfg) assert result.status == "warn" def test_critical(self, cfg): mock_usage = MagicMock() mock_usage.percent = 92.0 mock_usage.free = 1 * 1024 ** 3 mock_usage.total = 20 * 1024 ** 3 with patch("psutil.disk_usage", return_value=mock_usage): result = obs.check_disk(cfg) assert result.status == "critical" def test_no_psutil(self, cfg, monkeypatch): monkeypatch.setattr(obs, "_PSUTIL", False) result = obs.check_disk(cfg) assert result.status == "error" class TestCheckMemory: def test_ok(self, cfg): mock_mem = MagicMock() mock_mem.percent = 60.0 mock_mem.available = 4 * 1024 ** 3 mock_mem.total = 16 * 1024 ** 3 with patch("psutil.virtual_memory", return_value=mock_mem): result = obs.check_memory(cfg) assert result.status == "ok" def test_critical(self, cfg): mock_mem = MagicMock() mock_mem.percent = 95.0 mock_mem.available = 512 * 1024 ** 2 mock_mem.total = 16 * 1024 ** 3 with patch("psutil.virtual_memory", return_value=mock_mem): result = obs.check_memory(cfg) assert result.status == "critical" class TestCheckCPU: def test_ok(self, cfg): with patch("psutil.cpu_percent", return_value=40.0): result = obs.check_cpu(cfg) assert result.status == "ok" def test_warn(self, cfg): with patch("psutil.cpu_percent", return_value=85.0): result = obs.check_cpu(cfg) assert result.status == "warn" def test_critical(self, cfg): with patch("psutil.cpu_percent", return_value=98.0): result = obs.check_cpu(cfg) assert result.status == "critical" class TestCheckDatabase: def test_ok(self, cfg): obs._init_db(cfg.db_path) result = obs.check_database(cfg) assert result.status == "ok" def test_not_yet_created(self, cfg): # db_path does not exist result = obs.check_database(cfg) assert result.status == "warn" class TestCheckHTTP: def test_webhook_connection_refused(self, cfg): result = obs.check_webhook_http(cfg) # Port 19999 is not bound — should get a "not reachable" warn assert result.status in ("warn", "error") def test_api_server_connection_refused(self, cfg): result = obs.check_api_server_http(cfg) assert result.status in ("warn", "error") def test_webhook_ok(self, cfg): import urllib.error from unittest.mock import patch, MagicMock mock_resp = MagicMock() mock_resp.__enter__ = lambda s: s mock_resp.__exit__ = MagicMock(return_value=False) mock_resp.status = 200 mock_resp.read.return_value = b'{"status":"ok"}' with patch("urllib.request.urlopen", return_value=mock_resp): result = obs.check_webhook_http(cfg) assert result.status in ("ok", "warn") def test_webhook_http_error(self, cfg): mock_resp = MagicMock() mock_resp.__enter__ = lambda s: s mock_resp.__exit__ = MagicMock(return_value=False) mock_resp.status = 503 with patch("urllib.request.urlopen", return_value=mock_resp): result = obs.check_webhook_http(cfg) assert result.status == "critical" # --------------------------------------------------------------------------- # Persistence tests # --------------------------------------------------------------------------- class TestPersistence: def test_store_and_load(self, cfg): obs._init_db(cfg.db_path) from datetime import datetime, timezone ts = datetime.now(timezone.utc).isoformat() snap = obs.HealthSnapshot( ts=ts, checks=[obs.CheckResult(name="test", status="ok", message="fine")], ) obs.store_snapshot(cfg, snap) loaded = obs.load_snapshots(cfg, days=30) assert len(loaded) == 1 assert loaded[0]["overall"] == "ok" def test_retention_pruning(self, cfg): obs._init_db(cfg.db_path) # Insert an old record directly with obs._db(cfg.db_path) as conn: conn.execute( "INSERT INTO health_snapshots (ts, overall, payload) VALUES (?, ?, ?)", ("2000-01-01T00:00:00+00:00", "ok", '{"ts":"2000-01-01T00:00:00+00:00","overall":"ok","checks":[]}'), ) snap = obs.HealthSnapshot( ts="2026-01-01T00:00:00+00:00", checks=[], ) obs.store_snapshot(cfg, snap) # Old record should have been pruned with obs._db(cfg.db_path) as conn: count = conn.execute("SELECT count(*) FROM health_snapshots WHERE ts < '2001-01-01'").fetchone()[0] assert count == 0 def test_record_alert_sent(self, cfg): obs._init_db(cfg.db_path) obs.record_alert_sent(cfg, "gateway_process", "critical", "not running") with obs._db(cfg.db_path) as conn: count = conn.execute("SELECT count(*) FROM alerts_sent").fetchone()[0] assert count == 1 # --------------------------------------------------------------------------- # Alerting tests # --------------------------------------------------------------------------- class TestAlerting: def _snap(self, status): return obs.HealthSnapshot( ts="2026-01-01T00:00:00+00:00", checks=[obs.CheckResult(name="gateway_process", status=status, message="test")], ) def test_no_alert_when_ok(self, cfg): snap = self._snap("ok") prev = self._snap("ok") obs._init_db(cfg.db_path) with patch("observatory._telegram_send", return_value=True) as mock_send: alerts = obs.maybe_alert(cfg, snap, prev) mock_send.assert_not_called() assert alerts == [] def test_alert_on_new_critical(self, cfg): snap = self._snap("critical") prev = self._snap("ok") obs._init_db(cfg.db_path) with patch("observatory._telegram_send", return_value=True) as mock_send: alerts = obs.maybe_alert(cfg, snap, prev) mock_send.assert_called_once() assert len(alerts) == 1 def test_no_duplicate_alert(self, cfg): snap = self._snap("critical") prev = self._snap("critical") # already critical obs._init_db(cfg.db_path) with patch("observatory._telegram_send", return_value=True) as mock_send: alerts = obs.maybe_alert(cfg, snap, prev) mock_send.assert_not_called() assert alerts == [] def test_recovery_alert(self, cfg): snap = self._snap("ok") prev = self._snap("critical") obs._init_db(cfg.db_path) with patch("observatory._telegram_send", return_value=True) as mock_send: alerts = obs.maybe_alert(cfg, snap, prev) mock_send.assert_called_once() def test_no_alert_without_token(self, cfg): cfg.telegram_token = None snap = self._snap("critical") obs._init_db(cfg.db_path) alerts = obs.maybe_alert(cfg, snap, None) assert alerts == [] def test_no_alert_without_chat_id(self, cfg): cfg.alert_chat_id = None snap = self._snap("critical") obs._init_db(cfg.db_path) alerts = obs.maybe_alert(cfg, snap, None) assert alerts == [] # --------------------------------------------------------------------------- # Digest tests # --------------------------------------------------------------------------- class TestDigest: def test_empty_digest(self, cfg): obs._init_db(cfg.db_path) digest = obs.build_digest(cfg) assert "no health data" in digest.lower() or "24 hours" in digest.lower() def test_digest_with_data(self, cfg): obs._init_db(cfg.db_path) from datetime import datetime, timezone, timedelta ts = (datetime.now(timezone.utc) - timedelta(hours=1)).isoformat() snap = obs.HealthSnapshot( ts=ts, checks=[ obs.CheckResult(name="gateway_process", status="ok", message="running"), obs.CheckResult(name="disk", status="ok", message="50% used", value=50.0, unit="%"), obs.CheckResult(name="webhook_http", status="ok", message="ok", value=150.0, unit="ms"), ], ) obs.store_snapshot(cfg, snap) digest = obs.build_digest(cfg) assert "Daily Digest" in digest assert "Gateway" in digest or "gateway" in digest def test_send_digest_no_token(self, cfg): cfg.telegram_token = None obs._init_db(cfg.db_path) result = obs.send_digest(cfg) assert result is False # --------------------------------------------------------------------------- # SLO tests # --------------------------------------------------------------------------- class TestSLO: def test_slo_definitions_complete(self): assert "gateway_uptime_pct" in obs.SLO_DEFINITIONS assert "webhook_latency_ms" in obs.SLO_DEFINITIONS assert "api_server_latency_ms" in obs.SLO_DEFINITIONS def test_slo_targets(self): assert obs.SLO_DEFINITIONS["gateway_uptime_pct"]["target"] == 99.5 assert obs.SLO_DEFINITIONS["webhook_latency_ms"]["target"] == 2000 # --------------------------------------------------------------------------- # CLI tests # --------------------------------------------------------------------------- class TestCLI: def test_check_exits_0_on_ok(self, cfg, monkeypatch, tmp_path): monkeypatch.setenv("OBSERVATORY_DB_PATH", str(tmp_path / "obs.db")) ok_snap = obs.HealthSnapshot( ts="2026-01-01T00:00:00+00:00", checks=[obs.CheckResult(name="all_good", status="ok", message="fine")], ) with patch("observatory.collect_snapshot", return_value=ok_snap), \ patch("observatory.store_snapshot"): rc = obs.main(["--check"]) assert rc == 0 def test_check_exits_nonzero_on_critical(self, cfg, monkeypatch, tmp_path): monkeypatch.setenv("OBSERVATORY_DB_PATH", str(tmp_path / "obs.db")) bad_snap = obs.HealthSnapshot( ts="2026-01-01T00:00:00+00:00", checks=[obs.CheckResult(name="gateway_process", status="critical", message="down")], ) with patch("observatory.collect_snapshot", return_value=bad_snap), \ patch("observatory.store_snapshot"): rc = obs.main(["--check"]) assert rc != 0 def test_digest_flag(self, monkeypatch, tmp_path): monkeypatch.setenv("OBSERVATORY_DB_PATH", str(tmp_path / "obs.db")) rc = obs.main(["--digest"]) assert rc == 0 def test_slo_flag(self, monkeypatch, tmp_path): monkeypatch.setenv("OBSERVATORY_DB_PATH", str(tmp_path / "obs.db")) rc = obs.main(["--slo"]) assert rc == 0 def test_history_flag(self, monkeypatch, tmp_path): monkeypatch.setenv("OBSERVATORY_DB_PATH", str(tmp_path / "obs.db")) rc = obs.main(["--history", "5"]) assert rc == 0