Files
hermes-agent/agent/crisis_notification.py

211 lines
7.4 KiB
Python

"""Crisis Notification Integration (#693).
Dispatches crisis alerts to human operators via Telegram and email fallback.
Called by the crisis protocol when a crisis is detected.
Channels (every configured channel is attempted, in this order):
1. Telegram — direct message to Alexander via bot
2. Email — SMTP fallback (if configured)
3. Status callback — gateway platform notification
4. Log — always logged regardless of other channels
Auto-escalation: if no acknowledgment within N minutes, re-notify with
increased urgency.
"""
import json
import logging
import os
import smtplib
import time
from email.mime.text import MIMEText
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, Optional
# Module-wide logger, named after this module.
logger = logging.getLogger(__name__)

# Length of the re-alert window: 5 minutes, expressed in seconds.
_ESCALATION_COOLDOWN_SECONDS = 5 * 60

# Process-local bookkeeping (not persisted):
# session_id -> time.time() value recorded at the most recent alert.
_escalation_tracker: Dict[str, float] = {}
def _load_config() -> Dict[str, Any]:
    """Load the ``crisis`` section of ``config.yaml``.

    The file is looked up in ``$HERMES_HOME`` when that variable is set,
    otherwise in ``~/.hermes``.

    Returns:
        The ``crisis`` mapping from the config file. An empty dict is
        returned when the file or PyYAML is missing, when the file cannot
        be parsed, or when the ``crisis`` key is absent, null, or not a
        mapping. Callers can therefore always use ``.get`` on the result.
    """
    config_path = Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes")) / "config.yaml"
    try:
        # Imported lazily so the module still loads without PyYAML.
        import yaml

        with open(config_path, encoding="utf-8") as f:
            cfg = yaml.safe_load(f) or {}
    except (FileNotFoundError, ImportError):
        # Expected when configuration comes from environment variables only.
        return {}
    except Exception as exc:
        # Best effort: a broken config must not break crisis dispatch, but
        # the operator should be able to find out why it was ignored.
        logger.warning("[CRISIS] Could not read config %s: %s", config_path, exc)
        return {}

    section = cfg.get("crisis") if isinstance(cfg, dict) else None
    # An empty ``crisis:`` key parses to None; never hand that to callers.
    return section if isinstance(section, dict) else {}
def _get_telegram_config() -> Dict[str, str]:
    """Resolve the Telegram bot token and chat ID used for crisis alerts.

    Each value is taken from the first non-empty source:

    * token: ``CRISIS_TELEGRAM_TOKEN``, then ``TELEGRAM_BOT_TOKEN``, then
      the ``telegram_token`` key of the crisis config.
    * chat ID: ``CRISIS_TELEGRAM_CHAT_ID``, then ``CRISIS_CHAT_ID``, then
      the ``telegram_chat_id`` key of the crisis config.

    Returns:
        A dict with the keys ``"token"`` and ``"chat_id"``.
    """
    env = os.environ
    file_cfg = _load_config()

    bot_token = (
        env.get("CRISIS_TELEGRAM_TOKEN", "")
        or env.get("TELEGRAM_BOT_TOKEN", "")
        or file_cfg.get("telegram_token", "")
    )
    target_chat = (
        env.get("CRISIS_TELEGRAM_CHAT_ID", "")
        or env.get("CRISIS_CHAT_ID", "")
        or file_cfg.get("telegram_chat_id", "")
    )
    return {"token": bot_token, "chat_id": target_chat}
def _get_email_config() -> Dict[str, str]:
    """Resolve the SMTP settings for the email channel.

    Every setting is read from its ``CRISIS_*`` environment variable first
    and falls back to the matching key in the crisis config. The port
    defaults to ``"587"`` only when neither source provides one.

    Returns:
        A dict with the keys ``smtp_host``, ``smtp_port``, ``smtp_user``,
        ``smtp_pass``, ``to_email`` and ``from_email``. All values are
        strings; unset settings are ``""``.
    """
    cfg = _load_config() or {}

    def _setting(env_name: str, cfg_key: str, default: str = "") -> str:
        # The environment lookup must default to "" rather than the final
        # default, otherwise the config-file value can never be reached.
        value = os.environ.get(env_name, "") or cfg.get(cfg_key) or default
        # YAML may yield ints (ports, numeric passwords); normalise to str.
        return str(value)

    return {
        "smtp_host": _setting("CRISIS_SMTP_HOST", "smtp_host"),
        "smtp_port": _setting("CRISIS_SMTP_PORT", "smtp_port", "587"),
        "smtp_user": _setting("CRISIS_SMTP_USER", "smtp_user"),
        "smtp_pass": _setting("CRISIS_SMTP_PASS", "smtp_pass"),
        "to_email": _setting("CRISIS_EMAIL_TO", "email_to"),
        "from_email": _setting("CRISIS_EMAIL_FROM", "email_from"),
    }
