# Listing metadata from the source viewer: 456 lines, 16 KiB, Python.
"""
|
|
Tests for observatory.py — health monitoring & alerting.
|
|
|
|
Refs #147
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import sqlite3
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Any
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
# Put the directory two levels above this file on sys.path (once) so that the
# `observatory` module imported below can be resolved.
PROJECT_ROOT = Path(__file__).parent.parent
_root_entry = str(PROJECT_ROOT)
if _root_entry not in sys.path:
    sys.path.insert(0, _root_entry)
|
|
|
|
import observatory as obs
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.fixture
def cfg(tmp_path):
    """Build an ObservatoryConfig whose database file lives under ``tmp_path``.

    Telegram credentials are placeholder values, and both health URLs point at
    loopback ports that the tests expect to be unbound.
    """
    config = obs.ObservatoryConfig()
    overrides = {
        "db_path": tmp_path / "observatory.db",
        "alert_chat_id": "99999",
        "digest_chat_id": "99999",
        "telegram_token": "fake-token",
        "webhook_url": "http://127.0.0.1:19999/health",  # port never bound
        "api_url": "http://127.0.0.1:19998/health",
    }
    for attr, value in overrides.items():
        setattr(config, attr, value)
    return config
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Config tests
# ---------------------------------------------------------------------------
|
|
|
|
class TestObservatoryConfig:
    """Default values and environment overrides of ``obs.ObservatoryConfig``."""

    def test_defaults(self):
        """A freshly constructed config exposes the expected thresholds and SLOs."""
        expected = {
            "disk_warn_pct": 80.0,
            "disk_crit_pct": 90.0,
            "mem_warn_pct": 80.0,
            "mem_crit_pct": 90.0,
            "cpu_warn_pct": 80.0,
            "cpu_crit_pct": 95.0,
            "poll_interval": 60,
            "webhook_latency_slo_ms": 2000.0,
            "gateway_uptime_slo_pct": 99.5,
        }
        config = obs.ObservatoryConfig()
        for attr, value in expected.items():
            assert getattr(config, attr) == value, attr

    def test_from_env_overrides(self, monkeypatch):
        """``from_env`` picks up values from the process environment."""
        environment = {
            "OBSERVATORY_DISK_WARN_PCT": "70",
            "OBSERVATORY_POLL_INTERVAL": "30",
            "OBSERVATORY_ALERT_CHAT_ID": "12345",
            "TELEGRAM_BOT_TOKEN": "tok123",
        }
        for key, raw in environment.items():
            monkeypatch.setenv(key, raw)

        config = obs.ObservatoryConfig.from_env()

        assert config.disk_warn_pct == 70.0
        assert config.poll_interval == 30
        assert config.alert_chat_id == "12345"
        assert config.telegram_token == "tok123"

    def test_digest_chat_falls_back_to_alert(self, monkeypatch):
        """With no digest chat id in the environment, the alert chat id is used."""
        monkeypatch.setenv("OBSERVATORY_ALERT_CHAT_ID", "abc")
        monkeypatch.delenv("OBSERVATORY_DIGEST_CHAT_ID", raising=False)

        config = obs.ObservatoryConfig.from_env()

        assert config.digest_chat_id == "abc"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# CheckResult / HealthSnapshot tests
# ---------------------------------------------------------------------------
|
|
|
|
class TestHealthSnapshot:
    """Aggregation (``overall_status``) and serialisation of ``obs.HealthSnapshot``."""

    def _make_snapshot(self, statuses):
        """Build a snapshot with one check (named c0, c1, ...) per given status."""
        checks = []
        for index, status in enumerate(statuses):
            checks.append(obs.CheckResult(name=f"c{index}", status=status, message=""))
        return obs.HealthSnapshot(ts="2026-01-01T00:00:00+00:00", checks=checks)

    def test_overall_ok(self):
        assert self._make_snapshot(["ok", "ok"]).overall_status == "ok"

    def test_overall_warn(self):
        assert self._make_snapshot(["ok", "warn"]).overall_status == "warn"

    def test_overall_critical(self):
        snapshot = self._make_snapshot(["ok", "warn", "critical"])
        assert snapshot.overall_status == "critical"

    def test_overall_error(self):
        # An "error" check is expected to surface as an overall "critical".
        assert self._make_snapshot(["ok", "error"]).overall_status == "critical"

    def test_to_dict(self):
        payload = self._make_snapshot(["ok"]).to_dict()
        assert payload["overall"] == "ok"
        serialised_checks = payload["checks"]
        assert isinstance(serialised_checks, list)
        assert serialised_checks[0]["name"] == "c0"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Individual check tests
# ---------------------------------------------------------------------------
|
|
|
|
class TestCheckGatewayLiveness:
    """``obs.check_gateway_liveness`` with the ``gateway.status`` helpers patched."""

    def test_running(self):
        """A running gateway yields "ok" and the PID appears in the message."""
        running = patch("gateway.status.is_gateway_running", return_value=True)
        pid = patch("gateway.status.get_running_pid", return_value=12345)
        with running, pid:
            outcome = obs.check_gateway_liveness()
        assert outcome.status == "ok"
        assert "12345" in outcome.message

    def test_not_running(self):
        """A stopped gateway yields "critical"."""
        running = patch("gateway.status.is_gateway_running", return_value=False)
        pid = patch("gateway.status.get_running_pid", return_value=None)
        with running, pid:
            outcome = obs.check_gateway_liveness()
        assert outcome.status == "critical"

    def test_import_error(self):
        """The check must still return a result when ``gateway.status`` cannot be imported."""
        import builtins

        original_import = builtins.__import__

        def _refuse_gateway_status(name, *args, **kwargs):
            # Fail only the module under test; delegate every other import.
            if name == "gateway.status":
                raise ImportError("no module")
            return original_import(name, *args, **kwargs)

        with patch("builtins.__import__", side_effect=_refuse_gateway_status):
            outcome = obs.check_gateway_liveness()
        assert outcome.status in ("error", "critical", "ok")  # graceful
|
|
|
|
|
|
class TestCheckDisk:
    """``obs.check_disk`` against a patched ``psutil.disk_usage``."""

    def test_ok(self, cfg):
        usage = MagicMock(percent=50.0, free=10 * 1024 ** 3, total=20 * 1024 ** 3)
        with patch("psutil.disk_usage", return_value=usage):
            outcome = obs.check_disk(cfg)
        assert outcome.status == "ok"
        assert outcome.value == 50.0

    def test_warn(self, cfg):
        usage = MagicMock(percent=85.0, free=3 * 1024 ** 3, total=20 * 1024 ** 3)
        with patch("psutil.disk_usage", return_value=usage):
            outcome = obs.check_disk(cfg)
        assert outcome.status == "warn"

    def test_critical(self, cfg):
        usage = MagicMock(percent=92.0, free=1 * 1024 ** 3, total=20 * 1024 ** 3)
        with patch("psutil.disk_usage", return_value=usage):
            outcome = obs.check_disk(cfg)
        assert outcome.status == "critical"

    def test_no_psutil(self, cfg, monkeypatch):
        """With the module's ``_PSUTIL`` flag forced off, the check reports "error"."""
        monkeypatch.setattr(obs, "_PSUTIL", False)
        assert obs.check_disk(cfg).status == "error"
