Compare commits
1 Commits
main
...
claude/iss
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7acfd3e870 |
140
gateway/builtin_hooks/crisis.py
Normal file
140
gateway/builtin_hooks/crisis.py
Normal file
@@ -0,0 +1,140 @@
|
|||||||
|
"""Built-in crisis hook — log and notify humans when a crisis is detected.
|
||||||
|
|
||||||
|
Listens for ``agent:crisis`` events and:
|
||||||
|
|
||||||
|
1. Appends a structured entry to ``~/.hermes/logs/crisis.log``
|
||||||
|
2. Sends a Telegram notification to the configured home channel (or the
|
||||||
|
channel set via ``TELEGRAM_CRISIS_CHAT_ID``).
|
||||||
|
|
||||||
|
Event context keys consumed:
|
||||||
|
|
||||||
|
session_id -- The active session identifier.
|
||||||
|
crisis_level -- Severity label (e.g. "high", "critical", "unknown").
|
||||||
|
platform -- The messaging platform that triggered the session.
|
||||||
|
user_id -- User identifier on that platform.
|
||||||
|
message -- Snippet of the incoming message (may be absent).
|
||||||
|
summary -- Short description of the crisis (may be absent).
|
||||||
|
|
||||||
|
Both notification steps are best-effort: failures are logged but never
|
||||||
|
raise, so they cannot block the main gateway pipeline.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
logger = logging.getLogger("hooks.crisis")
|
||||||
|
|
||||||
|
from hermes_constants import get_hermes_home
|
||||||
|
|
||||||
|
HERMES_HOME = get_hermes_home()
|
||||||
|
CRISIS_LOG_FILE = HERMES_HOME / "logs" / "crisis.log"
|
||||||
|
|
||||||
|
|
||||||
|
def _log_crisis_to_file(context: dict) -> None:
|
||||||
|
"""Append a JSON-lines entry to the crisis log file."""
|
||||||
|
try:
|
||||||
|
CRISIS_LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
entry = {
|
||||||
|
"timestamp": datetime.now(timezone.utc).isoformat(),
|
||||||
|
"session_id": context.get("session_id", "unknown"),
|
||||||
|
"crisis_level": context.get("crisis_level", "unknown"),
|
||||||
|
"platform": context.get("platform", ""),
|
||||||
|
"user_id": context.get("user_id", ""),
|
||||||
|
"message": context.get("message", ""),
|
||||||
|
"summary": context.get("summary", ""),
|
||||||
|
}
|
||||||
|
with CRISIS_LOG_FILE.open("a", encoding="utf-8") as f:
|
||||||
|
f.write(json.dumps(entry) + "\n")
|
||||||
|
logger.info(
|
||||||
|
"Crisis logged: session=%s level=%s",
|
||||||
|
entry["session_id"],
|
||||||
|
entry["crisis_level"],
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Failed to write crisis log: %s", e)
|
||||||
|
|
||||||
|
|
||||||
|
def _build_telegram_message(context: dict) -> str:
|
||||||
|
"""Format a Telegram alert message from the crisis context."""
|
||||||
|
session_id = context.get("session_id", "unknown")
|
||||||
|
crisis_level = context.get("crisis_level", "unknown").upper()
|
||||||
|
platform = context.get("platform", "unknown")
|
||||||
|
user_id = context.get("user_id", "")
|
||||||
|
summary = context.get("summary", "")
|
||||||
|
timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
|
||||||
|
|
||||||
|
lines = [
|
||||||
|
f"🚨 *CRISIS ALERT* — Level: {crisis_level}",
|
||||||
|
f"Time: {timestamp}",
|
||||||
|
f"Session: `{session_id}`",
|
||||||
|
f"Platform: {platform}",
|
||||||
|
]
|
||||||
|
if user_id:
|
||||||
|
lines.append(f"User: {user_id}")
|
||||||
|
if summary:
|
||||||
|
lines.append(f"Summary: {summary}")
|
||||||
|
|
||||||
|
return "\n".join(lines)
|
||||||
|
|
||||||
|
|
||||||
|
async def _notify_telegram(context: dict) -> None:
|
||||||
|
"""Send a Telegram alert for the crisis event."""
|
||||||
|
try:
|
||||||
|
token = os.getenv("TELEGRAM_BOT_TOKEN", "")
|
||||||
|
if not token:
|
||||||
|
logger.warning("TELEGRAM_BOT_TOKEN not set — skipping crisis notification")
|
||||||
|
return
|
||||||
|
|
||||||
|
# Prefer explicit crisis chat ID, fall back to gateway home channel.
|
||||||
|
chat_id = os.getenv("TELEGRAM_CRISIS_CHAT_ID", "")
|
||||||
|
if not chat_id:
|
||||||
|
try:
|
||||||
|
from gateway.config import load_gateway_config, Platform
|
||||||
|
config = load_gateway_config()
|
||||||
|
home = config.get_home_channel(Platform.TELEGRAM)
|
||||||
|
if home:
|
||||||
|
chat_id = home.chat_id
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug("Could not load gateway config for crisis hook: %s", e)
|
||||||
|
|
||||||
|
if not chat_id:
|
||||||
|
logger.warning(
|
||||||
|
"No Telegram target configured — set TELEGRAM_CRISIS_CHAT_ID or "
|
||||||
|
"configure a Telegram home channel. Skipping crisis notification."
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
from telegram import Bot
|
||||||
|
from telegram.constants import ParseMode
|
||||||
|
|
||||||
|
message = _build_telegram_message(context)
|
||||||
|
bot = Bot(token=token)
|
||||||
|
await bot.send_message(
|
||||||
|
chat_id=int(chat_id),
|
||||||
|
text=message,
|
||||||
|
parse_mode=ParseMode.MARKDOWN,
|
||||||
|
)
|
||||||
|
logger.info(
|
||||||
|
"Crisis Telegram notification sent to chat_id=%s (session=%s level=%s)",
|
||||||
|
chat_id,
|
||||||
|
context.get("session_id", "unknown"),
|
||||||
|
context.get("crisis_level", "unknown"),
|
||||||
|
)
|
||||||
|
except ImportError:
|
||||||
|
logger.warning(
|
||||||
|
"python-telegram-bot not installed — skipping Telegram crisis notification"
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Failed to send Telegram crisis notification: %s", e)
|
||||||
|
|
||||||
|
|
||||||
|
async def handle(event_type: str, context: dict) -> None:
|
||||||
|
"""Handle agent:crisis events — log to file and notify via Telegram."""
|
||||||
|
_log_crisis_to_file(context)
|
||||||
|
try:
|
||||||
|
await _notify_telegram(context)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Crisis notification failed unexpectedly: %s", e)
|
||||||
@@ -14,6 +14,7 @@ Events:
|
|||||||
- agent:start -- Agent begins processing a message
|
- agent:start -- Agent begins processing a message
|
||||||
- agent:step -- Each turn in the tool-calling loop
|
- agent:step -- Each turn in the tool-calling loop
|
||||||
- agent:end -- Agent finishes processing
|
- agent:end -- Agent finishes processing
|
||||||
|
- agent:crisis -- Crisis detected during a session (builtin: logs + Telegram alert)
|
||||||
- command:* -- Any slash command executed (wildcard match)
|
- command:* -- Any slash command executed (wildcard match)
|
||||||
|
|
||||||
Errors in hooks are caught and logged but never block the main pipeline.
|
Errors in hooks are caught and logged but never block the main pipeline.
|
||||||
@@ -66,6 +67,19 @@ class HookRegistry:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"[hooks] Could not load built-in boot-md hook: {e}", flush=True)
|
print(f"[hooks] Could not load built-in boot-md hook: {e}", flush=True)
|
||||||
|
|
||||||
|
try:
|
||||||
|
from gateway.builtin_hooks.crisis import handle as crisis_handle
|
||||||
|
|
||||||
|
self._handlers.setdefault("agent:crisis", []).append(crisis_handle)
|
||||||
|
self._loaded_hooks.append({
|
||||||
|
"name": "crisis",
|
||||||
|
"description": "Log crisis events to file and send Telegram notification",
|
||||||
|
"events": ["agent:crisis"],
|
||||||
|
"path": "(builtin)",
|
||||||
|
})
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[hooks] Could not load built-in crisis hook: {e}", flush=True)
|
||||||
|
|
||||||
def discover_and_load(self) -> None:
|
def discover_and_load(self) -> None:
|
||||||
"""
|
"""
|
||||||
Scan the hooks directory for hook directories and load their handlers.
|
Scan the hooks directory for hook directories and load their handlers.
|
||||||
|
|||||||
249
tests/gateway/test_crisis_hook.py
Normal file
249
tests/gateway/test_crisis_hook.py
Normal file
@@ -0,0 +1,249 @@
|
|||||||
|
"""Tests for gateway/builtin_hooks/crisis.py — crisis notification hook."""
|
||||||
|
|
||||||
|
import json
|
||||||
|
from pathlib import Path
|
||||||
|
from unittest.mock import AsyncMock, MagicMock, patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Helpers
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def _make_context(**kwargs):
|
||||||
|
base = {
|
||||||
|
"session_id": "sess-abc123",
|
||||||
|
"crisis_level": "high",
|
||||||
|
"platform": "telegram",
|
||||||
|
"user_id": "user-42",
|
||||||
|
"message": "I need help urgently",
|
||||||
|
"summary": "User expressed distress",
|
||||||
|
}
|
||||||
|
base.update(kwargs)
|
||||||
|
return base
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# _log_crisis_to_file
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestLogCrisisToFile:
|
||||||
|
def test_creates_log_file_and_entry(self, tmp_path):
|
||||||
|
log_file = tmp_path / "logs" / "crisis.log"
|
||||||
|
|
||||||
|
with patch("gateway.builtin_hooks.crisis.CRISIS_LOG_FILE", log_file):
|
||||||
|
from gateway.builtin_hooks.crisis import _log_crisis_to_file
|
||||||
|
_log_crisis_to_file(_make_context())
|
||||||
|
|
||||||
|
assert log_file.exists()
|
||||||
|
entry = json.loads(log_file.read_text().strip())
|
||||||
|
assert entry["session_id"] == "sess-abc123"
|
||||||
|
assert entry["crisis_level"] == "high"
|
||||||
|
assert entry["platform"] == "telegram"
|
||||||
|
assert entry["user_id"] == "user-42"
|
||||||
|
assert "timestamp" in entry
|
||||||
|
|
||||||
|
def test_appends_multiple_entries(self, tmp_path):
|
||||||
|
log_file = tmp_path / "logs" / "crisis.log"
|
||||||
|
|
||||||
|
with patch("gateway.builtin_hooks.crisis.CRISIS_LOG_FILE", log_file):
|
||||||
|
from gateway.builtin_hooks.crisis import _log_crisis_to_file
|
||||||
|
_log_crisis_to_file(_make_context(session_id="sess-1"))
|
||||||
|
_log_crisis_to_file(_make_context(session_id="sess-2"))
|
||||||
|
|
||||||
|
lines = log_file.read_text().strip().splitlines()
|
||||||
|
assert len(lines) == 2
|
||||||
|
assert json.loads(lines[0])["session_id"] == "sess-1"
|
||||||
|
assert json.loads(lines[1])["session_id"] == "sess-2"
|
||||||
|
|
||||||
|
def test_missing_optional_fields_use_defaults(self, tmp_path):
|
||||||
|
log_file = tmp_path / "logs" / "crisis.log"
|
||||||
|
|
||||||
|
with patch("gateway.builtin_hooks.crisis.CRISIS_LOG_FILE", log_file):
|
||||||
|
from gateway.builtin_hooks.crisis import _log_crisis_to_file
|
||||||
|
_log_crisis_to_file({}) # empty context
|
||||||
|
|
||||||
|
entry = json.loads(log_file.read_text().strip())
|
||||||
|
assert entry["session_id"] == "unknown"
|
||||||
|
assert entry["crisis_level"] == "unknown"
|
||||||
|
|
||||||
|
def test_tolerates_write_error(self, tmp_path):
|
||||||
|
log_file = tmp_path / "logs" / "crisis.log"
|
||||||
|
|
||||||
|
with patch("gateway.builtin_hooks.crisis.CRISIS_LOG_FILE", log_file):
|
||||||
|
with patch("builtins.open", side_effect=OSError("disk full")):
|
||||||
|
from gateway.builtin_hooks.crisis import _log_crisis_to_file
|
||||||
|
# Should not raise
|
||||||
|
_log_crisis_to_file(_make_context())
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# _build_telegram_message
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestBuildTelegramMessage:
|
||||||
|
def test_includes_session_id_and_level(self):
|
||||||
|
from gateway.builtin_hooks.crisis import _build_telegram_message
|
||||||
|
msg = _build_telegram_message(_make_context(
|
||||||
|
session_id="sess-xyz",
|
||||||
|
crisis_level="critical",
|
||||||
|
))
|
||||||
|
assert "sess-xyz" in msg
|
||||||
|
assert "CRITICAL" in msg
|
||||||
|
|
||||||
|
def test_includes_summary_when_present(self):
|
||||||
|
from gateway.builtin_hooks.crisis import _build_telegram_message
|
||||||
|
msg = _build_telegram_message(_make_context(summary="User in danger"))
|
||||||
|
assert "User in danger" in msg
|
||||||
|
|
||||||
|
def test_omits_summary_when_absent(self):
|
||||||
|
from gateway.builtin_hooks.crisis import _build_telegram_message
|
||||||
|
ctx = _make_context()
|
||||||
|
ctx.pop("summary")
|
||||||
|
msg = _build_telegram_message(ctx)
|
||||||
|
assert "Summary" not in msg
|
||||||
|
|
||||||
|
def test_includes_platform(self):
|
||||||
|
from gateway.builtin_hooks.crisis import _build_telegram_message
|
||||||
|
msg = _build_telegram_message(_make_context(platform="discord"))
|
||||||
|
assert "discord" in msg
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# _notify_telegram
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestNotifyTelegram:
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_skips_when_no_token(self):
|
||||||
|
with patch.dict("os.environ", {}, clear=True):
|
||||||
|
with patch("os.getenv", return_value=""):
|
||||||
|
from gateway.builtin_hooks.crisis import _notify_telegram
|
||||||
|
# Should complete without error
|
||||||
|
await _notify_telegram(_make_context())
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_sends_message_with_explicit_chat_id(self):
|
||||||
|
mock_bot = AsyncMock()
|
||||||
|
mock_bot_cls = MagicMock(return_value=mock_bot)
|
||||||
|
|
||||||
|
env = {
|
||||||
|
"TELEGRAM_BOT_TOKEN": "test-token",
|
||||||
|
"TELEGRAM_CRISIS_CHAT_ID": "9999",
|
||||||
|
}
|
||||||
|
with patch.dict("os.environ", env):
|
||||||
|
with patch("gateway.builtin_hooks.crisis.os.getenv", side_effect=lambda k, d="": env.get(k, d)):
|
||||||
|
with patch("gateway.builtin_hooks.crisis._build_telegram_message", return_value="alert!"):
|
||||||
|
try:
|
||||||
|
from telegram import Bot
|
||||||
|
from telegram.constants import ParseMode
|
||||||
|
with patch("gateway.builtin_hooks.crisis.Bot", mock_bot_cls):
|
||||||
|
from gateway.builtin_hooks.crisis import _notify_telegram
|
||||||
|
await _notify_telegram(_make_context())
|
||||||
|
mock_bot.send_message.assert_awaited_once()
|
||||||
|
call_kwargs = mock_bot.send_message.call_args
|
||||||
|
assert call_kwargs.kwargs["chat_id"] == 9999
|
||||||
|
except ImportError:
|
||||||
|
pytest.skip("python-telegram-bot not installed")
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_skips_when_no_chat_id(self):
|
||||||
|
env = {"TELEGRAM_BOT_TOKEN": "test-token"}
|
||||||
|
with patch.dict("os.environ", env):
|
||||||
|
with patch("gateway.builtin_hooks.crisis.os.getenv", side_effect=lambda k, d="": env.get(k, d)):
|
||||||
|
with patch("gateway.builtin_hooks.crisis.load_gateway_config", side_effect=Exception("no config"), create=True):
|
||||||
|
from gateway.builtin_hooks.crisis import _notify_telegram
|
||||||
|
# Should complete without error
|
||||||
|
await _notify_telegram(_make_context())
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_handles_import_error_gracefully(self):
|
||||||
|
env = {
|
||||||
|
"TELEGRAM_BOT_TOKEN": "test-token",
|
||||||
|
"TELEGRAM_CRISIS_CHAT_ID": "9999",
|
||||||
|
}
|
||||||
|
import sys
|
||||||
|
with patch.dict("os.environ", env):
|
||||||
|
with patch.dict(sys.modules, {"telegram": None}):
|
||||||
|
from gateway.builtin_hooks import crisis as crisis_mod
|
||||||
|
import importlib
|
||||||
|
importlib.reload(crisis_mod)
|
||||||
|
# Should not raise even if telegram is unavailable at runtime
|
||||||
|
await crisis_mod._notify_telegram(_make_context())
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# handle (integration: log + notify together)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestHandle:
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_handle_logs_and_notifies(self, tmp_path):
|
||||||
|
log_file = tmp_path / "logs" / "crisis.log"
|
||||||
|
|
||||||
|
with patch("gateway.builtin_hooks.crisis.CRISIS_LOG_FILE", log_file):
|
||||||
|
with patch("gateway.builtin_hooks.crisis._notify_telegram", new_callable=AsyncMock) as mock_notify:
|
||||||
|
from gateway.builtin_hooks.crisis import handle
|
||||||
|
await handle("agent:crisis", _make_context())
|
||||||
|
|
||||||
|
# Log written
|
||||||
|
assert log_file.exists()
|
||||||
|
entry = json.loads(log_file.read_text().strip())
|
||||||
|
assert entry["session_id"] == "sess-abc123"
|
||||||
|
|
||||||
|
# Notification attempted
|
||||||
|
mock_notify.assert_awaited_once()
|
||||||
|
call_context = mock_notify.call_args[0][0]
|
||||||
|
assert call_context["session_id"] == "sess-abc123"
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_handle_does_not_raise_on_notify_failure(self, tmp_path):
|
||||||
|
log_file = tmp_path / "logs" / "crisis.log"
|
||||||
|
|
||||||
|
with patch("gateway.builtin_hooks.crisis.CRISIS_LOG_FILE", log_file):
|
||||||
|
with patch(
|
||||||
|
"gateway.builtin_hooks.crisis._notify_telegram",
|
||||||
|
new_callable=AsyncMock,
|
||||||
|
side_effect=Exception("network down"),
|
||||||
|
):
|
||||||
|
from gateway.builtin_hooks.crisis import handle
|
||||||
|
# Should not propagate the exception
|
||||||
|
await handle("agent:crisis", _make_context())
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# HookRegistry: crisis hook is registered as a builtin
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestCrisisHookRegistration:
|
||||||
|
def test_crisis_hook_registered_in_registry(self):
|
||||||
|
from gateway.hooks import HookRegistry
|
||||||
|
|
||||||
|
reg = HookRegistry()
|
||||||
|
reg.discover_and_load()
|
||||||
|
|
||||||
|
names = [h["name"] for h in reg.loaded_hooks]
|
||||||
|
assert "crisis" in names
|
||||||
|
|
||||||
|
def test_crisis_handler_registered_for_event(self):
|
||||||
|
from gateway.hooks import HookRegistry
|
||||||
|
|
||||||
|
reg = HookRegistry()
|
||||||
|
reg.discover_and_load()
|
||||||
|
|
||||||
|
assert "agent:crisis" in reg._handlers
|
||||||
|
assert len(reg._handlers["agent:crisis"]) >= 1
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_emit_agent_crisis_calls_handler(self, tmp_path):
|
||||||
|
log_file = tmp_path / "logs" / "crisis.log"
|
||||||
|
|
||||||
|
with patch("gateway.builtin_hooks.crisis.CRISIS_LOG_FILE", log_file):
|
||||||
|
with patch("gateway.builtin_hooks.crisis._notify_telegram", new_callable=AsyncMock):
|
||||||
|
from gateway.hooks import HookRegistry
|
||||||
|
reg = HookRegistry()
|
||||||
|
reg.discover_and_load()
|
||||||
|
await reg.emit("agent:crisis", _make_context())
|
||||||
|
|
||||||
|
assert log_file.exists()
|
||||||
Reference in New Issue
Block a user