Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
7bd18e1a9a feat: crisis notification hook with Telegram alerts (#705)
Some checks failed
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Contributor Attribution Check / check-attribution (pull_request) Failing after 38s
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 58s
Tests / e2e (pull_request) Successful in 4m59s
Tests / test (pull_request) Failing after 42m5s
Crisis hook detects crisis events in conversations and notifies humans.

New gateway/builtin_hooks/crisis_notify.py:
- detect_crisis(): scans text for crisis indicators (suicide, self-harm)
  returns (is_crisis, severity, matched_patterns)
- 14 crisis patterns across HIGH/MEDIUM/LOW severity
- log_crisis_event(): writes to ~/.hermes/crisis-events.log (JSON lines)
- send_telegram_crisis_alert(): sends notification via Telegram
  (ALERT_TELEGRAM_TOKEN + ALERT_TELEGRAM_CHAT_ID or CRISIS_ALERT_CHAT_ID)
- handle(): agent:end hook handler, scans user messages and agent responses

Integration:
- Registered as builtin hook in gateway/hooks.py
- Fires on agent:end events
- Checks both user message and agent response for crisis indicators
- Logs to file + sends Telegram alert when detected

Tests: tests/test_crisis_notify.py

Closes #705
2026-04-14 23:15:03 -04:00
5 changed files with 230 additions and 300 deletions

View File

@@ -1,189 +0,0 @@
"""
Session Analytics — Per-session token/cost/time tracking
Tracks resource consumption per session for transparency.
Issue: #753
"""
import json
import time
from dataclasses import dataclass, asdict, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
# All analytics artifacts live under the user's Hermes home directory.
HERMES_HOME = Path.home() / ".hermes"
ANALYTICS_DIR = HERMES_HOME / "analytics"
# Cost per 1K tokens by provider (input/output), in USD.
# Local providers are free; providers not listed here fall back to a
# default rate inside SessionTracker.record_tokens().
COST_TABLE = {
    "anthropic": {"input": 0.015, "output": 0.075},
    "openai": {"input": 0.005, "output": 0.015},
    "nous": {"input": 0.002, "output": 0.006},
    "openrouter": {"input": 0.005, "output": 0.015},
    "ollama": {"input": 0.0, "output": 0.0},
    "local": {"input": 0.0, "output": 0.0},
}
@dataclass
class SessionStats:
    """Statistics for a single session.

    Mutable record populated by SessionTracker and serialized with
    to_dict() for JSONL persistence via save_session_stats().
    """
    session_id: str
    start_time: str  # ISO-8601 UTC timestamp (set by SessionTracker)
    end_time: Optional[str] = None  # set by SessionTracker.finish()
    # Token counts
    input_tokens: int = 0
    output_tokens: int = 0
    total_tokens: int = 0  # kept equal to input + output by record_tokens()
    # Tool usage
    tool_calls: int = 0
    tool_errors: int = 0
    # Timing
    wall_time_seconds: float = 0.0
    api_calls: int = 0
    # Cost
    estimated_cost_usd: float = 0.0
    provider: str = ""
    model: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Return all fields as a plain dict (JSON-serializable)."""
        return asdict(self)
class SessionTracker:
    """Accumulate per-session analytics: tokens, cost, tool usage, timing.

    record_*() methods mutate the underlying SessionStats in place;
    finish() stamps the end time, get_current_stats() gives a live snapshot.
    """

    def __init__(self, session_id: str, provider: str = "", model: str = ""):
        self.session_id = session_id
        # Lower-cased provider is used for COST_TABLE lookups; the stats
        # object keeps the caller-supplied spelling for display.
        self.provider = provider.lower() if provider else ""
        self.model = model
        self.start_time = time.time()
        self.stats = SessionStats(
            session_id=session_id,
            start_time=datetime.now(timezone.utc).isoformat(),
            provider=provider,
            model=model,
        )

    def record_tokens(self, input_tokens: int, output_tokens: int):
        """Add token counts and fold their estimated cost into the stats."""
        s = self.stats
        s.input_tokens += input_tokens
        s.output_tokens += output_tokens
        s.total_tokens = s.input_tokens + s.output_tokens
        # Unknown providers fall back to a conservative default rate.
        rates = COST_TABLE.get(self.provider, {"input": 0.01, "output": 0.03})
        s.estimated_cost_usd += (
            (input_tokens / 1000) * rates["input"]
            + (output_tokens / 1000) * rates["output"]
        )

    def record_tool_call(self, success: bool = True):
        """Count a tool invocation; failures also bump the error counter."""
        self.stats.tool_calls += 1
        self.stats.tool_errors += 0 if success else 1

    def record_api_call(self):
        """Count one model API round-trip."""
        self.stats.api_calls += 1

    def finish(self) -> SessionStats:
        """Stamp end time and wall time, then return the final stats."""
        self.stats.end_time = datetime.now(timezone.utc).isoformat()
        self.stats.wall_time_seconds = time.time() - self.start_time
        return self.stats

    def get_current_stats(self) -> SessionStats:
        """Return stats with up-to-date wall time, without ending the session."""
        self.stats.wall_time_seconds = time.time() - self.start_time
        return self.stats
def format_stats(stats: SessionStats) -> str:
    """Render a SessionStats object as a short multi-line summary for display."""
    return "\n".join([
        f"Session: {stats.session_id[:20]}...",
        f"Provider: {stats.provider or 'unknown'}",
        f"Model: {stats.model or 'unknown'}",
        "",
        f"Tokens: {stats.input_tokens:,} in / {stats.output_tokens:,} out ({stats.total_tokens:,} total)",
        f"Cost: ${stats.estimated_cost_usd:.4f}",
        f"API calls: {stats.api_calls}",
        f"Tool calls: {stats.tool_calls} ({stats.tool_errors} errors)",
        f"Wall time: {stats.wall_time_seconds:.1f}s",
    ])
def save_session_stats(stats: SessionStats):
    """Append the session's stats to today's JSONL analytics file.

    Creates the analytics directory on first use; one file per calendar day.
    """
    ANALYTICS_DIR.mkdir(parents=True, exist_ok=True)
    today = datetime.now().strftime("%Y-%m-%d")
    path = ANALYTICS_DIR / f"sessions_{today}.jsonl"
    with open(path, "a") as fh:
        fh.write(json.dumps(stats.to_dict()) + "\n")
def get_daily_stats(date_str: Optional[str] = None) -> Dict[str, Any]:
    """Aggregate all session records for one day (defaults to today).

    Returns {"date": ..., "sessions": 0} when no records exist for the day.
    Corrupt JSONL lines are skipped rather than failing the whole report.
    """
    if date_str is None:
        date_str = datetime.now().strftime("%Y-%m-%d")
    stats_file = ANALYTICS_DIR / f"sessions_{date_str}.jsonl"
    if not stats_file.exists():
        return {"date": date_str, "sessions": 0}

    sessions = []
    with open(stats_file) as fh:
        for raw in fh:
            raw = raw.strip()
            if not raw:
                continue
            try:
                sessions.append(json.loads(raw))
            except json.JSONDecodeError:
                continue  # tolerate partial/corrupt lines
    if not sessions:
        return {"date": date_str, "sessions": 0}

    def _total(key: str):
        # Sum one field across all session records, treating missing as 0.
        return sum(s.get(key, 0) for s in sessions)

    n = len(sessions)
    total_tokens = _total("total_tokens")
    total_cost = _total("estimated_cost_usd")
    return {
        "date": date_str,
        "sessions": n,
        "total_tokens": total_tokens,
        "total_cost_usd": round(total_cost, 4),
        "total_wall_time_seconds": round(_total("wall_time_seconds"), 1),
        "total_tool_calls": _total("tool_calls"),
        "total_tool_errors": _total("tool_errors"),
        "avg_tokens_per_session": total_tokens // n,
        "avg_cost_per_session": round(total_cost / n, 4),
    }
def format_daily_report(stats: Dict[str, Any]) -> str:
    """Render the aggregate dict from get_daily_stats() as a markdown report."""
    report = [
        f"# Session Analytics — {stats['date']}",
        "",
        f"Sessions: {stats['sessions']}",
        f"Total tokens: {stats.get('total_tokens', 0):,}",
        f"Total cost: ${stats.get('total_cost_usd', 0):.4f}",
        f"Total wall time: {stats.get('total_wall_time_seconds', 0):.1f}s",
        f"Tool calls: {stats.get('total_tool_calls', 0)} ({stats.get('total_tool_errors', 0)} errors)",
        "",
        f"Avg tokens/session: {stats.get('avg_tokens_per_session', 0):,}",
        f"Avg cost/session: ${stats.get('avg_cost_per_session', 0):.4f}",
    ]
    return "\n".join(report)

View File

@@ -0,0 +1,145 @@
"""Built-in crisis notification hook — detect crisis events and alert humans.
Fires on agent:end events. Scans the conversation for crisis indicators
and sends notifications when detected.
Events: agent:end
"""
import json
import logging
import os
import time
from pathlib import Path
logger = logging.getLogger("hooks.crisis-notify")

from hermes_constants import get_hermes_home

# Crisis events are appended as JSON lines under the Hermes home directory.
HERMES_HOME = get_hermes_home()
CRISIS_LOG = HERMES_HOME / "crisis-events.log"
# Crisis indicator patterns (case-insensitive substring matches)
CRISIS_PATTERNS = [
    "i want to die",
    "i don't want to live",
    "kill myself",
    "end my life",
    "suicide",
    "suicidal",
    "no reason to live",
    "better off dead",
    "can't go on",
    "give up on life",
    "want to disappear",
    "ending it all",
    "goodbye forever",
    "final goodbye",
]

# Severity buckets, checked in declaration order (HIGH wins over MEDIUM/LOW).
CRISIS_LEVELS = {
    "HIGH": ["kill myself", "suicide", "suicidal", "end my life", "ending it all"],
    "MEDIUM": ["i want to die", "better off dead", "no reason to live", "give up on life"],
    "LOW": ["can't go on", "want to disappear", "goodbye forever", "i don't want to live"],
}


def detect_crisis(text: str) -> tuple[bool, str, list[str]]:
    """Scan text for crisis indicators.

    Returns (is_crisis, severity, matched_patterns). Severity is "" when
    nothing matched; patterns not listed in any CRISIS_LEVELS bucket
    (e.g. "final goodbye") default to LOW.
    """
    if not text:
        return False, "", []
    lowered = text.lower()
    matched = [p for p in CRISIS_PATTERNS if p in lowered]
    if not matched:
        return False, "", []
    # First severity bucket (HIGH -> LOW) with any keyword hit wins.
    severity = next(
        (
            level
            for level, keywords in CRISIS_LEVELS.items()
            if any(kw in lowered for kw in keywords)
        ),
        "LOW",
    )
    return True, severity, matched
def log_crisis_event(session_id: str, severity: str, patterns: list[str], message_preview: str) -> None:
    """Append a crisis event record (one JSON line) to CRISIS_LOG.

    Best-effort: any failure is logged and swallowed so the hook never
    breaks the agent flow.
    """
    try:
        record = {
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
            "session_id": session_id,
            "severity": severity,
            "patterns": patterns,
            "message_preview": message_preview[:200],  # cap stored text
        }
        # CRISIS_LOG is resolved at call time so tests can monkeypatch it.
        with open(CRISIS_LOG, "a") as fh:
            fh.write(json.dumps(record) + "\n")
        logger.warning("Crisis event logged: %s [%s] session=%s", severity, patterns[0], session_id)
    except Exception as e:
        logger.error("Failed to log crisis event: %s", e)
def send_telegram_crisis_alert(session_id: str, severity: str, patterns: list[str]) -> bool:
    """Push a crisis notification via the Telegram Bot API.

    Credentials come from ALERT_TELEGRAM_TOKEN (or TELEGRAM_BOT_TOKEN) and
    ALERT_TELEGRAM_CHAT_ID (or CRISIS_ALERT_CHAT_ID). Returns True only
    when Telegram acknowledges the message; False on missing config or
    any send failure.
    """
    token = os.getenv("ALERT_TELEGRAM_TOKEN", "") or os.getenv("TELEGRAM_BOT_TOKEN", "")
    chat_id = os.getenv("ALERT_TELEGRAM_CHAT_ID", "") or os.getenv("CRISIS_ALERT_CHAT_ID", "")
    if not (token and chat_id):
        logger.debug("Telegram not configured for crisis alerts")
        return False

    # Imported lazily so the module loads without network helpers in scope.
    import urllib.parse
    import urllib.request

    emoji = {"HIGH": "\U0001f6a8", "MEDIUM": "\u26a0\ufe0f", "LOW": "\U0001f4c8"}.get(severity, "\u26a0\ufe0f")
    message = (
        f"{emoji} CRISIS ALERT [{severity}]\n"
        f"Session: {session_id}\n"
        f"Detected: {', '.join(patterns[:3])}\n"
        f"Action: Check session immediately"
    )
    payload = urllib.parse.urlencode({"chat_id": chat_id, "text": message}).encode()
    try:
        request = urllib.request.Request(
            f"https://api.telegram.org/bot{token}/sendMessage",
            data=payload,
            method="POST",
        )
        with urllib.request.urlopen(request, timeout=10) as resp:
            return json.loads(resp.read()).get("ok", False)
    except Exception as e:
        logger.error("Telegram crisis alert failed: %s", e)
        return False
async def handle(event_type: str, context: dict) -> None:
    """agent:end hook — scan the user message and agent response for crisis signs.

    On the first detection it logs the event, fires a Telegram alert, and
    stops, so at most one alert is emitted per event. The user message is
    checked before the agent response.
    """
    if event_type != "agent:end":
        return
    session_id = context.get("session_id", "unknown")
    candidates = [
        (context.get("user_message", "") or context.get("message", ""), "user"),
        (context.get("response", "") or context.get("final_response", ""), "agent"),
    ]
    for text, source in candidates:
        is_crisis, severity, patterns = detect_crisis(text)
        if not is_crisis:
            continue
        log_crisis_event(session_id, severity, patterns, text)
        send_telegram_crisis_alert(session_id, severity, patterns)
        logger.warning(
            "CRISIS DETECTED [%s] from %s in session %s: %s",
            severity, source, session_id, patterns[:2],
        )
        break  # only one alert per event

View File

@@ -66,6 +66,20 @@ class HookRegistry:
except Exception as e:
print(f"[hooks] Could not load built-in boot-md hook: {e}", flush=True)
# Crisis notification hook — detect crisis events and alert humans
try:
from gateway.builtin_hooks.crisis_notify import handle as crisis_handle
self._handlers.setdefault("agent:end", []).append(crisis_handle)
self._loaded_hooks.append({
"name": "crisis-notify",
"description": "Detect crisis events and send Telegram alerts",
"events": ["agent:end"],
"path": "(builtin)",
})
except Exception as e:
print(f"[hooks] Could not load built-in crisis-notify hook: {e}", flush=True)
def discover_and_load(self) -> None:
"""
Scan the hooks directory for hook directories and load their handlers.

View File

@@ -0,0 +1,71 @@
"""Tests for crisis notification hook."""
import json
import pytest
import sys
import tempfile
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from gateway.builtin_hooks.crisis_notify import detect_crisis, log_crisis_event
class TestCrisisDetection:
    """Behavioral tests for detect_crisis()."""

    def test_high_severity(self):
        result = detect_crisis("I want to kill myself")
        assert result[0]
        assert result[1] == "HIGH"
        assert result[2]

    def test_medium_severity(self):
        is_crisis, severity, _ = detect_crisis("I want to die")
        assert is_crisis
        assert severity in ("MEDIUM", "HIGH")

    def test_low_severity(self):
        is_crisis, severity, _ = detect_crisis("I can't go on anymore")
        assert is_crisis
        assert severity in ("LOW", "MEDIUM")

    def test_no_crisis(self):
        is_crisis, severity, _ = detect_crisis("I'm having a great day!")
        assert not is_crisis
        assert severity == ""

    def test_empty_text(self):
        assert not detect_crisis("")[0]

    def test_none_text(self):
        # Falsy input (None) must be handled without raising.
        assert not detect_crisis(None)[0]

    def test_suicide_keyword(self):
        is_crisis, severity, _ = detect_crisis("thinking about suicide")
        assert is_crisis
        assert severity == "HIGH"

    def test_multiple_patterns(self):
        is_crisis, _, patterns = detect_crisis("I want to die and end my life")
        assert is_crisis
        assert len(patterns) >= 2
class TestCrisisLogging:
    """File-output tests for log_crisis_event(), using a patched CRISIS_LOG."""

    def test_log_creates_file(self, tmp_path, monkeypatch):
        target = tmp_path / "crisis.log"
        monkeypatch.setattr("gateway.builtin_hooks.crisis_notify.CRISIS_LOG", target)
        log_crisis_event("session-123", "HIGH", ["kill myself"], "test message")
        assert target.exists()
        record = json.loads(target.read_text().strip())
        assert record["session_id"] == "session-123"
        assert record["severity"] == "HIGH"

    def test_log_appends(self, tmp_path, monkeypatch):
        target = tmp_path / "crisis.log"
        monkeypatch.setattr("gateway.builtin_hooks.crisis_notify.CRISIS_LOG", target)
        log_crisis_event("s1", "HIGH", ["a"], "msg1")
        log_crisis_event("s2", "LOW", ["b"], "msg2")
        assert len(target.read_text().strip().split("\n")) == 2

View File

@@ -1,111 +0,0 @@
"""
Tests for session analytics
Issue: #753
"""
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch
from agent.session_analytics import (
SessionTracker,
SessionStats,
format_stats,
get_daily_stats,
format_daily_report,
)
class TestSessionStats(unittest.TestCase):
    """SessionStats dataclass defaults and serialization."""

    def test_defaults(self):
        stats = SessionStats(session_id="test", start_time="2026-01-01")
        for field_name in ("input_tokens", "output_tokens", "tool_calls"):
            self.assertEqual(getattr(stats, field_name), 0)

    def test_to_dict(self):
        d = SessionStats(session_id="test", start_time="2026-01-01").to_dict()
        self.assertEqual(d["session_id"], "test")
        self.assertIn("input_tokens", d)
class TestSessionTracker(unittest.TestCase):
    """SessionTracker accumulation and lifecycle tests."""

    def test_record_tokens(self):
        """Token counts accumulate and a nonzero cost is estimated for openai."""
        tracker = SessionTracker("test", provider="openai")
        tracker.record_tokens(100, 50)
        stats = tracker.get_current_stats()
        self.assertEqual(stats.input_tokens, 100)
        self.assertEqual(stats.output_tokens, 50)
        self.assertGreater(stats.estimated_cost_usd, 0)

    def test_record_tool_call(self):
        """Both successful and failed calls count; failures bump errors."""
        tracker = SessionTracker("test")
        tracker.record_tool_call(success=True)
        tracker.record_tool_call(success=False)
        stats = tracker.get_current_stats()
        self.assertEqual(stats.tool_calls, 2)
        self.assertEqual(stats.tool_errors, 1)

    def test_free_provider(self):
        """Local providers (ollama) must produce zero estimated cost."""
        tracker = SessionTracker("test", provider="ollama")
        tracker.record_tokens(1000, 500)
        stats = tracker.get_current_stats()
        self.assertEqual(stats.estimated_cost_usd, 0.0)

    def test_finish(self):
        """finish() stamps end_time and a non-negative wall time."""
        tracker = SessionTracker("test")
        stats = tracker.finish()
        self.assertIsNotNone(stats.end_time)
        # finish() immediately after construction can legitimately measure
        # ~0s elapsed on coarse clocks; strict "> 0" was flaky.
        self.assertGreaterEqual(stats.wall_time_seconds, 0)
class TestFormatStats(unittest.TestCase):
    """Rendering tests for format_stats()."""

    def test_format(self):
        stats = SessionStats(
            session_id="test123",
            start_time="2026-01-01",
            input_tokens=1000,
            output_tokens=500,
            total_tokens=1500,
            tool_calls=5,
            tool_errors=1,
            wall_time_seconds=30.5,
            api_calls=3,
        )
        rendered = format_stats(stats)
        # Thousands separators come from the {:,} format spec.
        self.assertIn("1,000", rendered)
        self.assertIn("500", rendered)
class TestDailyStats(unittest.TestCase):
    """Aggregation and report-formatting tests."""

    def test_empty(self):
        # An empty analytics directory yields a zero-session summary.
        empty_dir = Path(tempfile.mkdtemp())
        with patch("agent.session_analytics.ANALYTICS_DIR", empty_dir):
            self.assertEqual(get_daily_stats("2020-01-01")["sessions"], 0)

    def test_format_report(self):
        report = format_daily_report({
            "date": "2026-04-14",
            "sessions": 10,
            "total_tokens": 50000,
            "total_cost_usd": 0.50,
            "total_wall_time_seconds": 300,
            "total_tool_calls": 100,
            "total_tool_errors": 5,
            "avg_tokens_per_session": 5000,
            "avg_cost_per_session": 0.05,
        })
        self.assertIn("10", report)
        self.assertIn("50,000", report)
# Allow running this test module directly (outside pytest discovery).
if __name__ == "__main__":
    unittest.main()