|
|
|
|
|
|
class TestCheckMemory:
    """``obs.check_memory`` against a patched ``psutil.virtual_memory``."""

    def test_ok(self, cfg):
        memory = MagicMock(percent=60.0, available=4 * 1024 ** 3, total=16 * 1024 ** 3)
        with patch("psutil.virtual_memory", return_value=memory):
            outcome = obs.check_memory(cfg)
        assert outcome.status == "ok"

    def test_critical(self, cfg):
        memory = MagicMock(percent=95.0, available=512 * 1024 ** 2, total=16 * 1024 ** 3)
        with patch("psutil.virtual_memory", return_value=memory):
            outcome = obs.check_memory(cfg)
        assert outcome.status == "critical"
|
|
|
|
|
|
class TestCheckCPU:
    """``obs.check_cpu`` against a patched ``psutil.cpu_percent``."""

    @staticmethod
    def _status_at(cfg, load):
        """Run the CPU check with ``psutil.cpu_percent`` returning ``load``."""
        with patch("psutil.cpu_percent", return_value=load):
            return obs.check_cpu(cfg).status

    def test_ok(self, cfg):
        assert self._status_at(cfg, 40.0) == "ok"

    def test_warn(self, cfg):
        assert self._status_at(cfg, 85.0) == "warn"

    def test_critical(self, cfg):
        assert self._status_at(cfg, 98.0) == "critical"
|
|
|
|
|
|
class TestCheckDatabase:
    """``obs.check_database`` with and without an initialised database file."""

    def test_ok(self, cfg):
        obs._init_db(cfg.db_path)
        assert obs.check_database(cfg).status == "ok"

    def test_not_yet_created(self, cfg):
        # No _init_db call here, so db_path does not exist yet.
        assert obs.check_database(cfg).status == "warn"
