Compare commits

...

1 Commits

Author SHA1 Message Date
Alexander Whitestone
7acfd3e870 feat: add crisis hook with file logging and Telegram notification
Some checks failed
Contributor Attribution Check / check-attribution (pull_request) Failing after 19s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 37s
Tests / e2e (pull_request) Successful in 3m31s
Tests / test (pull_request) Failing after 44m10s
Adds a built-in crisis hook that handles agent:crisis events by:
- Appending a structured JSON-lines entry to ~/.hermes/logs/crisis.log
  (includes session_id, crisis_level, platform, user_id, summary, timestamp)
- Sending a Telegram alert to TELEGRAM_CRISIS_CHAT_ID (or the configured
  home channel as fallback), including session ID and crisis level

The hook is always-on once the gateway starts; no configuration beyond an
existing Telegram bot token is needed to enable file logging.

Also documents the agent:crisis event in gateway/hooks.py and adds
17 tests covering file logging, message formatting, Telegram dispatch
(including graceful degradation when telegram is unconfigured), and
HookRegistry registration.

Fixes #705

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-15 21:15:39 -04:00
3 changed files with 403 additions and 0 deletions

View File

@@ -0,0 +1,140 @@
"""Built-in crisis hook — log and notify humans when a crisis is detected.
Listens for ``agent:crisis`` events and:
1. Appends a structured entry to ``~/.hermes/logs/crisis.log``
2. Sends a Telegram notification to the configured home channel (or the
channel set via ``TELEGRAM_CRISIS_CHAT_ID``).
Event context keys consumed:
session_id -- The active session identifier.
crisis_level -- Severity label (e.g. "high", "critical", "unknown").
platform -- The messaging platform that triggered the session.
user_id -- User identifier on that platform.
message -- Snippet of the incoming message (may be absent).
summary -- Short description of the crisis (may be absent).
Both notification steps are best-effort: failures are logged but never
raise, so they cannot block the main gateway pipeline.
"""
import json
import logging
import os
from datetime import datetime, timezone
from pathlib import Path
logger = logging.getLogger("hooks.crisis")
from hermes_constants import get_hermes_home
HERMES_HOME = get_hermes_home()
CRISIS_LOG_FILE = HERMES_HOME / "logs" / "crisis.log"
def _log_crisis_to_file(context: dict) -> None:
"""Append a JSON-lines entry to the crisis log file."""
try:
CRISIS_LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
entry = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"session_id": context.get("session_id", "unknown"),
"crisis_level": context.get("crisis_level", "unknown"),
"platform": context.get("platform", ""),
"user_id": context.get("user_id", ""),
"message": context.get("message", ""),
"summary": context.get("summary", ""),
}
with CRISIS_LOG_FILE.open("a", encoding="utf-8") as f:
f.write(json.dumps(entry) + "\n")
logger.info(
"Crisis logged: session=%s level=%s",
entry["session_id"],
entry["crisis_level"],
)
except Exception as e:
logger.error("Failed to write crisis log: %s", e)
def _build_telegram_message(context: dict) -> str:
"""Format a Telegram alert message from the crisis context."""
session_id = context.get("session_id", "unknown")
crisis_level = context.get("crisis_level", "unknown").upper()
platform = context.get("platform", "unknown")
user_id = context.get("user_id", "")
summary = context.get("summary", "")
timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
lines = [
f"🚨 *CRISIS ALERT* — Level: {crisis_level}",
f"Time: {timestamp}",
f"Session: `{session_id}`",
f"Platform: {platform}",
]
if user_id:
lines.append(f"User: {user_id}")
if summary:
lines.append(f"Summary: {summary}")
return "\n".join(lines)
async def _notify_telegram(context: dict) -> None:
"""Send a Telegram alert for the crisis event."""
try:
token = os.getenv("TELEGRAM_BOT_TOKEN", "")
if not token:
logger.warning("TELEGRAM_BOT_TOKEN not set — skipping crisis notification")
return
# Prefer explicit crisis chat ID, fall back to gateway home channel.
chat_id = os.getenv("TELEGRAM_CRISIS_CHAT_ID", "")
if not chat_id:
try:
from gateway.config import load_gateway_config, Platform
config = load_gateway_config()
home = config.get_home_channel(Platform.TELEGRAM)
if home:
chat_id = home.chat_id
except Exception as e:
logger.debug("Could not load gateway config for crisis hook: %s", e)
if not chat_id:
logger.warning(
"No Telegram target configured — set TELEGRAM_CRISIS_CHAT_ID or "
"configure a Telegram home channel. Skipping crisis notification."
)
return
from telegram import Bot
from telegram.constants import ParseMode
message = _build_telegram_message(context)
bot = Bot(token=token)
await bot.send_message(
chat_id=int(chat_id),
text=message,
parse_mode=ParseMode.MARKDOWN,
)
logger.info(
"Crisis Telegram notification sent to chat_id=%s (session=%s level=%s)",
chat_id,
context.get("session_id", "unknown"),
context.get("crisis_level", "unknown"),
)
except ImportError:
logger.warning(
"python-telegram-bot not installed — skipping Telegram crisis notification"
)
except Exception as e:
logger.error("Failed to send Telegram crisis notification: %s", e)
async def handle(event_type: str, context: dict) -> None:
"""Handle agent:crisis events — log to file and notify via Telegram."""
_log_crisis_to_file(context)
try:
await _notify_telegram(context)
except Exception as e:
logger.error("Crisis notification failed unexpectedly: %s", e)

