Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7acfd3e870 |
140
gateway/builtin_hooks/crisis.py
Normal file
140
gateway/builtin_hooks/crisis.py
Normal file
@@ -0,0 +1,140 @@
|
||||
"""Built-in crisis hook — log and notify humans when a crisis is detected.
|
||||
|
||||
Listens for ``agent:crisis`` events and:
|
||||
|
||||
1. Appends a structured entry to ``~/.hermes/logs/crisis.log``
|
||||
2. Sends a Telegram notification to the configured home channel (or the
|
||||
channel set via ``TELEGRAM_CRISIS_CHAT_ID``).
|
||||
|
||||
Event context keys consumed:
|
||||
|
||||
session_id -- The active session identifier.
|
||||
crisis_level -- Severity label (e.g. "high", "critical", "unknown").
|
||||
platform -- The messaging platform that triggered the session.
|
||||
user_id -- User identifier on that platform.
|
||||
message -- Snippet of the incoming message (may be absent).
|
||||
summary -- Short description of the crisis (may be absent).
|
||||
|
||||
Both notification steps are best-effort: failures are logged but never
|
||||
raise, so they cannot block the main gateway pipeline.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
logger = logging.getLogger("hooks.crisis")
|
||||
|
||||
from hermes_constants import get_hermes_home
|
||||
|
||||
HERMES_HOME = get_hermes_home()
|
||||
CRISIS_LOG_FILE = HERMES_HOME / "logs" / "crisis.log"
|
||||
|
||||
|
||||
def _log_crisis_to_file(context: dict) -> None:
|
||||
"""Append a JSON-lines entry to the crisis log file."""
|
||||
try:
|
||||
CRISIS_LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
|
||||
entry = {
|
||||
"timestamp": datetime.now(timezone.utc).isoformat(),
|
||||
"session_id": context.get("session_id", "unknown"),
|
||||
"crisis_level": context.get("crisis_level", "unknown"),
|
||||
"platform": context.get("platform", ""),
|
||||
"user_id": context.get("user_id", ""),
|
||||
"message": context.get("message", ""),
|
||||
"summary": context.get("summary", ""),
|
||||
}
|
||||
with CRISIS_LOG_FILE.open("a", encoding="utf-8") as f:
|
||||
f.write(json.dumps(entry) + "\n")
|
||||
logger.info(
|
||||
"Crisis logged: session=%s level=%s",
|
||||
entry["session_id"],
|
||||
entry["crisis_level"],
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error("Failed to write crisis log: %s", e)
|
||||
|
||||
|
||||
def _build_telegram_message(context: dict) -> str:
|
||||
"""Format a Telegram alert message from the crisis context."""
|
||||
session_id = context.get("session_id", "unknown")
|
||||
crisis_level = context.get("crisis_level", "unknown").upper()
|
||||
platform = context.get("platform", "unknown")
|
||||
user_id = context.get("user_id", "")
|
||||
summary = context.get("summary", "")
|
||||
timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
|
||||
|
||||
lines = [
|
||||
f"🚨 *CRISIS ALERT* — Level: {crisis_level}",
|
||||
f"Time: {timestamp}",
|
||||
f"Session: `{session_id}`",
|
||||
f"Platform: {platform}",
|
||||
]
|
||||
if user_id:
|
||||
lines.append(f"User: {user_id}")
|
||||
if summary:
|
||||
lines.append(f"Summary: {summary}")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
async def _notify_telegram(context: dict) -> None:
|
||||
"""Send a Telegram alert for the crisis event."""
|
||||
try:
|
||||
token = os.getenv("TELEGRAM_BOT_TOKEN", "")
|
||||
if not token:
|
||||
logger.warning("TELEGRAM_BOT_TOKEN not set — skipping crisis notification")
|
||||
return
|
||||
|
||||
# Prefer explicit crisis chat ID, fall back to gateway home channel.
|
||||
chat_id = os.getenv("TELEGRAM_CRISIS_CHAT_ID", "")
|
||||
if not chat_id:
|
||||
try:
|
||||
from gateway.config import load_gateway_config, Platform
|
||||
config = load_gateway_config()
|
||||
home = config.get_home_channel(Platform.TELEGRAM)
|
||||
if home:
|
||||
chat_id = home.chat_id
|
||||
except Exception as e:
|
||||
logger.debug("Could not load gateway config for crisis hook: %s", e)
|
||||
|
||||
if not chat_id:
|
||||
logger.warning(
|
||||
"No Telegram target configured — set TELEGRAM_CRISIS_CHAT_ID or "
|
||||
"configure a Telegram home channel. Skipping crisis notification."
|
||||
)
|
||||
return
|
||||
|
||||
from telegram import Bot
|
||||
from telegram.constants import ParseMode
|
||||
|
||||
message = _build_telegram_message(context)
|
||||
bot = Bot(token=token)
|
||||
await bot.send_message(
|
||||
chat_id=int(chat_id),
|
||||
text=message,
|
||||
parse_mode=ParseMode.MARKDOWN,
|
||||
)
|
||||
logger.info(
|
||||
"Crisis Telegram notification sent to chat_id=%s (session=%s level=%s)",
|
||||
chat_id,
|
||||
context.get("session_id", "unknown"),
|
||||
context.get("crisis_level", "unknown"),
|
||||
)
|
||||
except ImportError:
|
||||
logger.warning(
|
||||
"python-telegram-bot not installed — skipping Telegram crisis notification"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error("Failed to send Telegram crisis notification: %s", e)
|
||||
|
||||
|
||||
async def handle(event_type: str, context: dict) -> None:
|
||||
"""Handle agent:crisis events — log to file and notify via Telegram."""
|
||||
_log_crisis_to_file(context)
|
||||
try:
|
||||
await _notify_telegram(context)
|
||||
except Exception as e:
|
||||
logger.error("Crisis notification failed unexpectedly: %s", e)
|
||||
@@ -14,6 +14,7 @@ Events:
|
||||
- agent:start -- Agent begins processing a message
|
||||
- agent:step -- Each turn in the tool-calling loop
|
||||
- agent:end -- Agent finishes processing
|
||||
- agent:crisis -- Crisis detected during a session (builtin: logs + Telegram alert)
|
||||
- command:* -- Any slash command executed (wildcard match)
|
||||
|
||||
Errors in hooks are caught and logged but never block the main pipeline.
|
||||
@@ -66,6 +67,19 @@ class HookRegistry:
|
||||
except Exception as e:
|
||||
print(f"[hooks] Could not load built-in boot-md hook: {e}", flush=True)
|
||||
|
||||
try:
|
||||
from gateway.builtin_hooks.crisis import handle as crisis_handle
|
||||
|
||||
self._handlers.setdefault("agent:crisis", []).append(crisis_handle)
|
||||
self._loaded_hooks.append({
|
||||
"name": "crisis",
|
||||
"description": "Log crisis events to file and send Telegram notification",
|
||||
"events": ["agent:crisis"],
|
||||
"path": "(builtin)",
|
||||
})
|
||||
except Exception as e:
|
||||
print(f"[hooks] Could not load built-in crisis hook: {e}", flush=True)
|
||||
|
||||
def discover_and_load(self) -> None:
|
||||
"""
|
||||
Scan the hooks directory for hook directories and load their handlers.
|
||||
|
||||
249
tests/gateway/test_crisis_hook.py
Normal file
249
tests/gateway/test_crisis_hook.py
Normal file
@@ -0,0 +1,249 @@
|
||||
"""Tests for gateway/builtin_hooks/crisis.py — crisis notification hook."""
|
||||
|
||||
import json
|
||||
from pathlib import Path
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _make_context(**kwargs):
|
||||
base = {
|
||||
"session_id": "sess-abc123",
|
||||
"crisis_level": "high",
|
||||
"platform": "telegram",
|
||||
"user_id": "user-42",
|
||||
"message": "I need help urgently",
|
||||
"summary": "User expressed distress",
|
||||
}
|
||||
base.update(kwargs)
|
||||
return base
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _log_crisis_to_file
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestLogCrisisToFile:
|
||||
def test_creates_log_file_and_entry(self, tmp_path):
|
||||
log_file = tmp_path / "logs" / "crisis.log"
|
||||
|
||||
with patch("gateway.builtin_hooks.crisis.CRISIS_LOG_FILE", log_file):
|
||||
from gateway.builtin_hooks.crisis import _log_crisis_to_file
|
||||
_log_crisis_to_file(_make_context())
|
||||
|
||||
assert log_file.exists()
|
||||
entry = json.loads(log_file.read_text().strip())
|
||||
assert entry["session_id"] == "sess-abc123"
|
||||
assert entry["crisis_level"] == "high"
|
||||
assert entry["platform"] == "telegram"
|
||||
assert entry["user_id"] == "user-42"
|
||||
assert "timestamp" in entry
|
||||
|
||||
def test_appends_multiple_entries(self, tmp_path):
|
||||
log_file = tmp_path / "logs" / "crisis.log"
|
||||
|
||||
with patch("gateway.builtin_hooks.crisis.CRISIS_LOG_FILE", log_file):
|
||||
from gateway.builtin_hooks.crisis import _log_crisis_to_file
|
||||
_log_crisis_to_file(_make_context(session_id="sess-1"))
|
||||
_log_crisis_to_file(_make_context(session_id="sess-2"))
|
||||
|
||||
lines = log_file.read_text().strip().splitlines()
|
||||
assert len(lines) == 2
|
||||
assert json.loads(lines[0])["session_id"] == "sess-1"
|
||||
assert json.loads(lines[1])["session_id"] == "sess-2"
|
||||
|
||||
def test_missing_optional_fields_use_defaults(self, tmp_path):
|
||||
log_file = tmp_path / "logs" / "crisis.log"
|
||||
|
||||
with patch("gateway.builtin_hooks.crisis.CRISIS_LOG_FILE", log_file):
|
||||
from gateway.builtin_hooks.crisis import _log_crisis_to_file
|
||||
_log_crisis_to_file({}) # empty context
|
||||
|
||||
entry = json.loads(log_file.read_text().strip())
|
||||
assert entry["session_id"] == "unknown"
|
||||
assert entry["crisis_level"] == "unknown"
|
||||
|
||||
def test_tolerates_write_error(self, tmp_path):
|
||||
log_file = tmp_path / "logs" / "crisis.log"
|
||||
|
||||
with patch("gateway.builtin_hooks.crisis.CRISIS_LOG_FILE", log_file):
|
||||
with patch("builtins.open", side_effect=OSError("disk full")):
|
||||
from gateway.builtin_hooks.crisis import _log_crisis_to_file
|
||||
# Should not raise
|
||||
_log_crisis_to_file(_make_context())
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _build_telegram_message
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestBuildTelegramMessage:
|
||||
def test_includes_session_id_and_level(self):
|
||||
from gateway.builtin_hooks.crisis import _build_telegram_message
|
||||
msg = _build_telegram_message(_make_context(
|
||||
session_id="sess-xyz",
|
||||
crisis_level="critical",
|
||||
))
|
||||
assert "sess-xyz" in msg
|
||||
assert "CRITICAL" in msg
|
||||
|
||||
def test_includes_summary_when_present(self):
|
||||
from gateway.builtin_hooks.crisis import _build_telegram_message
|
||||
msg = _build_telegram_message(_make_context(summary="User in danger"))
|
||||
assert "User in danger" in msg
|
||||
|
||||
def test_omits_summary_when_absent(self):
|
||||
from gateway.builtin_hooks.crisis import _build_telegram_message
|
||||
ctx = _make_context()
|
||||
ctx.pop("summary")
|
||||
msg = _build_telegram_message(ctx)
|
||||
assert "Summary" not in msg
|
||||
|
||||
def test_includes_platform(self):
|
||||
from gateway.builtin_hooks.crisis import _build_telegram_message
|
||||
msg = _build_telegram_message(_make_context(platform="discord"))
|
||||
assert "discord" in msg
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _notify_telegram
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestNotifyTelegram:
|
||||
@pytest.mark.asyncio
|
||||
async def test_skips_when_no_token(self):
|
||||
with patch.dict("os.environ", {}, clear=True):
|
||||
with patch("os.getenv", return_value=""):
|
||||
from gateway.builtin_hooks.crisis import _notify_telegram
|
||||
# Should complete without error
|
||||
await _notify_telegram(_make_context())
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_sends_message_with_explicit_chat_id(self):
|
||||
mock_bot = AsyncMock()
|
||||
mock_bot_cls = MagicMock(return_value=mock_bot)
|
||||
|
||||
env = {
|
||||
"TELEGRAM_BOT_TOKEN": "test-token",
|
||||
"TELEGRAM_CRISIS_CHAT_ID": "9999",
|
||||
}
|
||||
with patch.dict("os.environ", env):
|
||||
with patch("gateway.builtin_hooks.crisis.os.getenv", side_effect=lambda k, d="": env.get(k, d)):
|
||||
with patch("gateway.builtin_hooks.crisis._build_telegram_message", return_value="alert!"):
|
||||
try:
|
||||
from telegram import Bot
|
||||
from telegram.constants import ParseMode
|
||||
with patch("gateway.builtin_hooks.crisis.Bot", mock_bot_cls):
|
||||
from gateway.builtin_hooks.crisis import _notify_telegram
|
||||
await _notify_telegram(_make_context())
|
||||
mock_bot.send_message.assert_awaited_once()
|
||||
call_kwargs = mock_bot.send_message.call_args
|
||||
assert call_kwargs.kwargs["chat_id"] == 9999
|
||||
except ImportError:
|
||||
pytest.skip("python-telegram-bot not installed")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_skips_when_no_chat_id(self):
|
||||
env = {"TELEGRAM_BOT_TOKEN": "test-token"}
|
||||
with patch.dict("os.environ", env):
|
||||
with patch("gateway.builtin_hooks.crisis.os.getenv", side_effect=lambda k, d="": env.get(k, d)):
|
||||
with patch("gateway.builtin_hooks.crisis.load_gateway_config", side_effect=Exception("no config"), create=True):
|
||||
from gateway.builtin_hooks.crisis import _notify_telegram
|
||||
# Should complete without error
|
||||
await _notify_telegram(_make_context())
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_handles_import_error_gracefully(self):
|
||||
env = {
|
||||
"TELEGRAM_BOT_TOKEN": "test-token",
|
||||
"TELEGRAM_CRISIS_CHAT_ID": "9999",
|
||||
}
|
||||
import sys
|
||||
with patch.dict("os.environ", env):
|
||||
with patch.dict(sys.modules, {"telegram": None}):
|
||||
from gateway.builtin_hooks import crisis as crisis_mod
|
||||
import importlib
|
||||
importlib.reload(crisis_mod)
|
||||
# Should not raise even if telegram is unavailable at runtime
|
||||
await crisis_mod._notify_telegram(_make_context())
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# handle (integration: log + notify together)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestHandle:
|
||||
@pytest.mark.asyncio
|
||||
async def test_handle_logs_and_notifies(self, tmp_path):
|
||||
log_file = tmp_path / "logs" / "crisis.log"
|
||||
|
||||
with patch("gateway.builtin_hooks.crisis.CRISIS_LOG_FILE", log_file):
|
||||
with patch("gateway.builtin_hooks.crisis._notify_telegram", new_callable=AsyncMock) as mock_notify:
|
||||
from gateway.builtin_hooks.crisis import handle
|
||||
await handle("agent:crisis", _make_context())
|
||||
|
||||
# Log written
|
||||
assert log_file.exists()
|
||||
entry = json.loads(log_file.read_text().strip())
|
||||
assert entry["session_id"] == "sess-abc123"
|
||||
|
||||
# Notification attempted
|
||||
mock_notify.assert_awaited_once()
|
||||
call_context = mock_notify.call_args[0][0]
|
||||
assert call_context["session_id"] == "sess-abc123"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_handle_does_not_raise_on_notify_failure(self, tmp_path):
|
||||
log_file = tmp_path / "logs" / "crisis.log"
|
||||
|
||||
with patch("gateway.builtin_hooks.crisis.CRISIS_LOG_FILE", log_file):
|
||||
with patch(
|
||||
"gateway.builtin_hooks.crisis._notify_telegram",
|
||||
new_callable=AsyncMock,
|
||||
side_effect=Exception("network down"),
|
||||
):
|
||||
from gateway.builtin_hooks.crisis import handle
|
||||
# Should not propagate the exception
|
||||
await handle("agent:crisis", _make_context())
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# HookRegistry: crisis hook is registered as a builtin
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestCrisisHookRegistration:
|
||||
def test_crisis_hook_registered_in_registry(self):
|
||||
from gateway.hooks import HookRegistry
|
||||
|
||||
reg = HookRegistry()
|
||||
reg.discover_and_load()
|
||||
|
||||
names = [h["name"] for h in reg.loaded_hooks]
|
||||
assert "crisis" in names
|
||||
|
||||
def test_crisis_handler_registered_for_event(self):
|
||||
from gateway.hooks import HookRegistry
|
||||
|
||||
reg = HookRegistry()
|
||||
reg.discover_and_load()
|
||||
|
||||
assert "agent:crisis" in reg._handlers
|
||||
assert len(reg._handlers["agent:crisis"]) >= 1
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_emit_agent_crisis_calls_handler(self, tmp_path):
|
||||
log_file = tmp_path / "logs" / "crisis.log"
|
||||
|
||||
with patch("gateway.builtin_hooks.crisis.CRISIS_LOG_FILE", log_file):
|
||||
with patch("gateway.builtin_hooks.crisis._notify_telegram", new_callable=AsyncMock):
|
||||
from gateway.hooks import HookRegistry
|
||||
reg = HookRegistry()
|
||||
reg.discover_and_load()
|
||||
await reg.emit("agent:crisis", _make_context())
|
||||
|
||||
assert log_file.exists()
|
||||
Reference in New Issue
Block a user