|
|
|
|
|
|
class TestCheckHTTP:
    """HTTP probes: ``obs.check_webhook_http`` and ``obs.check_api_server_http``."""

    @staticmethod
    def _fake_response(status, body=None):
        """Return a mock usable as the context manager yielded by ``urlopen``.

        ``status`` becomes the response's ``status`` attribute; when ``body``
        is given it is what ``read()`` returns.
        """
        response = MagicMock()
        response.__enter__ = lambda s: s
        response.__exit__ = MagicMock(return_value=False)
        response.status = status
        if body is not None:
            response.read.return_value = body
        return response

    def test_webhook_connection_refused(self, cfg):
        result = obs.check_webhook_http(cfg)
        # Port 19999 is not bound — should get a "not reachable" warn
        assert result.status in ("warn", "error")

    def test_api_server_connection_refused(self, cfg):
        result = obs.check_api_server_http(cfg)
        assert result.status in ("warn", "error")

    def test_webhook_ok(self, cfg):
        # Uses the module-level patch/MagicMock imports; no local imports needed.
        mock_resp = self._fake_response(200, b'{"status":"ok"}')
        with patch("urllib.request.urlopen", return_value=mock_resp):
            result = obs.check_webhook_http(cfg)
        assert result.status in ("ok", "warn")

    def test_webhook_http_error(self, cfg):
        mock_resp = self._fake_response(503)
        with patch("urllib.request.urlopen", return_value=mock_resp):
            result = obs.check_webhook_http(cfg)
        assert result.status == "critical"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Persistence tests
# ---------------------------------------------------------------------------
|
|
|
|
class TestPersistence:
    """Snapshot storage, retention pruning and alert bookkeeping in the database."""

    def test_store_and_load(self, cfg):
        """A snapshot stored now is returned by ``load_snapshots``."""
        from datetime import datetime, timezone

        obs._init_db(cfg.db_path)
        now_iso = datetime.now(timezone.utc).isoformat()
        check = obs.CheckResult(name="test", status="ok", message="fine")
        obs.store_snapshot(cfg, obs.HealthSnapshot(ts=now_iso, checks=[check]))

        rows = obs.load_snapshots(cfg, days=30)

        assert len(rows) == 1
        assert rows[0]["overall"] == "ok"

    def test_retention_pruning(self, cfg):
        """Storing a snapshot is expected to remove a row dated in the year 2000."""
        obs._init_db(cfg.db_path)

        # Insert an old record directly
        stale_row = (
            "2000-01-01T00:00:00+00:00",
            "ok",
            '{"ts":"2000-01-01T00:00:00+00:00","overall":"ok","checks":[]}',
        )
        with obs._db(cfg.db_path) as conn:
            conn.execute(
                "INSERT INTO health_snapshots (ts, overall, payload) VALUES (?, ?, ?)",
                stale_row,
            )

        fresh = obs.HealthSnapshot(ts="2026-01-01T00:00:00+00:00", checks=[])
        obs.store_snapshot(cfg, fresh)

        # Old record should have been pruned
        with obs._db(cfg.db_path) as conn:
            cursor = conn.execute("SELECT count(*) FROM health_snapshots WHERE ts < '2001-01-01'")
            remaining = cursor.fetchone()[0]
        assert remaining == 0

    def test_record_alert_sent(self, cfg):
        """``record_alert_sent`` adds exactly one row to ``alerts_sent``."""
        obs._init_db(cfg.db_path)
        obs.record_alert_sent(cfg, "gateway_process", "critical", "not running")

        with obs._db(cfg.db_path) as conn:
            cursor = conn.execute("SELECT count(*) FROM alerts_sent")
            recorded = cursor.fetchone()[0]
        assert recorded == 1
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Alerting tests
# ---------------------------------------------------------------------------
|
|
|
|
class TestAlerting:
    """``obs.maybe_alert`` transitions, with ``observatory._telegram_send`` patched."""

    def _snap(self, status):
        """Return a snapshot holding a single ``gateway_process`` check."""
        check = obs.CheckResult(name="gateway_process", status=status, message="test")
        return obs.HealthSnapshot(ts="2026-01-01T00:00:00+00:00", checks=[check])

    def _dispatch(self, cfg, current_status, previous_status):
        """Call ``maybe_alert`` for a status transition; return (alerts, send mock)."""
        current = self._snap(current_status)
        previous = self._snap(previous_status)
        obs._init_db(cfg.db_path)
        with patch("observatory._telegram_send", return_value=True) as sender:
            raised = obs.maybe_alert(cfg, current, previous)
        return raised, sender

    def test_no_alert_when_ok(self, cfg):
        alerts, sender = self._dispatch(cfg, "ok", "ok")
        sender.assert_not_called()
        assert alerts == []

    def test_alert_on_new_critical(self, cfg):
        alerts, sender = self._dispatch(cfg, "critical", "ok")
        sender.assert_called_once()
        assert len(alerts) == 1

    def test_no_duplicate_alert(self, cfg):
        # The previous snapshot was already critical.
        alerts, sender = self._dispatch(cfg, "critical", "critical")
        sender.assert_not_called()
        assert alerts == []

    def test_recovery_alert(self, cfg):
        _, sender = self._dispatch(cfg, "ok", "critical")
        sender.assert_called_once()

    def test_no_alert_without_token(self, cfg):
        cfg.telegram_token = None
        current = self._snap("critical")
        obs._init_db(cfg.db_path)
        assert obs.maybe_alert(cfg, current, None) == []

    def test_no_alert_without_chat_id(self, cfg):
        cfg.alert_chat_id = None
        current = self._snap("critical")
        obs._init_db(cfg.db_path)
        assert obs.maybe_alert(cfg, current, None) == []
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Digest tests
# ---------------------------------------------------------------------------
|
|
|
|
class TestDigest:
    """``obs.build_digest`` and ``obs.send_digest``."""

    def test_empty_digest(self, cfg):
        """With an empty database the digest text contains one of two known phrases."""
        obs._init_db(cfg.db_path)
        text = obs.build_digest(cfg).lower()
        assert any(phrase in text for phrase in ("no health data", "24 hours"))

    def test_digest_with_data(self, cfg):
        """A snapshot stored one hour ago shows up in the digest text."""
        from datetime import datetime, timezone, timedelta

        obs._init_db(cfg.db_path)
        an_hour_ago = datetime.now(timezone.utc) - timedelta(hours=1)
        checks = [
            obs.CheckResult(name="gateway_process", status="ok", message="running"),
            obs.CheckResult(name="disk", status="ok", message="50% used", value=50.0, unit="%"),
            obs.CheckResult(name="webhook_http", status="ok", message="ok", value=150.0, unit="ms"),
        ]
        obs.store_snapshot(cfg, obs.HealthSnapshot(ts=an_hour_ago.isoformat(), checks=checks))

        text = obs.build_digest(cfg)

        assert "Daily Digest" in text
        assert any(word in text for word in ("Gateway", "gateway"))

    def test_send_digest_no_token(self, cfg):
        """Without a Telegram token ``send_digest`` returns ``False``."""
        cfg.telegram_token = None
        obs._init_db(cfg.db_path)
        assert obs.send_digest(cfg) is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# SLO tests