def _send_telegram_alert(token: str, chat_id: str, message: str) -> bool:
    """Send ``message`` to a Telegram chat through the Bot API.

    The message is first sent with ``parse_mode="Markdown"``. Alert text
    embeds arbitrary user content, and Telegram answers HTTP 400 when that
    content contains unbalanced Markdown characters. In that case the
    message is sent again as plain text so the alert is not lost.

    Args:
        token: Bot token used in the API URL.
        chat_id: Destination chat identifier.
        message: Text of the alert.

    Returns:
        True when the API reports ``ok``; False when the token or chat ID
        is missing or when sending fails (failures are logged, not raised).
    """
    if not token or not chat_id:
        return False
    try:
        import urllib.error
        import urllib.request

        url = f"https://api.telegram.org/bot{token}/sendMessage"

        def _post(payload: Dict[str, Any]) -> bool:
            req = urllib.request.Request(
                url,
                data=json.dumps(payload).encode(),
                headers={"Content-Type": "application/json"},
            )
            # Context manager closes the HTTP response in every case.
            with urllib.request.urlopen(req, timeout=10) as resp:
                result = json.loads(resp.read())
            return bool(result.get("ok", False))

        try:
            return _post({"chat_id": chat_id, "text": message, "parse_mode": "Markdown"})
        except urllib.error.HTTPError as e:
            if e.code != 400:
                raise
            # Most likely a Markdown entity parse error; plain text cannot
            # hit that, so retry once without a parse mode.
            logger.warning(
                "[CRISIS] Telegram rejected the formatted alert (%s); retrying as plain text", e
            )
            return _post({"chat_id": chat_id, "text": message})
    except Exception as e:
        logger.error("[CRISIS] Telegram send failed: %s", e)
        return False
def _send_email_alert(cfg: Dict[str, str], subject: str, body: str) -> bool:
    """Send a plain-text alert email over SMTP with STARTTLS.

    Args:
        cfg: SMTP settings with the keys ``smtp_host``, ``smtp_port``,
            ``smtp_user``, ``smtp_pass``, ``to_email`` and ``from_email``.
        subject: Subject line of the email.
        body: Plain-text body, encoded as UTF-8.

    Returns:
        True when the message was handed to the server; False when host,
        user, password or recipient is missing, or when sending fails
        (failures are logged, not raised).
    """
    smtp_host = cfg.get("smtp_host", "")
    smtp_user = cfg.get("smtp_user", "")
    smtp_pass = cfg.get("smtp_pass", "")
    to_email = cfg.get("to_email", "")
    if not all([smtp_host, smtp_user, smtp_pass, to_email]):
        return False
    try:
        msg = MIMEText(body, "plain", "utf-8")
        msg["Subject"] = subject
        # The key may be present but empty, so a .get() default would never
        # apply; fall back to the SMTP user on any falsy value.
        msg["From"] = cfg.get("from_email") or smtp_user
        msg["To"] = to_email
        # An empty port setting falls back to the submission port 587.
        port = int(cfg.get("smtp_port") or 587)
        with smtplib.SMTP(smtp_host, port, timeout=15) as server:
            server.starttls()
            server.login(smtp_user, smtp_pass)
            server.send_message(msg)
        return True
    except Exception as e:
        logger.error("[CRISIS] Email send failed: %s", e)
        return False
