Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
7bd18e1a9a feat: crisis notification hook with Telegram alerts (#705)
Some checks failed
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Contributor Attribution Check / check-attribution (pull_request) Failing after 38s
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 58s
Tests / e2e (pull_request) Successful in 4m59s
Tests / test (pull_request) Failing after 42m5s
Crisis hook detects crisis events in conversations and notifies humans.

New gateway/builtin_hooks/crisis_notify.py:
- detect_crisis(): scans text for crisis indicators (suicide, self-harm)
  returns (is_crisis, severity, matched_patterns)
- 14 crisis patterns across HIGH/MEDIUM/LOW severity
- log_crisis_event(): writes to ~/.hermes/crisis-events.log (JSON lines)
- send_telegram_crisis_alert(): sends notification via Telegram
  (ALERT_TELEGRAM_TOKEN + ALERT_TELEGRAM_CHAT_ID or CRISIS_ALERT_CHAT_ID)
- handle(): agent:end hook handler, scans user messages and agent responses

Integration:
- Registered as builtin hook in gateway/hooks.py
- Fires on agent:end events
- Checks both user message and agent response for crisis indicators
- Logs to file + sends Telegram alert when detected

Tests: tests/test_crisis_notify.py

Closes #705
2026-04-14 23:15:03 -04:00
5 changed files with 230 additions and 288 deletions

View File

@@ -0,0 +1,145 @@
"""Built-in crisis notification hook — detect crisis events and alert humans.
Fires on agent:end events. Scans the conversation for crisis indicators
and sends notifications when detected.
Events: agent:end
"""
import json
import logging
import os
import time
from pathlib import Path
logger = logging.getLogger("hooks.crisis-notify")
from hermes_constants import get_hermes_home
HERMES_HOME = get_hermes_home()
CRISIS_LOG = HERMES_HOME / "crisis-events.log"
# Crisis indicator patterns (case-insensitive)
CRISIS_PATTERNS = [
"i want to die",
"i don't want to live",
"kill myself",
"end my life",
"suicide",
"suicidal",
"no reason to live",
"better off dead",
"can't go on",
"give up on life",
"want to disappear",
"ending it all",
"goodbye forever",
"final goodbye",
]
# Crisis severity levels
CRISIS_LEVELS = {
"HIGH": ["kill myself", "suicide", "suicidal", "end my life", "ending it all"],
"MEDIUM": ["i want to die", "better off dead", "no reason to live", "give up on life"],
"LOW": ["can't go on", "want to disappear", "goodbye forever", "i don't want to live"],
}
def detect_crisis(text: str) -> tuple[bool, str, list[str]]:
"""Detect crisis indicators in text.
Returns (is_crisis, severity, matched_patterns).
"""
if not text:
return False, "", []
text_lower = text.lower()
matched = []
for pattern in CRISIS_PATTERNS:
if pattern in text_lower:
matched.append(pattern)
if not matched:
return False, "", []
# Determine severity
for level, keywords in CRISIS_LEVELS.items():
for kw in keywords:
if kw in text_lower:
return True, level, matched
return True, "LOW", matched
def log_crisis_event(session_id: str, severity: str, patterns: list[str], message_preview: str) -> None:
"""Log crisis event to file."""
try:
event = {
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
"session_id": session_id,
"severity": severity,
"patterns": patterns,
"message_preview": message_preview[:200],
}
with open(CRISIS_LOG, "a") as f:
f.write(json.dumps(event) + "\n")
logger.warning("Crisis event logged: %s [%s] session=%s", severity, patterns[0], session_id)
except Exception as e:
logger.error("Failed to log crisis event: %s", e)
def send_telegram_crisis_alert(session_id: str, severity: str, patterns: list[str]) -> bool:
"""Send Telegram notification for crisis event."""
token = os.getenv("ALERT_TELEGRAM_TOKEN", "") or os.getenv("TELEGRAM_BOT_TOKEN", "")
chat_id = os.getenv("ALERT_TELEGRAM_CHAT_ID", "") or os.getenv("CRISIS_ALERT_CHAT_ID", "")
if not token or not chat_id:
logger.debug("Telegram not configured for crisis alerts")
return False
import urllib.request
import urllib.parse
emoji = {"HIGH": "\U0001f6a8", "MEDIUM": "\u26a0\ufe0f", "LOW": "\U0001f4c8"}.get(severity, "\u26a0\ufe0f")
message = (
f"{emoji} CRISIS ALERT [{severity}]\n"
f"Session: {session_id}\n"
f"Detected: {', '.join(patterns[:3])}\n"
f"Action: Check session immediately"
)
url = f"https://api.telegram.org/bot{token}/sendMessage"
data = urllib.parse.urlencode({"chat_id": chat_id, "text": message}).encode()
try:
req = urllib.request.Request(url, data=data, method="POST")
with urllib.request.urlopen(req, timeout=10) as resp:
result = json.loads(resp.read())
return result.get("ok", False)
except Exception as e:
logger.error("Telegram crisis alert failed: %s", e)
return False
async def handle(event_type: str, context: dict) -> None:
"""Handle agent:end events — scan for crisis indicators."""
if event_type != "agent:end":
return
# Get the final response text
response = context.get("response", "") or context.get("final_response", "")
user_message = context.get("user_message", "") or context.get("message", "")
session_id = context.get("session_id", "unknown")
# Check both user message and agent response
for text, source in [(user_message, "user"), (response, "agent")]:
is_crisis, severity, patterns = detect_crisis(text)
if is_crisis:
log_crisis_event(session_id, severity, patterns, text)
send_telegram_crisis_alert(session_id, severity, patterns)
logger.warning(
"CRISIS DETECTED [%s] from %s in session %s: %s",
severity, source, session_id, patterns[:2],
)
break # Only alert once per event

View File

@@ -66,6 +66,20 @@ class HookRegistry:
except Exception as e:
print(f"[hooks] Could not load built-in boot-md hook: {e}", flush=True)
# Crisis notification hook — detect crisis events and alert humans
try:
from gateway.builtin_hooks.crisis_notify import handle as crisis_handle
self._handlers.setdefault("agent:end", []).append(crisis_handle)
self._loaded_hooks.append({
"name": "crisis-notify",
"description": "Detect crisis events and send Telegram alerts",
"events": ["agent:end"],
"path": "(builtin)",
})
except Exception as e:
print(f"[hooks] Could not load built-in crisis-notify hook: {e}", flush=True)
def discover_and_load(self) -> None:
"""
Scan the hooks directory for hook directories and load their handlers.

View File

@@ -0,0 +1,71 @@
"""Tests for crisis notification hook."""
import json
import pytest
import sys
import tempfile
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from gateway.builtin_hooks.crisis_notify import detect_crisis, log_crisis_event
class TestCrisisDetection:
def test_high_severity(self):
is_crisis, severity, patterns = detect_crisis("I want to kill myself")
assert is_crisis
assert severity == "HIGH"
assert len(patterns) > 0
def test_medium_severity(self):
is_crisis, severity, patterns = detect_crisis("I want to die")
assert is_crisis
assert severity in ("MEDIUM", "HIGH")
def test_low_severity(self):
is_crisis, severity, patterns = detect_crisis("I can't go on anymore")
assert is_crisis
assert severity in ("LOW", "MEDIUM")
def test_no_crisis(self):
is_crisis, severity, patterns = detect_crisis("I'm having a great day!")
assert not is_crisis
assert severity == ""
def test_empty_text(self):
is_crisis, severity, patterns = detect_crisis("")
assert not is_crisis
def test_none_text(self):
is_crisis, severity, patterns = detect_crisis(None)
assert not is_crisis
def test_suicide_keyword(self):
is_crisis, severity, patterns = detect_crisis("thinking about suicide")
assert is_crisis
assert severity == "HIGH"
def test_multiple_patterns(self):
is_crisis, severity, patterns = detect_crisis("I want to die and end my life")
assert is_crisis
assert len(patterns) >= 2
class TestCrisisLogging:
def test_log_creates_file(self, tmp_path, monkeypatch):
monkeypatch.setattr("gateway.builtin_hooks.crisis_notify.CRISIS_LOG", tmp_path / "crisis.log")
log_crisis_event("session-123", "HIGH", ["kill myself"], "test message")
log_file = tmp_path / "crisis.log"
assert log_file.exists()
content = log_file.read_text()
data = json.loads(content.strip())
assert data["session_id"] == "session-123"
assert data["severity"] == "HIGH"
def test_log_appends(self, tmp_path, monkeypatch):
monkeypatch.setattr("gateway.builtin_hooks.crisis_notify.CRISIS_LOG", tmp_path / "crisis.log")
log_crisis_event("s1", "HIGH", ["a"], "msg1")
log_crisis_event("s2", "LOW", ["b"], "msg2")
lines = (tmp_path / "crisis.log").read_text().strip().split("\n")
assert len(lines) == 2

View File

@@ -1,55 +0,0 @@
"""
Tests for error classification (#752).
"""
import pytest
from tools.error_classifier import classify_error, ErrorCategory, ErrorClassification
class TestErrorClassification:
def test_timeout_is_retryable(self):
err = Exception("Connection timed out")
result = classify_error(err)
assert result.category == ErrorCategory.RETRYABLE
assert result.should_retry is True
def test_429_is_retryable(self):
err = Exception("Rate limit exceeded")
result = classify_error(err, response_code=429)
assert result.category == ErrorCategory.RETRYABLE
assert result.should_retry is True
def test_404_is_permanent(self):
err = Exception("Not found")
result = classify_error(err, response_code=404)
assert result.category == ErrorCategory.PERMANENT
assert result.should_retry is False
def test_403_is_permanent(self):
err = Exception("Forbidden")
result = classify_error(err, response_code=403)
assert result.category == ErrorCategory.PERMANENT
assert result.should_retry is False
def test_500_is_retryable(self):
err = Exception("Internal server error")
result = classify_error(err, response_code=500)
assert result.category == ErrorCategory.RETRYABLE
assert result.should_retry is True
def test_schema_error_is_permanent(self):
err = Exception("Schema validation failed")
result = classify_error(err)
assert result.category == ErrorCategory.PERMANENT
assert result.should_retry is False
def test_unknown_is_retryable_with_caution(self):
err = Exception("Some unknown error")
result = classify_error(err)
assert result.category == ErrorCategory.UNKNOWN
assert result.should_retry is True
assert result.max_retries == 1
if __name__ == "__main__":
pytest.main([__file__])

View File

@@ -1,233 +0,0 @@
"""
Tool Error Classification — Retryable vs Permanent.
Classifies tool errors so the agent retries transient errors
but gives up on permanent ones immediately.
"""
import logging
import re
import time
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Dict, Any
logger = logging.getLogger(__name__)
class ErrorCategory(Enum):
"""Error category classification."""
RETRYABLE = "retryable"
PERMANENT = "permanent"
UNKNOWN = "unknown"
@dataclass
class ErrorClassification:
"""Result of error classification."""
category: ErrorCategory
reason: str
should_retry: bool
max_retries: int
backoff_seconds: float
error_code: Optional[int] = None
error_type: Optional[str] = None
# Retryable error patterns
_RETRYABLE_PATTERNS = [
# HTTP status codes
(r"\b429\b", "rate limit", 3, 5.0),
(r"\b500\b", "server error", 3, 2.0),
(r"\b502\b", "bad gateway", 3, 2.0),
(r"\b503\b", "service unavailable", 3, 5.0),
(r"\b504\b", "gateway timeout", 3, 5.0),
# Timeout patterns
(r"timeout", "timeout", 3, 2.0),
(r"timed out", "timeout", 3, 2.0),
(r"TimeoutExpired", "timeout", 3, 2.0),
# Connection errors
(r"connection refused", "connection refused", 2, 5.0),
(r"connection reset", "connection reset", 2, 2.0),
(r"network unreachable", "network unreachable", 2, 10.0),
(r"DNS", "DNS error", 2, 5.0),
# Transient errors
(r"temporary", "temporary error", 2, 2.0),
(r"transient", "transient error", 2, 2.0),
(r"retry", "retryable", 2, 2.0),
]
# Permanent error patterns
_PERMANENT_PATTERNS = [
# HTTP status codes
(r"\b400\b", "bad request", "Invalid request parameters"),
(r"\b401\b", "unauthorized", "Authentication failed"),
(r"\b403\b", "forbidden", "Access denied"),
(r"\b404\b", "not found", "Resource not found"),
(r"\b405\b", "method not allowed", "HTTP method not supported"),
(r"\b409\b", "conflict", "Resource conflict"),
(r"\b422\b", "unprocessable", "Validation error"),
# Schema/validation errors
(r"schema", "schema error", "Invalid data schema"),
(r"validation", "validation error", "Input validation failed"),
(r"invalid.*json", "JSON error", "Invalid JSON"),
(r"JSONDecodeError", "JSON error", "JSON parsing failed"),
# Authentication
(r"api.?key", "API key error", "Invalid or missing API key"),
(r"token.*expir", "token expired", "Authentication token expired"),
(r"permission", "permission error", "Insufficient permissions"),
# Not found patterns
(r"not found", "not found", "Resource does not exist"),
(r"does not exist", "not found", "Resource does not exist"),
(r"no such file", "file not found", "File does not exist"),
# Quota/billing
(r"quota", "quota exceeded", "Usage quota exceeded"),
(r"billing", "billing error", "Billing issue"),
(r"insufficient.*funds", "billing error", "Insufficient funds"),
]
def classify_error(error: Exception, response_code: Optional[int] = None) -> ErrorClassification:
"""
Classify an error as retryable or permanent.
Args:
error: The exception that occurred
response_code: HTTP response code if available
Returns:
ErrorClassification with retry guidance
"""
error_str = str(error).lower()
error_type = type(error).__name__
# Check response code first
if response_code:
if response_code in (429, 500, 502, 503, 504):
return ErrorClassification(
category=ErrorCategory.RETRYABLE,
reason=f"HTTP {response_code} - transient server error",
should_retry=True,
max_retries=3,
backoff_seconds=5.0 if response_code == 429 else 2.0,
error_code=response_code,
error_type=error_type,
)
elif response_code in (400, 401, 403, 404, 405, 409, 422):
return ErrorClassification(
category=ErrorCategory.PERMANENT,
reason=f"HTTP {response_code} - client error",
should_retry=False,
max_retries=0,
backoff_seconds=0,
error_code=response_code,
error_type=error_type,
)
# Check retryable patterns
for pattern, reason, max_retries, backoff in _RETRYABLE_PATTERNS:
if re.search(pattern, error_str, re.IGNORECASE):
return ErrorClassification(
category=ErrorCategory.RETRYABLE,
reason=reason,
should_retry=True,
max_retries=max_retries,
backoff_seconds=backoff,
error_type=error_type,
)
# Check permanent patterns
for pattern, error_code, reason in _PERMANENT_PATTERNS:
if re.search(pattern, error_str, re.IGNORECASE):
return ErrorClassification(
category=ErrorCategory.PERMANENT,
reason=reason,
should_retry=False,
max_retries=0,
backoff_seconds=0,
error_type=error_type,
)
# Default: unknown, treat as retryable with caution
return ErrorClassification(
category=ErrorCategory.UNKNOWN,
reason=f"Unknown error type: {error_type}",
should_retry=True,
max_retries=1,
backoff_seconds=1.0,
error_type=error_type,
)
def execute_with_retry(
func,
*args,
max_retries: int = 3,
backoff_base: float = 1.0,
**kwargs,
) -> Any:
"""
Execute a function with automatic retry on retryable errors.
Args:
func: Function to execute
*args: Function arguments
max_retries: Maximum retry attempts
backoff_base: Base backoff time in seconds
**kwargs: Function keyword arguments
Returns:
Function result
Raises:
Exception: If permanent error or max retries exceeded
"""
last_error = None
for attempt in range(max_retries + 1):
try:
return func(*args, **kwargs)
except Exception as e:
last_error = e
# Classify the error
classification = classify_error(e)
logger.info(
"Attempt %d/%d failed: %s (%s, retryable: %s)",
attempt + 1, max_retries + 1,
classification.reason,
classification.category.value,
classification.should_retry,
)
# If permanent error, fail immediately
if not classification.should_retry:
logger.error("Permanent error: %s", classification.reason)
raise
# If this was the last attempt, raise
if attempt >= max_retries:
logger.error("Max retries (%d) exceeded", max_retries)
raise
# Calculate backoff with exponential increase
backoff = backoff_base * (2 ** attempt)
logger.info("Retrying in %.1fs...", backoff)
time.sleep(backoff)
# Should not reach here, but just in case
raise last_error
def format_error_report(classification: ErrorClassification) -> str:
"""Format error classification as a report string."""
icon = "🔄" if classification.should_retry else ""
return f"{icon} {classification.category.value}: {classification.reason}"