# ---------------------------------------------------------------------------
|
|
|
|
class TestSLO:
    """Presence and target values of entries in ``obs.SLO_DEFINITIONS``."""

    def test_slo_definitions_complete(self):
        required = ("gateway_uptime_pct", "webhook_latency_ms", "api_server_latency_ms")
        for key in required:
            assert key in obs.SLO_DEFINITIONS, key

    def test_slo_targets(self):
        definitions = obs.SLO_DEFINITIONS
        assert definitions["gateway_uptime_pct"]["target"] == 99.5
        assert definitions["webhook_latency_ms"]["target"] == 2000
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# CLI tests
# ---------------------------------------------------------------------------
|
|
|
|
class TestCLI:
    """Exit codes of ``obs.main`` for each command-line flag."""

    @staticmethod
    def _use_temp_db(monkeypatch, tmp_path):
        """Point OBSERVATORY_DB_PATH at a database file under ``tmp_path``."""
        monkeypatch.setenv("OBSERVATORY_DB_PATH", str(tmp_path / "obs.db"))

    @staticmethod
    def _run_check_with(snapshot):
        """Run ``--check`` with collection returning ``snapshot`` and storage patched out."""
        collect = patch("observatory.collect_snapshot", return_value=snapshot)
        store = patch("observatory.store_snapshot")
        with collect, store:
            return obs.main(["--check"])

    def test_check_exits_0_on_ok(self, cfg, monkeypatch, tmp_path):
        self._use_temp_db(monkeypatch, tmp_path)
        healthy = obs.HealthSnapshot(
            ts="2026-01-01T00:00:00+00:00",
            checks=[obs.CheckResult(name="all_good", status="ok", message="fine")],
        )
        assert self._run_check_with(healthy) == 0

    def test_check_exits_nonzero_on_critical(self, cfg, monkeypatch, tmp_path):
        self._use_temp_db(monkeypatch, tmp_path)
        failing = obs.HealthSnapshot(
            ts="2026-01-01T00:00:00+00:00",
            checks=[obs.CheckResult(name="gateway_process", status="critical", message="down")],
        )
        assert self._run_check_with(failing) != 0

    def test_digest_flag(self, monkeypatch, tmp_path):
        self._use_temp_db(monkeypatch, tmp_path)
        assert obs.main(["--digest"]) == 0

    def test_slo_flag(self, monkeypatch, tmp_path):
        self._use_temp_db(monkeypatch, tmp_path)
        assert obs.main(["--slo"]) == 0

    def test_history_flag(self, monkeypatch, tmp_path):
        self._use_temp_db(monkeypatch, tmp_path)
        assert obs.main(["--history", "5"]) == 0
|