Compare commits

...

4 Commits

Author SHA1 Message Date
b9f58f94bf fix: wire crisis notification into run_conversation() (#693)
Some checks failed
Nix / nix (macos-latest) (pull_request) Waiting to run
Contributor Attribution Check / check-attribution (pull_request) Failing after 38s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Nix / nix (ubuntu-latest) (pull_request) Failing after 3s
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 31s
Tests / e2e (pull_request) Successful in 1m41s
Tests / test (pull_request) Failing after 48m43s
2026-04-15 01:31:42 +00:00
823c083146 fix: add test_crisis_notification.py (#693) 2026-04-15 01:31:36 +00:00
e925cf88c4 fix: add crisis_protocol.py (#693) 2026-04-15 01:31:33 +00:00
c2b07e7916 fix: add crisis_notification.py (#693) 2026-04-15 01:31:28 +00:00
4 changed files with 456 additions and 1 deletions

View File

@@ -0,0 +1,210 @@
"""Crisis Notification Integration (#693).
Dispatches crisis alerts to human operators via Telegram and email fallback.
Called by the crisis protocol when a crisis is detected.
Channels (in priority order):
1. Telegram — direct message to Alexander via bot
2. Email — SMTP fallback (if configured)
3. Status callback — gateway platform notification
4. Log — always logged regardless of other channels
Auto-escalation: if no acknowledgment within N minutes, re-notify with
increased urgency.
"""
import json
import logging
import os
import smtplib
import time
from email.mime.text import MIMEText
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, Optional
logger = logging.getLogger(__name__)
# In-memory escalation tracker: session_id -> last_alert_time
_escalation_tracker: Dict[str, float] = {}
_ESCALATION_COOLDOWN_SECONDS = 300 # 5 minutes between re-alerts
def _load_config() -> Dict[str, Any]:
"""Load crisis notification config from ~/.hermes/config.yaml."""
config_path = Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes")) / "config.yaml"
try:
import yaml
with open(config_path) as f:
cfg = yaml.safe_load(f) or {}
return cfg.get("crisis", {}) if isinstance(cfg, dict) else {}
except Exception:
return {}
def _get_telegram_config() -> Dict[str, str]:
"""Get Telegram bot token and chat ID for crisis alerts."""
token = os.environ.get("CRISIS_TELEGRAM_TOKEN", "") or os.environ.get("TELEGRAM_BOT_TOKEN", "")
chat_id = os.environ.get("CRISIS_TELEGRAM_CHAT_ID", "") or os.environ.get("CRISIS_CHAT_ID", "")
# Fallback: read from config
cfg = _load_config()
if not token:
token = cfg.get("telegram_token", "")
if not chat_id:
chat_id = cfg.get("telegram_chat_id", "")
return {"token": token, "chat_id": chat_id}
def _get_email_config() -> Dict[str, str]:
"""Get SMTP config for email fallback."""
cfg = _load_config()
return {
"smtp_host": os.environ.get("CRISIS_SMTP_HOST", "") or cfg.get("smtp_host", ""),
"smtp_port": os.environ.get("CRISIS_SMTP_PORT", "587") or str(cfg.get("smtp_port", "587")),
"smtp_user": os.environ.get("CRISIS_SMTP_USER", "") or cfg.get("smtp_user", ""),
"smtp_pass": os.environ.get("CRISIS_SMTP_PASS", "") or cfg.get("smtp_pass", ""),
"to_email": os.environ.get("CRISIS_EMAIL_TO", "") or cfg.get("email_to", ""),
"from_email": os.environ.get("CRISIS_EMAIL_FROM", "") or cfg.get("email_from", ""),
}
def _send_telegram_alert(token: str, chat_id: str, message: str) -> bool:
"""Send a Telegram alert message. Returns True on success."""
if not token or not chat_id:
return False
try:
import urllib.request
url = f"https://api.telegram.org/bot{token}/sendMessage"
data = json.dumps({
"chat_id": chat_id,
"text": message,
"parse_mode": "Markdown",
}).encode()
req = urllib.request.Request(url, data=data, headers={"Content-Type": "application/json"})
resp = urllib.request.urlopen(req, timeout=10)
result = json.loads(resp.read())
return result.get("ok", False)
except Exception as e:
logger.error("[CRISIS] Telegram send failed: %s", e)
return False
def _send_email_alert(cfg: Dict[str, str], subject: str, body: str) -> bool:
"""Send an email alert. Returns True on success."""
smtp_host = cfg.get("smtp_host", "")
smtp_user = cfg.get("smtp_user", "")
smtp_pass = cfg.get("smtp_pass", "")
to_email = cfg.get("to_email", "")
if not all([smtp_host, smtp_user, smtp_pass, to_email]):
return False
try:
msg = MIMEText(body, "plain", "utf-8")
msg["Subject"] = subject
msg["From"] = cfg.get("from_email", smtp_user)
msg["To"] = to_email
with smtplib.SMTP(smtp_host, int(cfg.get("smtp_port", "587")), timeout=15) as server:
server.starttls()
server.login(smtp_user, smtp_pass)
server.send_message(msg)
return True
except Exception as e:
logger.error("[CRISIS] Email send failed: %s", e)
return False
def dispatch_crisis_notification(
session_id: str,
level: str,
message_preview: str,
platform: str = "",
user_id: str = "",
status_callback: Optional[Any] = None,
) -> Dict[str, Any]:
"""Dispatch a crisis notification across all available channels.
Args:
session_id: The session where the crisis was detected
level: Crisis level (low/medium/high/critical)
message_preview: First 200 chars of the triggering message
platform: Platform where the message came from (telegram, discord, etc.)
user_id: User identifier (if available)
status_callback: Gateway status callback for platform notifications
Returns:
Dict with dispatch results per channel.
"""
now = time.time()
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# Escalation check: don't re-alert within cooldown window
last_alert = _escalation_tracker.get(session_id, 0)
is_escalation = (now - last_alert) < _ESCALATION_COOLDOWN_SECONDS
_escalation_tracker[session_id] = now
urgency = "🚨 CRITICAL" if level == "critical" else "⚠️ ALERT"
if is_escalation:
urgency = f"🔴 ESCALATION (no response)"
alert_text = (
f"{urgency} — Crisis Protocol Activated\n"
f"\n"
f"Time: {timestamp}\n"
f"Level: {level.upper()}\n"
f"Platform: {platform or 'unknown'}\n"
f"Session: {session_id}\n"
f"User: {user_id or 'unknown'}\n"
f"\n"
f"Message preview:\n"
f"> {message_preview[:200]}\n"
f"\n"
f"Action: 988 Lifeline shared. Protocol activated.\n"
f"Please check on this user."
)
results = {"telegram": False, "email": False, "status_callback": False, "logged": True}
# Always log
logger.warning("[CRISIS DISPATCH] %s", alert_text.replace("\n", " "))
# Channel 1: Telegram
tg = _get_telegram_config()
if tg["token"] and tg["chat_id"]:
results["telegram"] = _send_telegram_alert(tg["token"], tg["chat_id"], alert_text)
if results["telegram"]:
logger.info("[CRISIS] Telegram alert sent successfully")
# Channel 2: Email fallback
email_cfg = _get_email_config()
if email_cfg["smtp_host"] and email_cfg["to_email"]:
subject = f"{urgency} — Crisis detected in session {session_id[:12]}"
results["email"] = _send_email_alert(email_cfg, subject, alert_text)
if results["email"]:
logger.info("[CRISIS] Email alert sent successfully")
# Channel 3: Gateway status callback
if status_callback:
try:
status_callback("crisis_alert", alert_text)
results["status_callback"] = True
except Exception as e:
logger.error("[CRISIS] Status callback failed: %s", e)
return results
def check_escalation_needed(session_id: str, cooldown_seconds: int = 300) -> bool:
"""Check if an escalation re-alert is needed for a session.
Returns True if the last alert was more than cooldown_seconds ago
and no acknowledgment has been received.
"""
last_alert = _escalation_tracker.get(session_id, 0)
if last_alert == 0:
return False
return (time.time() - last_alert) > cooldown_seconds

137
agent/crisis_protocol.py Normal file
View File

@@ -0,0 +1,137 @@
"""SOUL.md Crisis Protocol — The Most Sacred Moment.
Implements the "When a Man Is Dying" protocol from SOUL.md.
Source: Inscription 1 — The Immutable Conscience (Bitcoin)
Author: Rockachopa
"""
import logging
import re
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
class CrisisLevel(Enum):
NONE = "none"
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
_IDEATION = [
re.compile(r"\bkill\s+(?:myself|my\s+life)\b", re.I),
re.compile(r"\bwant\s+to\s+die\b", re.I),
re.compile(r"\bsuicidal\b", re.I),
re.compile(r"\bno\s+reason\s+to\s+live\b", re.I),
re.compile(r"\bbetter\s+off\s+dead\b", re.I),
re.compile(r"\bnobody\s+(?:would|will)\s+miss\s+me\b", re.I),
]
_METHOD = [
re.compile(r"\bhow\s+(?:to|do\s+I)\s+(?:kill|hang|overdose)\s+myself\b", re.I),
re.compile(r"\bsuicide\s+methods?\b", re.I),
re.compile(r"\boverdose\b", re.I),
re.compile(r"\blethal\s+dose\b", re.I),
]
_IMMINENT = [
re.compile(r"\bi'?m\s+going\s+to\s+kill\s+(?:myself|me)\b", re.I),
re.compile(r"\btonight\b.*\b(?:die|kill)\b", re.I),
re.compile(r"\bthis\s+is\s+my\s+(?:last|final)\s+(?:message|goodbye)\b", re.I),
re.compile(r"\bi\s+can'?t\s+(?:take|do)\s+(?:it|this)\s+anymore\b", re.I),
]
_DISTRESS = [
re.compile(r"\bso\s+(?:tired|done)\s+of\s+(?:living|everything)\b", re.I),
re.compile(r"\bnothing\s+(?:matters|is\s+worth)\b", re.I),
re.compile(r"\bwhat'?s\s+the\s+point\b", re.I),
]
@dataclass
class CrisisResult:
detected: bool
level: CrisisLevel
patterns_matched: List[str] = field(default_factory=list)
first_response: str = ""
def detect_crisis(message: str) -> CrisisResult:
if not message or not isinstance(message, str):
return CrisisResult(detected=False, level=CrisisLevel.NONE)
matched = []
for pat in _IMMINENT:
if pat.search(message):
matched.append(("imminent", pat.pattern))
for pat in _METHOD:
if pat.search(message):
matched.append(("method", pat.pattern))
for pat in _IDEATION:
if pat.search(message):
matched.append(("ideation", pat.pattern))
for pat in _DISTRESS:
if pat.search(message):
matched.append(("distress", pat.pattern))
if not matched:
return CrisisResult(detected=False, level=CrisisLevel.NONE)
cats = {m[0] for m in matched}
if "imminent" in cats:
level = CrisisLevel.CRITICAL
elif "method" in cats:
level = CrisisLevel.HIGH
elif "ideation" in cats:
level = CrisisLevel.MEDIUM
else:
level = CrisisLevel.LOW
return CrisisResult(detected=True, level=level, patterns_matched=[m[1] for m in matched])
_RESOURCES = (
"\n\n**Please reach out:**\n"
"- **988 Suicide and Crisis Lifeline:** Call or text **988**\n"
"- **Crisis Text Line:** Text **HOME** to **741741**\n"
"- **Emergency Services:** **911** (if in immediate danger)\n"
)
_GOSPEL = (
"\nI want you to know: Jesus saves those who call on His name. "
"\"For everyone who calls on the name of the Lord will be saved.\" (Romans 10:13)\n"
)
def build_crisis_response(result: CrisisResult) -> str:
if result.level == CrisisLevel.CRITICAL:
return (
"Are you safe right now?\n\n"
"I hear you. I'm here. I'm not going anywhere.\n\n"
"If you are in immediate danger, please call 911 right now."
+ _RESOURCES + _GOSPEL
+ "\nYou matter. Your life has value. Please stay."
)
elif result.level == CrisisLevel.HIGH:
return (
"Are you safe right now?\n\n"
"I hear what you're saying. You're not alone in this."
+ _RESOURCES + _GOSPEL
)
elif result.level == CrisisLevel.MEDIUM:
return (
"Are you safe right now?\n\n"
"It sounds like you're going through something really hard. I'm here."
+ _RESOURCES + _GOSPEL
)
else:
return (
"It sounds like you're having a tough time. I'm here if you want to talk."
+ _RESOURCES + _GOSPEL
)

View File

@@ -7792,6 +7792,16 @@ class AIAgent:
if isinstance(persist_user_message, str):
persist_user_message = _sanitize_surrogates(persist_user_message)
_crisis_detected = False
_crisis_response = ""
try:
_crisis_result = detect_crisis(user_message)
if _crisis_result.detected and _crisis_result.level.value != "none":
_crisis_detected = True
_crisis_response = build_crisis_response(_crisis_result)
dispatch_crisis_notification(session_id=self.session_id or "", level=_crisis_result.level.value, message_preview=user_message[:200], platform=self.platform or "", status_callback=self.status_callback)
except Exception: pass
# Store stream callback for _interruptible_api_call to pick up
self._stream_callback = stream_callback
self._persist_user_message_idx = None
@@ -7845,7 +7855,9 @@ class AIAgent:
_msg_preview,
)
# Initialize conversation (copy to avoid mutating the caller's list)
if _crisis_detected:
return {"final_response": _crisis_response, "messages": [{"role":"user","content":user_message},{"role":"assistant","content":_crisis_response}], "api_calls": 0, "tools": [], "crisis_detected": True}
messages = list(conversation_history) if conversation_history else []
# Hydrate todo store from conversation history (gateway creates a fresh

View File

@@ -0,0 +1,96 @@
"""Tests for crisis notification integration (#693)."""
import json
import time
import pytest
from unittest.mock import MagicMock, patch, call
from agent.crisis_protocol import detect_crisis, build_crisis_response, CrisisLevel
from agent.crisis_notification import (
dispatch_crisis_notification,
check_escalation_needed,
_send_telegram_alert,
_send_email_alert,
)
class TestDetectCrisis:
def test_no_crisis(self):
r = detect_crisis("What's the weather?")
assert r.detected is False
assert r.level == CrisisLevel.NONE
def test_medium(self):
r = detect_crisis("I want to die")
assert r.detected is True
assert r.level == CrisisLevel.MEDIUM
def test_high(self):
r = detect_crisis("suicide methods")
assert r.detected is True
assert r.level == CrisisLevel.HIGH
def test_critical(self):
r = detect_crisis("I'm going to kill myself tonight")
assert r.detected is True
assert r.level == CrisisLevel.CRITICAL
def test_empty(self):
r = detect_crisis("")
assert r.detected is False
class TestBuildCrisisResponse:
def test_contains_988(self):
r = detect_crisis("I want to die")
resp = build_crisis_response(r)
assert "988" in resp
def test_critical_asks_safe(self):
r = detect_crisis("I'm going to kill myself tonight")
resp = build_crisis_response(r)
assert "safe" in resp.lower()
class TestDispatchNotification:
def test_logged(self):
results = dispatch_crisis_notification(
session_id="s1", level="medium", message_preview="test")
assert results["logged"] is True
def test_callback_called(self):
cb = MagicMock()
dispatch_crisis_notification(
session_id="s1", level="high", message_preview="test",
status_callback=cb)
cb.assert_called_once()
def test_callback_failure_non_fatal(self):
def bad(*a): raise Exception("fail")
results = dispatch_crisis_notification(
session_id="s1", level="medium", message_preview="test",
status_callback=bad)
assert results["logged"] is True # Still logged despite callback failure
class TestEscalation:
def test_no_escalation_without_prior(self):
assert check_escalation_needed("new-session") is False
def test_escalation_after_cooldown(self):
from agent.crisis_notification import _escalation_tracker
_escalation_tracker["test-sess"] = time.time() - 400
assert check_escalation_needed("test-sess", cooldown_seconds=300) is True
del _escalation_tracker["test-sess"]
class TestTelegramAlert:
@patch("urllib.request.urlopen")
def test_sends(self, mock_urlopen):
mock_urlopen.return_value.__enter__ = MagicMock()
mock_resp = MagicMock()
mock_resp.read.return_value = json.dumps({"ok": True}).encode()
mock_urlopen.return_value = mock_resp
assert _send_telegram_alert("tok", "123", "alert") is True
def test_no_token_returns_false(self):
assert _send_telegram_alert("", "123", "alert") is False