def dispatch_crisis_notification(
    session_id: str,
    level: str,
    message_preview: str,
    platform: str = "",
    user_id: str = "",
    status_callback: Optional[Any] = None,
) -> Dict[str, Any]:
    """Dispatch a crisis notification across all available channels.

    The alert is always written to the log, then sent via Telegram, email
    and the status callback, each only if configured or provided. Channels
    are independent: a failure in one is logged and does not stop the
    others.

    Args:
        session_id: The session where the crisis was detected.
        level: Crisis level (low/medium/high/critical).
        message_preview: Triggering message; truncated to 200 characters.
        platform: Platform the message came from (telegram, discord, etc.).
        user_id: User identifier, if available.
        status_callback: Optional callable invoked as
            ``status_callback("crisis_alert", alert_text)``.

    Returns:
        Dict with one boolean per channel: ``telegram``, ``email``,
        ``status_callback`` and ``logged`` (always True).
    """
    now = time.time()
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # A repeat alert for the same session inside the cooldown window is
    # labelled as an escalation. The alert is sent either way.
    # NOTE(review): check_escalation_needed() reports True only AFTER the
    # cooldown has passed, at which point this flag is False. Confirm which
    # of the two is the intended escalation trigger.
    last_alert = _escalation_tracker.get(session_id, 0)
    is_escalation = (now - last_alert) < _ESCALATION_COOLDOWN_SECONDS
    _escalation_tracker[session_id] = now

    if is_escalation:
        urgency = "🔴 ESCALATION (no response)"
    elif level == "critical":
        urgency = "🚨 CRITICAL"
    else:
        urgency = "⚠️ ALERT"

    # Tolerate missing values: a malformed call must still raise the alarm.
    level_label = (level or "unknown").upper()
    preview = (message_preview or "")[:200]

    alert_text = (
        f"{urgency} — Crisis Protocol Activated\n"
        f"\n"
        f"Time: {timestamp}\n"
        f"Level: {level_label}\n"
        f"Platform: {platform or 'unknown'}\n"
        f"Session: {session_id}\n"
        f"User: {user_id or 'unknown'}\n"
        f"\n"
        f"Message preview:\n"
        f"> {preview}\n"
        f"\n"
        f"Action: 988 Lifeline shared. Protocol activated.\n"
        f"Please check on this user."
    )

    results = {"telegram": False, "email": False, "status_callback": False, "logged": True}

    # Always log, flattened onto one line.
    logger.warning("[CRISIS DISPATCH] %s", alert_text.replace("\n", " "))

    # Channel 1: Telegram
    try:
        tg = _get_telegram_config()
        if tg["token"] and tg["chat_id"]:
            results["telegram"] = _send_telegram_alert(tg["token"], tg["chat_id"], alert_text)
            if results["telegram"]:
                logger.info("[CRISIS] Telegram alert sent successfully")
    except Exception as e:
        logger.error("[CRISIS] Telegram channel failed: %s", e)

    # Channel 2: Email
    try:
        email_cfg = _get_email_config()
        if email_cfg["smtp_host"] and email_cfg["to_email"]:
            subject = f"{urgency} — Crisis detected in session {str(session_id)[:12]}"
            results["email"] = _send_email_alert(email_cfg, subject, alert_text)
            if results["email"]:
                logger.info("[CRISIS] Email alert sent successfully")
    except Exception as e:
        logger.error("[CRISIS] Email channel failed: %s", e)

    # Channel 3: Gateway status callback
    if status_callback:
        try:
            status_callback("crisis_alert", alert_text)
            results["status_callback"] = True
        except Exception as e:
            logger.error("[CRISIS] Status callback failed: %s", e)

    return results
def check_escalation_needed(session_id: str, cooldown_seconds: int = 300) -> bool:
    """Report whether a session's last alert is older than the cooldown.

    Args:
        session_id: Session to look up in the in-memory alert tracker.
        cooldown_seconds: Age, in seconds, the last alert must exceed.

    Returns:
        False when no alert has been recorded for the session; otherwise
        True only if more than ``cooldown_seconds`` have elapsed since the
        recorded alert time.

    NOTE(review): acknowledgments are not consulted here; only the time of
    the last alert is checked.
    """
    previous_alert = _escalation_tracker.get(session_id, 0)
    if previous_alert != 0:
        elapsed = time.time() - previous_alert
        return elapsed > cooldown_seconds
    return False