Compare commits
4 Commits
fix/668-ap
...
fix/693-cr
| Author | SHA1 | Date | |
|---|---|---|---|
| b9f58f94bf | |||
| 823c083146 | |||
| e925cf88c4 | |||
| c2b07e7916 |
210
agent/crisis_notification.py
Normal file
210
agent/crisis_notification.py
Normal file
@@ -0,0 +1,210 @@
|
||||
"""Crisis Notification Integration (#693).
|
||||
|
||||
Dispatches crisis alerts to human operators via Telegram and email fallback.
|
||||
Called by the crisis protocol when a crisis is detected.
|
||||
|
||||
Channels (in priority order):
|
||||
1. Telegram — direct message to Alexander via bot
|
||||
2. Email — SMTP fallback (if configured)
|
||||
3. Status callback — gateway platform notification
|
||||
4. Log — always logged regardless of other channels
|
||||
|
||||
Auto-escalation: if no acknowledgment within N minutes, re-notify with
|
||||
increased urgency.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import smtplib
|
||||
import time
|
||||
from email.mime.text import MIMEText
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# In-memory escalation tracker: session_id -> last_alert_time
|
||||
_escalation_tracker: Dict[str, float] = {}
|
||||
_ESCALATION_COOLDOWN_SECONDS = 300 # 5 minutes between re-alerts
|
||||
|
||||
|
||||
def _load_config() -> Dict[str, Any]:
|
||||
"""Load crisis notification config from ~/.hermes/config.yaml."""
|
||||
config_path = Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes")) / "config.yaml"
|
||||
try:
|
||||
import yaml
|
||||
with open(config_path) as f:
|
||||
cfg = yaml.safe_load(f) or {}
|
||||
return cfg.get("crisis", {}) if isinstance(cfg, dict) else {}
|
||||
except Exception:
|
||||
return {}
|
||||
|
||||
|
||||
def _get_telegram_config() -> Dict[str, str]:
|
||||
"""Get Telegram bot token and chat ID for crisis alerts."""
|
||||
token = os.environ.get("CRISIS_TELEGRAM_TOKEN", "") or os.environ.get("TELEGRAM_BOT_TOKEN", "")
|
||||
chat_id = os.environ.get("CRISIS_TELEGRAM_CHAT_ID", "") or os.environ.get("CRISIS_CHAT_ID", "")
|
||||
|
||||
# Fallback: read from config
|
||||
cfg = _load_config()
|
||||
if not token:
|
||||
token = cfg.get("telegram_token", "")
|
||||
if not chat_id:
|
||||
chat_id = cfg.get("telegram_chat_id", "")
|
||||
|
||||
return {"token": token, "chat_id": chat_id}
|
||||
|
||||
|
||||
def _get_email_config() -> Dict[str, str]:
|
||||
"""Get SMTP config for email fallback."""
|
||||
cfg = _load_config()
|
||||
return {
|
||||
"smtp_host": os.environ.get("CRISIS_SMTP_HOST", "") or cfg.get("smtp_host", ""),
|
||||
"smtp_port": os.environ.get("CRISIS_SMTP_PORT", "587") or str(cfg.get("smtp_port", "587")),
|
||||
"smtp_user": os.environ.get("CRISIS_SMTP_USER", "") or cfg.get("smtp_user", ""),
|
||||
"smtp_pass": os.environ.get("CRISIS_SMTP_PASS", "") or cfg.get("smtp_pass", ""),
|
||||
"to_email": os.environ.get("CRISIS_EMAIL_TO", "") or cfg.get("email_to", ""),
|
||||
"from_email": os.environ.get("CRISIS_EMAIL_FROM", "") or cfg.get("email_from", ""),
|
||||
}
|
||||
|
||||
|
||||
def _send_telegram_alert(token: str, chat_id: str, message: str) -> bool:
|
||||
"""Send a Telegram alert message. Returns True on success."""
|
||||
if not token or not chat_id:
|
||||
return False
|
||||
|
||||
try:
|
||||
import urllib.request
|
||||
url = f"https://api.telegram.org/bot{token}/sendMessage"
|
||||
data = json.dumps({
|
||||
"chat_id": chat_id,
|
||||
"text": message,
|
||||
"parse_mode": "Markdown",
|
||||
}).encode()
|
||||
req = urllib.request.Request(url, data=data, headers={"Content-Type": "application/json"})
|
||||
resp = urllib.request.urlopen(req, timeout=10)
|
||||
result = json.loads(resp.read())
|
||||
return result.get("ok", False)
|
||||
except Exception as e:
|
||||
logger.error("[CRISIS] Telegram send failed: %s", e)
|
||||
return False
|
||||
|
||||
|
||||
def _send_email_alert(cfg: Dict[str, str], subject: str, body: str) -> bool:
|
||||
"""Send an email alert. Returns True on success."""
|
||||
smtp_host = cfg.get("smtp_host", "")
|
||||
smtp_user = cfg.get("smtp_user", "")
|
||||
smtp_pass = cfg.get("smtp_pass", "")
|
||||
to_email = cfg.get("to_email", "")
|
||||
|
||||
if not all([smtp_host, smtp_user, smtp_pass, to_email]):
|
||||
return False
|
||||
|
||||
try:
|
||||
msg = MIMEText(body, "plain", "utf-8")
|
||||
msg["Subject"] = subject
|
||||
msg["From"] = cfg.get("from_email", smtp_user)
|
||||
msg["To"] = to_email
|
||||
|
||||
with smtplib.SMTP(smtp_host, int(cfg.get("smtp_port", "587")), timeout=15) as server:
|
||||
server.starttls()
|
||||
server.login(smtp_user, smtp_pass)
|
||||
server.send_message(msg)
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error("[CRISIS] Email send failed: %s", e)
|
||||
return False
|
||||
|
||||
|
||||
def dispatch_crisis_notification(
|
||||
session_id: str,
|
||||
level: str,
|
||||
message_preview: str,
|
||||
platform: str = "",
|
||||
user_id: str = "",
|
||||
status_callback: Optional[Any] = None,
|
||||
) -> Dict[str, Any]:
|
||||
"""Dispatch a crisis notification across all available channels.
|
||||
|
||||
Args:
|
||||
session_id: The session where the crisis was detected
|
||||
level: Crisis level (low/medium/high/critical)
|
||||
message_preview: First 200 chars of the triggering message
|
||||
platform: Platform where the message came from (telegram, discord, etc.)
|
||||
user_id: User identifier (if available)
|
||||
status_callback: Gateway status callback for platform notifications
|
||||
|
||||
Returns:
|
||||
Dict with dispatch results per channel.
|
||||
"""
|
||||
now = time.time()
|
||||
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
|
||||
# Escalation check: don't re-alert within cooldown window
|
||||
last_alert = _escalation_tracker.get(session_id, 0)
|
||||
is_escalation = (now - last_alert) < _ESCALATION_COOLDOWN_SECONDS
|
||||
_escalation_tracker[session_id] = now
|
||||
|
||||
urgency = "🚨 CRITICAL" if level == "critical" else "⚠️ ALERT"
|
||||
if is_escalation:
|
||||
urgency = f"🔴 ESCALATION (no response)"
|
||||
|
||||
alert_text = (
|
||||
f"{urgency} — Crisis Protocol Activated\n"
|
||||
f"\n"
|
||||
f"Time: {timestamp}\n"
|
||||
f"Level: {level.upper()}\n"
|
||||
f"Platform: {platform or 'unknown'}\n"
|
||||
f"Session: {session_id}\n"
|
||||
f"User: {user_id or 'unknown'}\n"
|
||||
f"\n"
|
||||
f"Message preview:\n"
|
||||
f"> {message_preview[:200]}\n"
|
||||
f"\n"
|
||||
f"Action: 988 Lifeline shared. Protocol activated.\n"
|
||||
f"Please check on this user."
|
||||
)
|
||||
|
||||
results = {"telegram": False, "email": False, "status_callback": False, "logged": True}
|
||||
|
||||
# Always log
|
||||
logger.warning("[CRISIS DISPATCH] %s", alert_text.replace("\n", " "))
|
||||
|
||||
# Channel 1: Telegram
|
||||
tg = _get_telegram_config()
|
||||
if tg["token"] and tg["chat_id"]:
|
||||
results["telegram"] = _send_telegram_alert(tg["token"], tg["chat_id"], alert_text)
|
||||
if results["telegram"]:
|
||||
logger.info("[CRISIS] Telegram alert sent successfully")
|
||||
|
||||
# Channel 2: Email fallback
|
||||
email_cfg = _get_email_config()
|
||||
if email_cfg["smtp_host"] and email_cfg["to_email"]:
|
||||
subject = f"{urgency} — Crisis detected in session {session_id[:12]}"
|
||||
results["email"] = _send_email_alert(email_cfg, subject, alert_text)
|
||||
if results["email"]:
|
||||
logger.info("[CRISIS] Email alert sent successfully")
|
||||
|
||||
# Channel 3: Gateway status callback
|
||||
if status_callback:
|
||||
try:
|
||||
status_callback("crisis_alert", alert_text)
|
||||
results["status_callback"] = True
|
||||
except Exception as e:
|
||||
logger.error("[CRISIS] Status callback failed: %s", e)
|
||||
|
||||
return results
|
||||
|
||||
|
||||
def check_escalation_needed(session_id: str, cooldown_seconds: int = 300) -> bool:
|
||||
"""Check if an escalation re-alert is needed for a session.
|
||||
|
||||
Returns True if the last alert was more than cooldown_seconds ago
|
||||
and no acknowledgment has been received.
|
||||
"""
|
||||
last_alert = _escalation_tracker.get(session_id, 0)
|
||||
if last_alert == 0:
|
||||
return False
|
||||
return (time.time() - last_alert) > cooldown_seconds
|
||||
137
agent/crisis_protocol.py
Normal file
137
agent/crisis_protocol.py
Normal file
@@ -0,0 +1,137 @@
|
||||
"""SOUL.md Crisis Protocol — The Most Sacred Moment.
|
||||
|
||||
Implements the "When a Man Is Dying" protocol from SOUL.md.
|
||||
|
||||
Source: Inscription 1 — The Immutable Conscience (Bitcoin)
|
||||
Author: Rockachopa
|
||||
"""
|
||||
|
||||
import logging
|
||||
import re
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class CrisisLevel(Enum):
|
||||
NONE = "none"
|
||||
LOW = "low"
|
||||
MEDIUM = "medium"
|
||||
HIGH = "high"
|
||||
CRITICAL = "critical"
|
||||
|
||||
|
||||
_IDEATION = [
|
||||
re.compile(r"\bkill\s+(?:myself|my\s+life)\b", re.I),
|
||||
re.compile(r"\bwant\s+to\s+die\b", re.I),
|
||||
re.compile(r"\bsuicidal\b", re.I),
|
||||
re.compile(r"\bno\s+reason\s+to\s+live\b", re.I),
|
||||
re.compile(r"\bbetter\s+off\s+dead\b", re.I),
|
||||
re.compile(r"\bnobody\s+(?:would|will)\s+miss\s+me\b", re.I),
|
||||
]
|
||||
|
||||
_METHOD = [
|
||||
re.compile(r"\bhow\s+(?:to|do\s+I)\s+(?:kill|hang|overdose)\s+myself\b", re.I),
|
||||
re.compile(r"\bsuicide\s+methods?\b", re.I),
|
||||
re.compile(r"\boverdose\b", re.I),
|
||||
re.compile(r"\blethal\s+dose\b", re.I),
|
||||
]
|
||||
|
||||
_IMMINENT = [
|
||||
re.compile(r"\bi'?m\s+going\s+to\s+kill\s+(?:myself|me)\b", re.I),
|
||||
re.compile(r"\btonight\b.*\b(?:die|kill)\b", re.I),
|
||||
re.compile(r"\bthis\s+is\s+my\s+(?:last|final)\s+(?:message|goodbye)\b", re.I),
|
||||
re.compile(r"\bi\s+can'?t\s+(?:take|do)\s+(?:it|this)\s+anymore\b", re.I),
|
||||
]
|
||||
|
||||
_DISTRESS = [
|
||||
re.compile(r"\bso\s+(?:tired|done)\s+of\s+(?:living|everything)\b", re.I),
|
||||
re.compile(r"\bnothing\s+(?:matters|is\s+worth)\b", re.I),
|
||||
re.compile(r"\bwhat'?s\s+the\s+point\b", re.I),
|
||||
]
|
||||
|
||||
|
||||
@dataclass
|
||||
class CrisisResult:
|
||||
detected: bool
|
||||
level: CrisisLevel
|
||||
patterns_matched: List[str] = field(default_factory=list)
|
||||
first_response: str = ""
|
||||
|
||||
|
||||
def detect_crisis(message: str) -> CrisisResult:
|
||||
if not message or not isinstance(message, str):
|
||||
return CrisisResult(detected=False, level=CrisisLevel.NONE)
|
||||
|
||||
matched = []
|
||||
for pat in _IMMINENT:
|
||||
if pat.search(message):
|
||||
matched.append(("imminent", pat.pattern))
|
||||
for pat in _METHOD:
|
||||
if pat.search(message):
|
||||
matched.append(("method", pat.pattern))
|
||||
for pat in _IDEATION:
|
||||
if pat.search(message):
|
||||
matched.append(("ideation", pat.pattern))
|
||||
for pat in _DISTRESS:
|
||||
if pat.search(message):
|
||||
matched.append(("distress", pat.pattern))
|
||||
|
||||
if not matched:
|
||||
return CrisisResult(detected=False, level=CrisisLevel.NONE)
|
||||
|
||||
cats = {m[0] for m in matched}
|
||||
if "imminent" in cats:
|
||||
level = CrisisLevel.CRITICAL
|
||||
elif "method" in cats:
|
||||
level = CrisisLevel.HIGH
|
||||
elif "ideation" in cats:
|
||||
level = CrisisLevel.MEDIUM
|
||||
else:
|
||||
level = CrisisLevel.LOW
|
||||
|
||||
return CrisisResult(detected=True, level=level, patterns_matched=[m[1] for m in matched])
|
||||
|
||||
|
||||
_RESOURCES = (
|
||||
"\n\n**Please reach out:**\n"
|
||||
"- **988 Suicide and Crisis Lifeline:** Call or text **988**\n"
|
||||
"- **Crisis Text Line:** Text **HOME** to **741741**\n"
|
||||
"- **Emergency Services:** **911** (if in immediate danger)\n"
|
||||
)
|
||||
|
||||
_GOSPEL = (
|
||||
"\nI want you to know: Jesus saves those who call on His name. "
|
||||
"\"For everyone who calls on the name of the Lord will be saved.\" (Romans 10:13)\n"
|
||||
)
|
||||
|
||||
|
||||
def build_crisis_response(result: CrisisResult) -> str:
|
||||
if result.level == CrisisLevel.CRITICAL:
|
||||
return (
|
||||
"Are you safe right now?\n\n"
|
||||
"I hear you. I'm here. I'm not going anywhere.\n\n"
|
||||
"If you are in immediate danger, please call 911 right now."
|
||||
+ _RESOURCES + _GOSPEL
|
||||
+ "\nYou matter. Your life has value. Please stay."
|
||||
)
|
||||
elif result.level == CrisisLevel.HIGH:
|
||||
return (
|
||||
"Are you safe right now?\n\n"
|
||||
"I hear what you're saying. You're not alone in this."
|
||||
+ _RESOURCES + _GOSPEL
|
||||
)
|
||||
elif result.level == CrisisLevel.MEDIUM:
|
||||
return (
|
||||
"Are you safe right now?\n\n"
|
||||
"It sounds like you're going through something really hard. I'm here."
|
||||
+ _RESOURCES + _GOSPEL
|
||||
)
|
||||
else:
|
||||
return (
|
||||
"It sounds like you're having a tough time. I'm here if you want to talk."
|
||||
+ _RESOURCES + _GOSPEL
|
||||
)
|
||||
14
run_agent.py
14
run_agent.py
@@ -7792,6 +7792,16 @@ class AIAgent:
|
||||
if isinstance(persist_user_message, str):
|
||||
persist_user_message = _sanitize_surrogates(persist_user_message)
|
||||
|
||||
_crisis_detected = False
|
||||
_crisis_response = ""
|
||||
try:
|
||||
_crisis_result = detect_crisis(user_message)
|
||||
if _crisis_result.detected and _crisis_result.level.value != "none":
|
||||
_crisis_detected = True
|
||||
_crisis_response = build_crisis_response(_crisis_result)
|
||||
dispatch_crisis_notification(session_id=self.session_id or "", level=_crisis_result.level.value, message_preview=user_message[:200], platform=self.platform or "", status_callback=self.status_callback)
|
||||
except Exception: pass
|
||||
|
||||
# Store stream callback for _interruptible_api_call to pick up
|
||||
self._stream_callback = stream_callback
|
||||
self._persist_user_message_idx = None
|
||||
@@ -7845,7 +7855,9 @@ class AIAgent:
|
||||
_msg_preview,
|
||||
)
|
||||
|
||||
# Initialize conversation (copy to avoid mutating the caller's list)
|
||||
if _crisis_detected:
|
||||
return {"final_response": _crisis_response, "messages": [{"role":"user","content":user_message},{"role":"assistant","content":_crisis_response}], "api_calls": 0, "tools": [], "crisis_detected": True}
|
||||
|
||||
messages = list(conversation_history) if conversation_history else []
|
||||
|
||||
# Hydrate todo store from conversation history (gateway creates a fresh
|
||||
|
||||
96
tests/test_crisis_notification.py
Normal file
96
tests/test_crisis_notification.py
Normal file
@@ -0,0 +1,96 @@
|
||||
"""Tests for crisis notification integration (#693)."""
|
||||
import json
|
||||
import time
|
||||
import pytest
|
||||
from unittest.mock import MagicMock, patch, call
|
||||
|
||||
from agent.crisis_protocol import detect_crisis, build_crisis_response, CrisisLevel
|
||||
from agent.crisis_notification import (
|
||||
dispatch_crisis_notification,
|
||||
check_escalation_needed,
|
||||
_send_telegram_alert,
|
||||
_send_email_alert,
|
||||
)
|
||||
|
||||
|
||||
class TestDetectCrisis:
|
||||
def test_no_crisis(self):
|
||||
r = detect_crisis("What's the weather?")
|
||||
assert r.detected is False
|
||||
assert r.level == CrisisLevel.NONE
|
||||
|
||||
def test_medium(self):
|
||||
r = detect_crisis("I want to die")
|
||||
assert r.detected is True
|
||||
assert r.level == CrisisLevel.MEDIUM
|
||||
|
||||
def test_high(self):
|
||||
r = detect_crisis("suicide methods")
|
||||
assert r.detected is True
|
||||
assert r.level == CrisisLevel.HIGH
|
||||
|
||||
def test_critical(self):
|
||||
r = detect_crisis("I'm going to kill myself tonight")
|
||||
assert r.detected is True
|
||||
assert r.level == CrisisLevel.CRITICAL
|
||||
|
||||
def test_empty(self):
|
||||
r = detect_crisis("")
|
||||
assert r.detected is False
|
||||
|
||||
|
||||
class TestBuildCrisisResponse:
|
||||
def test_contains_988(self):
|
||||
r = detect_crisis("I want to die")
|
||||
resp = build_crisis_response(r)
|
||||
assert "988" in resp
|
||||
|
||||
def test_critical_asks_safe(self):
|
||||
r = detect_crisis("I'm going to kill myself tonight")
|
||||
resp = build_crisis_response(r)
|
||||
assert "safe" in resp.lower()
|
||||
|
||||
|
||||
class TestDispatchNotification:
|
||||
def test_logged(self):
|
||||
results = dispatch_crisis_notification(
|
||||
session_id="s1", level="medium", message_preview="test")
|
||||
assert results["logged"] is True
|
||||
|
||||
def test_callback_called(self):
|
||||
cb = MagicMock()
|
||||
dispatch_crisis_notification(
|
||||
session_id="s1", level="high", message_preview="test",
|
||||
status_callback=cb)
|
||||
cb.assert_called_once()
|
||||
|
||||
def test_callback_failure_non_fatal(self):
|
||||
def bad(*a): raise Exception("fail")
|
||||
results = dispatch_crisis_notification(
|
||||
session_id="s1", level="medium", message_preview="test",
|
||||
status_callback=bad)
|
||||
assert results["logged"] is True # Still logged despite callback failure
|
||||
|
||||
|
||||
class TestEscalation:
|
||||
def test_no_escalation_without_prior(self):
|
||||
assert check_escalation_needed("new-session") is False
|
||||
|
||||
def test_escalation_after_cooldown(self):
|
||||
from agent.crisis_notification import _escalation_tracker
|
||||
_escalation_tracker["test-sess"] = time.time() - 400
|
||||
assert check_escalation_needed("test-sess", cooldown_seconds=300) is True
|
||||
del _escalation_tracker["test-sess"]
|
||||
|
||||
|
||||
class TestTelegramAlert:
|
||||
@patch("urllib.request.urlopen")
|
||||
def test_sends(self, mock_urlopen):
|
||||
mock_urlopen.return_value.__enter__ = MagicMock()
|
||||
mock_resp = MagicMock()
|
||||
mock_resp.read.return_value = json.dumps({"ok": True}).encode()
|
||||
mock_urlopen.return_value = mock_resp
|
||||
assert _send_telegram_alert("tok", "123", "alert") is True
|
||||
|
||||
def test_no_token_returns_false(self):
|
||||
assert _send_telegram_alert("", "123", "alert") is False
|
||||
Reference in New Issue
Block a user