View File

@@ -14,6 +14,7 @@ Events:
- agent:start -- Agent begins processing a message
- agent:step -- Each turn in the tool-calling loop
- agent:end -- Agent finishes processing
- agent:crisis -- Crisis detected during a session (builtin: logs + Telegram alert)
- command:* -- Any slash command executed (wildcard match)
Errors in hooks are caught and logged but never block the main pipeline.
@@ -66,6 +67,19 @@ class HookRegistry:
except Exception as e:
print(f"[hooks] Could not load built-in boot-md hook: {e}", flush=True)
try:
from gateway.builtin_hooks.crisis import handle as crisis_handle
self._handlers.setdefault("agent:crisis", []).append(crisis_handle)
self._loaded_hooks.append({
"name": "crisis",
"description": "Log crisis events to file and send Telegram notification",
"events": ["agent:crisis"],
"path": "(builtin)",
})
except Exception as e:
print(f"[hooks] Could not load built-in crisis hook: {e}", flush=True)
def discover_and_load(self) -> None:
"""
Scan the hooks directory for hook directories and load their handlers.

View File

@@ -0,0 +1,249 @@
"""Tests for gateway/builtin_hooks/crisis.py — crisis notification hook."""
import json
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_context(**kwargs):
base = {
"session_id": "sess-abc123",
"crisis_level": "high",
"platform": "telegram",
"user_id": "user-42",
"message": "I need help urgently",
"summary": "User expressed distress",
}
base.update(kwargs)
return base
# ---------------------------------------------------------------------------
# _log_crisis_to_file
# ---------------------------------------------------------------------------
class TestLogCrisisToFile:
def test_creates_log_file_and_entry(self, tmp_path):
log_file = tmp_path / "logs" / "crisis.log"
with patch("gateway.builtin_hooks.crisis.CRISIS_LOG_FILE", log_file):
from gateway.builtin_hooks.crisis import _log_crisis_to_file
_log_crisis_to_file(_make_context())
assert log_file.exists()
entry = json.loads(log_file.read_text().strip())
assert entry["session_id"] == "sess-abc123"
assert entry["crisis_level"] == "high"
assert entry["platform"] == "telegram"
assert entry["user_id"] == "user-42"
assert "timestamp" in entry
def test_appends_multiple_entries(self, tmp_path):
log_file = tmp_path / "logs" / "crisis.log"
with patch("gateway.builtin_hooks.crisis.CRISIS_LOG_FILE", log_file):
from gateway.builtin_hooks.crisis import _log_crisis_to_file
_log_crisis_to_file(_make_context(session_id="sess-1"))
_log_crisis_to_file(_make_context(session_id="sess-2"))
lines = log_file.read_text().strip().splitlines()
assert len(lines) == 2
assert json.loads(lines[0])["session_id"] == "sess-1"
assert json.loads(lines[1])["session_id"] == "sess-2"
def test_missing_optional_fields_use_defaults(self, tmp_path):
log_file = tmp_path / "logs" / "crisis.log"
with patch("gateway.builtin_hooks.crisis.CRISIS_LOG_FILE", log_file):
from gateway.builtin_hooks.crisis import _log_crisis_to_file
_log_crisis_to_file({}) # empty context
entry = json.loads(log_file.read_text().strip())
assert entry["session_id"] == "unknown"
assert entry["crisis_level"] == "unknown"
def test_tolerates_write_error(self, tmp_path):
log_file = tmp_path / "logs" / "crisis.log"
with patch("gateway.builtin_hooks.crisis.CRISIS_LOG_FILE", log_file):
with patch("builtins.open", side_effect=OSError("disk full")):
from gateway.builtin_hooks.crisis import _log_crisis_to_file
# Should not raise
_log_crisis_to_file(_make_context())
# ---------------------------------------------------------------------------
# _build_telegram_message
# ---------------------------------------------------------------------------
class TestBuildTelegramMessage:
def test_includes_session_id_and_level(self):
from gateway.builtin_hooks.crisis import _build_telegram_message
msg = _build_telegram_message(_make_context(
session_id="sess-xyz",
crisis_level="critical",
))
assert "sess-xyz" in msg
assert "CRITICAL" in msg
def test_includes_summary_when_present(self):
from gateway.builtin_hooks.crisis import _build_telegram_message
msg = _build_telegram_message(_make_context(summary="User in danger"))
assert "User in danger" in msg
def test_omits_summary_when_absent(self):
from gateway.builtin_hooks.crisis import _build_telegram_message
ctx = _make_context()
ctx.pop("summary")
msg = _build_telegram_message(ctx)
assert "Summary" not in msg
def test_includes_platform(self):
from gateway.builtin_hooks.crisis import _build_telegram_message
msg = _build_telegram_message(_make_context(platform="discord"))
assert "discord" in msg
# ---------------------------------------------------------------------------
# _notify_telegram
# ---------------------------------------------------------------------------
class TestNotifyTelegram:
@pytest.mark.asyncio
async def test_skips_when_no_token(self):
with patch.dict("os.environ", {}, clear=True):
with patch("os.getenv", return_value=""):
from gateway.builtin_hooks.crisis import _notify_telegram
# Should complete without error
await _notify_telegram(_make_context())
@pytest.mark.asyncio
async def test_sends_message_with_explicit_chat_id(self):
mock_bot = AsyncMock()
mock_bot_cls = MagicMock(return_value=mock_bot)
env = {
"TELEGRAM_BOT_TOKEN": "test-token",
"TELEGRAM_CRISIS_CHAT_ID": "9999",
}
with patch.dict("os.environ", env):
with patch("gateway.builtin_hooks.crisis.os.getenv", side_effect=lambda k, d="": env.get(k, d)):
with patch("gateway.builtin_hooks.crisis._build_telegram_message", return_value="alert!"):
try:
from telegram import Bot
from telegram.constants import ParseMode
with patch("gateway.builtin_hooks.crisis.Bot", mock_bot_cls):
from gateway.builtin_hooks.crisis import _notify_telegram
await _notify_telegram(_make_context())
mock_bot.send_message.assert_awaited_once()
call_kwargs = mock_bot.send_message.call_args
assert call_kwargs.kwargs["chat_id"] == 9999
except ImportError:
pytest.skip("python-telegram-bot not installed")
@pytest.mark.asyncio
async def test_skips_when_no_chat_id(self):
env = {"TELEGRAM_BOT_TOKEN": "test-token"}
with patch.dict("os.environ", env):
with patch("gateway.builtin_hooks.crisis.os.getenv", side_effect=lambda k, d="": env.get(k, d)):
with patch("gateway.builtin_hooks.crisis.load_gateway_config", side_effect=Exception("no config"), create=True):
from gateway.builtin_hooks.crisis import _notify_telegram
# Should complete without error
await _notify_telegram(_make_context())
@pytest.mark.asyncio
async def test_handles_import_error_gracefully(self):
env = {
"TELEGRAM_BOT_TOKEN": "test-token",
"TELEGRAM_CRISIS_CHAT_ID": "9999",
}
import sys
with patch.dict("os.environ", env):
with patch.dict(sys.modules, {"telegram": None}):
from gateway.builtin_hooks import crisis as crisis_mod
import importlib
importlib.reload(crisis_mod)
# Should not raise even if telegram is unavailable at runtime
await crisis_mod._notify_telegram(_make_context())
# ---------------------------------------------------------------------------
# handle (integration: log + notify together)
# ---------------------------------------------------------------------------
class TestHandle:
@pytest.mark.asyncio
async def test_handle_logs_and_notifies(self, tmp_path):
log_file = tmp_path / "logs" / "crisis.log"
with patch("gateway.builtin_hooks.crisis.CRISIS_LOG_FILE", log_file):
with patch("gateway.builtin_hooks.crisis._notify_telegram", new_callable=AsyncMock) as mock_notify:
from gateway.builtin_hooks.crisis import handle
await handle("agent:crisis", _make_context())
# Log written
assert log_file.exists()
entry = json.loads(log_file.read_text().strip())
assert entry["session_id"] == "sess-abc123"
# Notification attempted
mock_notify.assert_awaited_once()
call_context = mock_notify.call_args[0][0]
assert call_context["session_id"] == "sess-abc123"
@pytest.mark.asyncio
async def test_handle_does_not_raise_on_notify_failure(self, tmp_path):
log_file = tmp_path / "logs" / "crisis.log"
with patch("gateway.builtin_hooks.crisis.CRISIS_LOG_FILE", log_file):
with patch(
"gateway.builtin_hooks.crisis._notify_telegram",
new_callable=AsyncMock,
side_effect=Exception("network down"),
):
from gateway.builtin_hooks.crisis import handle
# Should not propagate the exception
await handle("agent:crisis", _make_context())
# ---------------------------------------------------------------------------
# HookRegistry: crisis hook is registered as a builtin
# ---------------------------------------------------------------------------
class TestCrisisHookRegistration:
def test_crisis_hook_registered_in_registry(self):
from gateway.hooks import HookRegistry
reg = HookRegistry()
reg.discover_and_load()
names = [h["name"] for h in reg.loaded_hooks]
assert "crisis" in names
def test_crisis_handler_registered_for_event(self):
from gateway.hooks import HookRegistry
reg = HookRegistry()
reg.discover_and_load()
assert "agent:crisis" in reg._handlers
assert len(reg._handlers["agent:crisis"]) >= 1
@pytest.mark.asyncio
async def test_emit_agent_crisis_calls_handler(self, tmp_path):
log_file = tmp_path / "logs" / "crisis.log"
with patch("gateway.builtin_hooks.crisis.CRISIS_LOG_FILE", log_file):
with patch("gateway.builtin_hooks.crisis._notify_telegram", new_callable=AsyncMock):
from gateway.hooks import HookRegistry
reg = HookRegistry()
reg.discover_and_load()
await reg.emit("agent:crisis", _make_context())
assert log_file.exists()