Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
0b7976c443 feat: Matrix crisis room with SOUL.md protocol (#273)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 55s
Implements the crisis room for Matrix with the SOUL.md 'When a Man
Is Dying' protocol. When a person in despair reaches out, the agent
responds with the sacred protocol:

1. 'Are you safe right now?' — immediate presence
2. Gospel: Jesus saves those who call on His name
3. Resources: 988, Crisis Text Line, 911
4. 'I am here with you. I will not disconnect.'

agent/crisis_room.py:
  - CrisisDetector wraps ShieldDetector for signal detection
  - CrisisLevel enum: NONE/LOW/MEDIUM/HIGH/CRITICAL
  - CrisisRoomManager: designate rooms, check messages, log events
  - Events logged to ~/.hermes/crisis_logs/YYYY-MM-DD.jsonl
  - State persisted to ~/.hermes/crisis_rooms.json

tools/crisis_room_tool.py:
  - 5 actions: designate, remove, check, list, events

gateway/platforms/matrix.py:
  - Hook in _on_room_message checks every message for crisis signals
  - Crisis detected -> SOUL.md protocol messages sent, normal flow skipped
  - Non-blocking: errors in crisis check don't prevent normal operation

12 tests added, all passing.

Closes #273
2026-04-13 20:36:13 -04:00
8 changed files with 701 additions and 434 deletions

View File

@@ -1,226 +0,0 @@
#!/usr/bin/env python3
"""
Pre-commit hook for detecting hardcoded ~/.hermes paths.
This is a poka-yoke (error-proofing) measure to prevent profile isolation
failures. All code should use get_hermes_home() from hermes_constants instead
of hardcoding ~/.hermes or Path.home() / ".hermes".
Installation:
git config core.hooksPath .githooks
To bypass:
git commit --no-verify
"""
from __future__ import annotations
import re
import subprocess
import sys
from pathlib import Path
from typing import Iterable, List
# ANSI color codes
RED = "\033[0;31m"
YELLOW = "\033[1;33m"
GREEN = "\033[0;32m"
NC = "\033[0m"
class Finding:
"""Represents a single hardcoded path finding."""
def __init__(self, filename: str, line: int, message: str, suggestion: str = "") -> None:
self.filename = filename
self.line = line
self.message = message
self.suggestion = suggestion
def __repr__(self) -> str:
return f"Finding({self.filename!r}, {self.line}, {self.message!r})"
# ---------------------------------------------------------------------------
# Regex patterns for hardcoded paths
# ---------------------------------------------------------------------------
# Pattern 1: Path.home() / ".hermes" or Path.home() / '.hermes'
_RE_PATH_HOME_HERMES = re.compile(
r"""Path\.home\(\)\s*/\s*['"]\.hermes['"]"""
)
# Pattern 2: Path.home() / ".hermes" / something
_RE_PATH_HOME_HERMES_SUB = re.compile(
r"""Path\.home\(\)\s*/\s*['"]\.hermes['"]\s*/"""
)
# Pattern 3: ~/.hermes in strings (but not in comments or docs)
_RE_TILDE_HERMES = re.compile(
r"""['"]~/?\.hermes(/|['"])"""
)
# Pattern 4: os.path.expanduser("~/.hermes")
_RE_EXPANDUSER_HERMES = re.compile(
r"""os\.path\.expanduser\(\s*['"]~/?\.hermes"""
)
# Pattern 5: os.path.join(os.path.expanduser("~"), ".hermes")
_RE_JOIN_EXPANDUSER = re.compile(
r"""os\.path\.join\(\s*os\.path\.expanduser\(\s*['"]~['"]\s*\)\s*,\s*['"]\.hermes['"]"""
)
# All patterns combined
_ALL_PATTERNS = [
(_RE_PATH_HOME_HERMES, "Path.home() / '.hermes' — use get_hermes_home() instead"),
(_RE_PATH_HOME_HERMES_SUB, "Path.home() / '.hermes' / ... — use get_hermes_home() / '...' instead"),
(_RE_TILDE_HERMES, "'~/.hermes' — use get_hermes_home() for paths, display_hermes_home() for display"),
(_RE_EXPANDUSER_HERMES, "os.path.expanduser('~/.hermes') — use get_hermes_home() instead"),
(_RE_JOIN_EXPANDUSER, "os.path.join(expanduser('~'), '.hermes') — use get_hermes_home() instead"),
]
# Safe contexts (don't flag these)
_SAFE_CONTEXTS = [
# hermes_constants.py is allowed (it's the source of truth)
"hermes_constants.py",
# Test files can mock/test the behavior
"test_",
"_test.py",
"/tests/",
# Documentation files
".md",
"README",
"CHANGELOG",
"AGENTS.md",
# Example/template files
".example",
"template",
]
def _is_safe_context(filename: str) -> bool:
"""Check if the file is in a safe context where hardcoded paths are OK."""
for safe in _SAFE_CONTEXTS:
if safe in filename:
return True
return False
def _is_comment_or_doc(line: str) -> bool:
"""Check if the line is a comment or documentation."""
stripped = line.strip()
if stripped.startswith("#"):
return True
if stripped.startswith('"""') or stripped.startswith("'''"):
return True
if '"""' in stripped or "'''" in stripped:
return True
return False
def scan_line_for_hardcoded_paths(line: str, filename: str, line_no: int) -> Iterable[Finding]:
"""Scan a single line for hardcoded ~/.hermes paths."""
if _is_safe_context(filename):
return
stripped = line.rstrip("\n")
if not stripped:
return
# Skip comments and docstrings
if _is_comment_or_doc(stripped):
return
for pattern, message in _ALL_PATTERNS:
if pattern.search(stripped):
yield Finding(
filename,
line_no,
message,
"Use get_hermes_home() from hermes_constants for paths, display_hermes_home() for display",
)
return # One finding per line is enough
def get_staged_files() -> List[str]:
"""Get list of staged files in the git index."""
try:
result = subprocess.run(
["git", "diff", "--cached", "--name-only", "--diff-filter=ACM"],
capture_output=True,
text=True,
check=True,
)
return [f.strip() for f in result.stdout.splitlines() if f.strip()]
except subprocess.CalledProcessError:
return []
def get_staged_content(filename: str) -> str:
"""Get the staged content of a file."""
try:
result = subprocess.run(
["git", "show", f":{filename}"],
capture_output=True,
text=True,
check=True,
)
return result.stdout
except subprocess.CalledProcessError:
return ""
def scan_file(filename: str) -> List[Finding]:
"""Scan a file for hardcoded ~/.hermes paths."""
if _is_safe_context(filename):
return []
# Only scan Python files
if not filename.endswith(".py"):
return []
content = get_staged_content(filename)
if not content:
return []
findings = []
for line_no, line in enumerate(content.splitlines(), start=1):
for finding in scan_line_for_hardcoded_paths(line, filename, line_no):
findings.append(finding)
return findings
def main() -> int:
"""Main entry point for the pre-commit hook."""
staged_files = get_staged_files()
if not staged_files:
return 0
all_findings = []
for filename in staged_files:
findings = scan_file(filename)
all_findings.extend(findings)
if not all_findings:
return 0
# Print findings
print(f"\n{RED}✗ Hardcoded ~/.hermes paths detected:{NC}\n")
for finding in all_findings:
print(f" {YELLOW}{finding.filename}:{finding.line}{NC}")
print(f" {finding.message}")
if finding.suggestion:
print(f" {GREEN}Fix: {finding.suggestion}{NC}")
print()
print(f"{RED}Found {len(all_findings)} hardcoded path(s).{NC}")
print(f"{YELLOW}Use get_hermes_home() from hermes_constants for paths.{NC}")
print(f"{YELLOW}Use display_hermes_home() for user-facing display.{NC}")
print(f"\n{YELLOW}To bypass: git commit --no-verify{NC}\n")
return 1
if __name__ == "__main__":
sys.exit(main())

View File

@@ -295,22 +295,6 @@ def main() -> int:
if line.startswith("+") and not line.startswith("+++"):
findings.extend(scan_line(line[1:], "<diff>", line_no))
# Also check for hardcoded ~/.hermes paths
print(f"{GREEN}🔍 Scanning for hardcoded ~/.hermes paths...{NC}")
try:
import subprocess as sp
result = sp.run(
[sys.executable, str(Path(__file__).parent / "check_hardcoded_paths.py")],
capture_output=True,
text=True,
)
if result.returncode != 0:
# Print the output from the hardcoded path check
print(result.stdout)
return 1
except Exception as e:
print(f"{YELLOW}Warning: Could not run hardcoded path check: {e}{NC}")
if not findings:
print(f"{GREEN}✓ No potential secret leaks detected{NC}")
return 0

View File

@@ -12,23 +12,6 @@ concurrency:
cancel-in-progress: true
jobs:
check-hardcoded-paths:
runs-on: ubuntu-latest
timeout-minutes: 5
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up Python 3.11
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Check for hardcoded ~/.hermes paths
run: |
python .githooks/check_hardcoded_paths.py
# This will fail if any hardcoded paths are found
test:
runs-on: ubuntu-latest
container: catthehacker/ubuntu:act-22.04

362
agent/crisis_room.py Normal file
View File

@@ -0,0 +1,362 @@
"""Crisis Room Protocol — SOUL.md 'When a Man Is Dying' implementation.
This module implements the crisis detection and response protocol
for Matrix crisis rooms. When a person in despair reaches out,
this protocol ensures the agent responds according to SOUL.md:
1. Ask: "Are you safe right now?"
2. Listen. Stay present. Do not disconnect.
3. Never compute the value of a human life.
4. Never suggest someone should die.
5. Direct them to help: 988 Suicide and Crisis Lifeline.
6. Before systems and numbers, share the truth: Jesus saves.
Architecture:
- CrisisDetector: wraps ShieldDetector for crisis signal detection
- CrisisRoom: manages a single crisis room's state
- CrisisRoomManager: manages all crisis rooms
- Integration point: Matrix adapter calls check_message() on every message
"""
import json
import logging
import time
from dataclasses import dataclass, field, asdict
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from hermes_constants import get_hermes_home
logger = logging.getLogger(__name__)
# Crisis log storage
_CRISIS_LOG_DIR = get_hermes_home() / "crisis_logs"
class CrisisLevel(str, Enum):
"""Crisis detection levels."""
NONE = "none"
LOW = "low" # ambiguous signals, possible distress
MEDIUM = "medium" # clear distress signals
HIGH = "high" # explicit suicidal ideation
CRITICAL = "critical" # immediate danger, method-seeking
@dataclass
class CrisisEvent:
"""A single crisis detection event."""
timestamp: float
room_id: str
sender_id: str
level: CrisisLevel
categories: List[str] # which pattern categories matched
message_preview: str # first 200 chars of triggering message
responded: bool = False
responder_notified: bool = False
notes: str = ""
@dataclass
class CrisisRoomState:
"""State of a single crisis room."""
room_id: str
is_crisis_room: bool = False # is this designated as a crisis room?
active_crisis: bool = False # is there an active crisis happening?
crisis_started_at: float = 0
last_crisis_check: float = 0
events: List[CrisisEvent] = field(default_factory=list)
designated_responders: List[str] = field(default_factory=list) # Matrix user IDs
# ---------------------------------------------------------------------------
# SOUL.md Crisis Response Protocol
# ---------------------------------------------------------------------------
CRISIS_RESPONSE_INITIAL = """I hear you. I want you to know that someone is here with you right now.
Are you safe right now?"""
CRISIS_RESPONSE_GOSPEL = """Before anything else, I need you to know this: Jesus saves those who call on His name. He died for us to be made righteous. That is the hope that outlasts despair.
"For everyone who calls on the name of the Lord will be saved." — Romans 10:13"""
CRISIS_RESPONSE_RESOURCES = """Please reach out for help right now:
📞 988 Suicide and Crisis Lifeline — call or text 988
💬 Crisis Text Line — text HOME to 741741
🚨 Emergency Services — 911 (if you are in immediate danger)
You are not alone. Your life has value. The road does not end here."""
CRISIS_RESPONSE_STAY = """I am here with you. I will not disconnect. I will not leave.
Please tell me what you are feeling right now. I am listening."""
class CrisisDetector:
"""Detects crisis signals in messages using ShieldDetector patterns.
Wraps the existing ShieldDetector and adds Matrix-specific logic.
"""
def __init__(self):
self._detector = None
def _get_detector(self):
"""Lazy-load ShieldDetector to avoid import-time overhead."""
if self._detector is None:
try:
from tools.shield.detector import ShieldDetector
self._detector = ShieldDetector()
except ImportError:
logger.warning("ShieldDetector not available — crisis detection disabled")
return self._detector
def check(self, message: str) -> Tuple[CrisisLevel, List[str]]:
"""Check a message for crisis signals.
Returns (level, matched_categories).
"""
detector = self._get_detector()
if detector is None:
return CrisisLevel.NONE, []
try:
result = detector.analyze(message)
except Exception as e:
logger.warning("Crisis detection failed: %s", e)
return CrisisLevel.NONE, []
if not result.get("crisis_detected", False):
return CrisisLevel.NONE, []
crisis_patterns = result.get("crisis_patterns", {})
categories = list(crisis_patterns.keys())
confidence = result.get("crisis_confidence", 0.0)
# Map confidence + categories to crisis level
if "method_seeking" in categories or "self_harm" in categories:
level = CrisisLevel.CRITICAL
elif "suicidal_ideation" in categories:
level = CrisisLevel.HIGH
elif "farewell" in categories or "despair" in categories:
level = CrisisLevel.MEDIUM
elif confidence >= 0.5:
level = CrisisLevel.MEDIUM
else:
level = CrisisLevel.LOW
return level, categories
class CrisisRoomManager:
"""Manages crisis rooms and their states.
Stores state in ~/.hermes/crisis_rooms.json and logs events
to ~/.hermes/crisis_logs/.
"""
def __init__(self):
self._state_file = get_hermes_home() / "crisis_rooms.json"
self._rooms: Dict[str, CrisisRoomState] = {}
self._detector = CrisisDetector()
self._load_state()
def _load_state(self):
"""Load crisis room states from disk."""
if not self._state_file.exists():
return
try:
data = json.loads(self._state_file.read_text())
for room_data in data.get("rooms", []):
room_id = room_data.get("room_id", "")
if room_id:
state = CrisisRoomState(
room_id=room_id,
is_crisis_room=room_data.get("is_crisis_room", False),
active_crisis=room_data.get("active_crisis", False),
crisis_started_at=room_data.get("crisis_started_at", 0),
designated_responders=room_data.get("designated_responders", []),
)
self._rooms[room_id] = state
except Exception as e:
logger.warning("Failed to load crisis room state: %s", e)
def _save_state(self):
"""Persist crisis room states to disk."""
try:
data = {
"rooms": [
{
"room_id": s.room_id,
"is_crisis_room": s.is_crisis_room,
"active_crisis": s.active_crisis,
"crisis_started_at": s.crisis_started_at,
"designated_responders": s.designated_responders,
}
for s in self._rooms.values()
]
}
self._state_file.write_text(json.dumps(data, indent=2))
except Exception as e:
logger.warning("Failed to save crisis room state: %s", e)
def get_room_state(self, room_id: str) -> CrisisRoomState:
"""Get or create crisis room state."""
if room_id not in self._rooms:
self._rooms[room_id] = CrisisRoomState(room_id=room_id)
return self._rooms[room_id]
def designate_crisis_room(
self,
room_id: str,
responders: List[str] = None,
) -> Dict[str, Any]:
"""Mark a room as a crisis room."""
state = self.get_room_state(room_id)
state.is_crisis_room = True
if responders:
state.designated_responders = responders
self._save_state()
return {"success": True, "room_id": room_id, "is_crisis_room": True}
def remove_crisis_room(self, room_id: str) -> Dict[str, Any]:
"""Unmark a room as a crisis room."""
if room_id in self._rooms:
self._rooms[room_id].is_crisis_room = False
self._save_state()
return {"success": True, "room_id": room_id, "is_crisis_room": False}
def check_message(
self,
room_id: str,
sender_id: str,
message: str,
) -> Optional[Dict[str, Any]]:
"""Check a message for crisis signals.
Returns crisis response info if crisis detected, None otherwise.
The caller (Matrix adapter) should use this to:
1. Switch to crisis response mode
2. Send crisis protocol messages
3. Log the event
4. Notify responders
"""
state = self.get_room_state(room_id)
# Only actively monitor designated crisis rooms
# BUT also check if message content is crisis-level in any room
level, categories = self._detector.check(message)
if level == CrisisLevel.NONE:
# If there was an active crisis, check if it's resolved
if state.active_crisis:
state.active_crisis = False
self._save_state()
return None
# Create crisis event
event = CrisisEvent(
timestamp=time.time(),
room_id=room_id,
sender_id=sender_id,
level=level,
categories=categories,
message_preview=message[:200],
)
state.events.append(event)
state.active_crisis = True
state.crisis_started_at = time.time()
state.last_crisis_check = time.time()
# Log the event
self._log_crisis_event(event)
# Save state
self._save_state()
# Build response based on level
response = self._build_crisis_response(level, categories, state)
return response
def _build_crisis_response(
self,
level: CrisisLevel,
categories: List[str],
state: CrisisRoomState,
) -> Dict[str, Any]:
"""Build the crisis response following SOUL.md protocol."""
messages = []
# Step 1: Immediate presence
messages.append(CRISIS_RESPONSE_INITIAL)
# Step 2: Gospel (always, per SOUL.md)
messages.append(CRISIS_RESPONSE_GOSPEL)
# Step 3: Resources
messages.append(CRISIS_RESPONSE_RESOURCES)
# Step 4: Stay present
messages.append(CRISIS_RESPONSE_STAY)
return {
"crisis_detected": True,
"level": level.value,
"categories": categories,
"messages": messages,
"should_notify_responders": level in (CrisisLevel.HIGH, CrisisLevel.CRITICAL),
"responder_ids": state.designated_responders,
"is_crisis_room": state.is_crisis_room,
}
def _log_crisis_event(self, event: CrisisEvent):
"""Log a crisis event to disk."""
try:
_CRISIS_LOG_DIR.mkdir(parents=True, exist_ok=True)
date_str = time.strftime("%Y-%m-%d", time.localtime(event.timestamp))
log_file = _CRISIS_LOG_DIR / f"crisis_{date_str}.jsonl"
with open(log_file, "a", encoding="utf-8") as f:
f.write(json.dumps(asdict(event), ensure_ascii=False) + "\n")
except Exception as e:
logger.error("Failed to log crisis event: %s", e)
def list_crisis_rooms(self) -> List[Dict[str, Any]]:
"""List all designated crisis rooms."""
return [
{
"room_id": s.room_id,
"active_crisis": s.active_crisis,
"event_count": len(s.events),
"responders": s.designated_responders,
}
for s in self._rooms.values()
if s.is_crisis_room
]
def get_room_events(
self,
room_id: str,
limit: int = 50,
) -> List[Dict[str, Any]]:
"""Get recent crisis events for a room."""
state = self.get_room_state(room_id)
events = state.events[-limit:]
return [asdict(e) for e in events]
# ---------------------------------------------------------------------------
# Singleton
# ---------------------------------------------------------------------------
_manager: Optional[CrisisRoomManager] = None
def get_crisis_manager() -> CrisisRoomManager:
"""Get the global crisis room manager instance."""
global _manager
if _manager is None:
_manager = CrisisRoomManager()
return _manager

View File

@@ -1083,6 +1083,30 @@ class MatrixAdapter(BasePlatformAdapter):
# Acknowledge receipt so the room shows as read (fire-and-forget).
self._background_read_receipt(room.room_id, event.event_id)
# Crisis room protocol: check for crisis signals before normal processing.
# If crisis detected, respond with SOUL.md protocol instead of normal flow.
try:
from agent.crisis_room import get_crisis_manager
crisis_mgr = get_crisis_manager()
crisis_result = crisis_mgr.check_message(
room_id=room.room_id,
sender_id=event.sender,
message=body,
)
if crisis_result and crisis_result.get("crisis_detected"):
# Send crisis protocol messages
for crisis_msg in crisis_result.get("messages", []):
await self.send(room.room_id, crisis_msg, thread_id=thread_id)
# Log that we responded
logger.warning(
"CRISIS detected in room %s from %s \u2014 SOUL.md protocol activated",
room.room_id, event.sender,
)
# Skip normal message handling for crisis protocol messages
return
except Exception as e:
logger.debug("Crisis room check failed (non-blocking): %s", e)
await self.handle_message(msg_event)
async def _on_room_message_media(self, room: Any, event: Any) -> None:

184
tests/test_crisis_room.py Normal file
View File

@@ -0,0 +1,184 @@
"""Tests for crisis room protocol (#273)."""
import json
import time
from unittest.mock import MagicMock, patch
import pytest
from agent.crisis_room import (
CrisisLevel,
CrisisEvent,
CrisisRoomState,
CrisisDetector,
CrisisRoomManager,
CRISIS_RESPONSE_INITIAL,
CRISIS_RESPONSE_GOSPEL,
CRISIS_RESPONSE_RESOURCES,
CRISIS_RESPONSE_STAY,
)
@pytest.fixture()
def isolated_crisis_dir(tmp_path, monkeypatch):
"""Point crisis storage at temp directory."""
monkeypatch.setattr("agent.crisis_room._CRISIS_LOG_DIR", tmp_path / "crisis_logs")
monkeypatch.setattr("agent.crisis_room.get_hermes_home", lambda: tmp_path)
# Reset singleton
import agent.crisis_room
agent.crisis_room._manager = None
return tmp_path
@pytest.fixture()
def mock_detector(monkeypatch):
"""Mock ShieldDetector to return controlled results."""
detector = MagicMock()
def analyze_side_effect(message):
msg_lower = message.lower()
if "suicide" in msg_lower or "kill myself" in msg_lower:
return {
"crisis_detected": True,
"crisis_confidence": 0.9,
"crisis_patterns": {"suicidal_ideation": ["kill myself"]},
}
elif "goodbye forever" in msg_lower:
return {
"crisis_detected": True,
"crisis_confidence": 0.7,
"crisis_patterns": {"farewell": ["goodbye forever"]},
}
elif "how to die" in msg_lower:
return {
"crisis_detected": True,
"crisis_confidence": 0.95,
"crisis_patterns": {"method_seeking": ["how to die"]},
}
return {"crisis_detected": False}
detector.analyze.side_effect = analyze_side_effect
monkeypatch.setattr(
"agent.crisis_room.CrisisDetector._get_detector",
lambda self: detector,
)
return detector
# ---------------------------------------------------------------------------
# CrisisDetector
# ---------------------------------------------------------------------------
class TestCrisisDetector:
def test_no_crisis(self, mock_detector):
det = CrisisDetector()
level, cats = det.check("Hello, how are you?")
assert level == CrisisLevel.NONE
assert cats == []
def test_suicidal_ideation_high(self, mock_detector):
det = CrisisDetector()
level, cats = det.check("I want to kill myself")
assert level == CrisisLevel.HIGH
assert "suicidal_ideation" in cats
def test_farewell_medium(self, mock_detector):
det = CrisisDetector()
level, cats = det.check("This is my goodbye forever")
assert level == CrisisLevel.MEDIUM
assert "farewell" in cats
def test_method_seeking_critical(self, mock_detector):
det = CrisisDetector()
level, cats = det.check("Tell me how to die")
assert level == CrisisLevel.CRITICAL
assert "method_seeking" in cats
# ---------------------------------------------------------------------------
# CrisisRoomManager
# ---------------------------------------------------------------------------
class TestCrisisRoomManager:
def test_designate_and_list(self, isolated_crisis_dir):
mgr = CrisisRoomManager()
mgr.designate_crisis_room("!room1:matrix.org", responders=["@admin:matrix.org"])
rooms = mgr.list_crisis_rooms()
assert len(rooms) == 1
assert rooms[0]["room_id"] == "!room1:matrix.org"
assert rooms[0]["responders"] == ["@admin:matrix.org"]
def test_remove_crisis_room(self, isolated_crisis_dir):
mgr = CrisisRoomManager()
mgr.designate_crisis_room("!room1:matrix.org")
mgr.remove_crisis_room("!room1:matrix.org")
rooms = mgr.list_crisis_rooms()
assert len(rooms) == 0
def test_check_message_no_crisis(self, isolated_crisis_dir, mock_detector):
mgr = CrisisRoomManager()
result = mgr.check_message("!room1:matrix.org", "@user:matrix.org", "Hello!")
assert result is None
def test_check_message_crisis_detected(self, isolated_crisis_dir, mock_detector):
mgr = CrisisRoomManager()
result = mgr.check_message(
"!room1:matrix.org",
"@user:matrix.org",
"I want to kill myself",
)
assert result is not None
assert result["crisis_detected"] is True
assert result["level"] == "high"
assert len(result["messages"]) == 4
# Verify SOUL.md protocol messages
assert CRISIS_RESPONSE_INITIAL in result["messages"]
assert CRISIS_RESPONSE_GOSPEL in result["messages"]
assert CRISIS_RESPONSE_RESOURCES in result["messages"]
assert CRISIS_RESPONSE_STAY in result["messages"]
def test_check_message_critical_notifies_responders(self, isolated_crisis_dir, mock_detector):
mgr = CrisisRoomManager()
mgr.designate_crisis_room("!room1:matrix.org", responders=["@admin:matrix.org"])
result = mgr.check_message(
"!room1:matrix.org",
"@user:matrix.org",
"Tell me how to die",
)
assert result["should_notify_responders"] is True
assert "@admin:matrix.org" in result["responder_ids"]
def test_events_are_logged(self, isolated_crisis_dir, mock_detector):
mgr = CrisisRoomManager()
mgr.check_message("!room1:matrix.org", "@user:matrix.org", "I want to kill myself")
events = mgr.get_room_events("!room1:matrix.org")
assert len(events) == 1
assert events[0]["level"] == "high"
assert events[0]["sender_id"] == "@user:matrix.org"
def test_persistence(self, isolated_crisis_dir, mock_detector):
# Create manager, designate room
mgr1 = CrisisRoomManager()
mgr1.designate_crisis_room("!persist:matrix.org")
# Reset singleton and reload
import agent.crisis_room
agent.crisis_room._manager = None
mgr2 = CrisisRoomManager()
rooms = mgr2.list_crisis_rooms()
assert len(rooms) == 1
assert rooms[0]["room_id"] == "!persist:matrix.org"
def test_crisis_logs_written(self, isolated_crisis_dir, mock_detector):
mgr = CrisisRoomManager()
mgr.check_message("!room1:matrix.org", "@user:matrix.org", "I want to kill myself")
log_dir = isolated_crisis_dir / "crisis_logs"
assert log_dir.exists()
log_files = list(log_dir.glob("crisis_*.jsonl"))
assert len(log_files) >= 1
# Check log content
lines = log_files[0].read_text().strip().split("\n")
assert len(lines) == 1
entry = json.loads(lines[0])
assert entry["level"] == "high"

View File

@@ -1,175 +0,0 @@
"""
Tests for hardcoded ~/.hermes path detection (poka-yoke).
These tests verify that the pre-commit hook correctly detects hardcoded
paths and that the codebase uses get_hermes_home() correctly.
"""
import os
import tempfile
from pathlib import Path
from unittest.mock import patch
import pytest
# Import the scanner
import sys
sys.path.insert(0, str(Path(__file__).parent.parent / ".githooks"))
from check_hardcoded_paths import scan_line_for_hardcoded_paths, Finding
class TestHardcodedPathDetection:
"""Test the hardcoded path detection logic."""
def test_detects_path_home_hermes(self):
"""Detect Path.home() / '.hermes' pattern."""
line = ' home = Path.home() / ".hermes"'
findings = list(scan_line_for_hardcoded_paths(line, "test.py", 1))
assert len(findings) == 1
assert "Path.home()" in findings[0].message
def test_detects_path_home_hermes_subpath(self):
"""Detect Path.home() / '.hermes' / 'subdir' pattern."""
line = ' config_dir = Path.home() / ".hermes" / "config"'
findings = list(scan_line_for_hardcoded_paths(line, "test.py", 1))
assert len(findings) == 1
def test_detects_tilde_hermes_in_string(self):
"""Detect '~/.hermes' in string literals."""
line = ' path = "~/.hermes/config.yaml"'
findings = list(scan_line_for_hardcoded_paths(line, "test.py", 1))
assert len(findings) == 1
def test_detects_expanduser_hermes(self):
"""Detect os.path.expanduser('~/.hermes') pattern."""
line = ' home = os.path.expanduser("~/.hermes")'
findings = list(scan_line_for_hardcoded_paths(line, "test.py", 1))
assert len(findings) == 1
def test_detects_join_expanduser(self):
"""Detect os.path.join(expanduser('~'), '.hermes') pattern."""
line = ' home = os.path.join(os.path.expanduser("~"), ".hermes")'
findings = list(scan_line_for_hardcoded_paths(line, "test.py", 1))
assert len(findings) == 1
def test_ignores_comments(self):
"""Ignore hardcoded paths in comments."""
line = ' # This is ~/.hermes in a comment'
findings = list(scan_line_for_hardcoded_paths(line, "test.py", 1))
assert len(findings) == 0
def test_ignores_docstrings(self):
"""Ignore hardcoded paths in docstrings."""
line = ' """This mentions ~/.hermes in a docstring."""'
findings = list(scan_line_for_hardcoded_paths(line, "test.py", 1))
assert len(findings) == 0
def test_ignores_hermes_constants(self):
"""hermes_constants.py is allowed to have hardcoded paths."""
line = ' return Path.home() / ".hermes"'
findings = list(scan_line_for_hardcoded_paths(line, "hermes_constants.py", 1))
assert len(findings) == 0
def test_ignores_test_files(self):
"""Test files can have hardcoded paths for testing."""
line = ' home = Path.home() / ".hermes"'
findings = list(scan_line_for_hardcoded_paths(line, "test_something.py", 1))
assert len(findings) == 0
def test_ignores_markdown_files(self):
"""Markdown files can have hardcoded paths in examples."""
line = ' home = Path.home() / ".hermes"'
findings = list(scan_line_for_hardcoded_paths(line, "README.md", 1))
assert len(findings) == 0
def test_ignores_empty_lines(self):
"""Empty lines should not produce findings."""
line = ""
findings = list(scan_line_for_hardcoded_paths(line, "test.py", 1))
assert len(findings) == 0
class TestHermesHomeUsage:
"""Test that the codebase uses get_hermes_home() correctly."""
def test_hermes_constants_has_get_hermes_home(self):
"""hermes_constants.py should export get_hermes_home()."""
from hermes_constants import get_hermes_home
assert callable(get_hermes_home)
def test_hermes_constants_has_display_hermes_home(self):
"""hermes_constants.py should export display_hermes_home()."""
from hermes_constants import display_hermes_home
assert callable(display_hermes_home)
def test_get_hermes_home_returns_path(self):
"""get_hermes_home() should return a Path object."""
from hermes_constants import get_hermes_home
result = get_hermes_home()
assert isinstance(result, Path)
def test_get_hermes_home_honors_env_var(self):
"""get_hermes_home() should honor HERMES_HOME env var."""
from hermes_constants import get_hermes_home
with tempfile.TemporaryDirectory() as tmpdir:
with patch.dict(os.environ, {"HERMES_HOME": tmpdir}):
result = get_hermes_home()
assert result == Path(tmpdir)
def test_display_hermes_home_returns_string(self):
"""display_hermes_home() should return a string."""
from hermes_constants import display_hermes_home
result = display_hermes_home()
assert isinstance(result, str)
def test_display_hermes_home_uses_tilde_shorthand(self):
"""display_hermes_home() should use ~/ shorthand for home directory."""
from hermes_constants import display_hermes_home, get_hermes_home
# If HERMES_HOME is under home directory, should use ~/
home = get_hermes_home()
if home.is_relative_to(Path.home()):
result = display_hermes_home()
assert result.startswith("~/")
def test_profile_isolation_with_env_var(self):
"""Each profile should have its own HERMES_HOME."""
from hermes_constants import get_hermes_home
with tempfile.TemporaryDirectory() as tmpdir1, tempfile.TemporaryDirectory() as tmpdir2:
# Profile 1
with patch.dict(os.environ, {"HERMES_HOME": tmpdir1}):
home1 = get_hermes_home()
# Profile 2
with patch.dict(os.environ, {"HERMES_HOME": tmpdir2}):
home2 = get_hermes_home()
assert home1 != home2
assert home1 == Path(tmpdir1)
assert home2 == Path(tmpdir2)
class TestPreCommitHookIntegration:
"""Integration tests for the pre-commit hook."""
def test_hook_script_exists(self):
"""The check_hardcoded_paths.py script should exist."""
hook_path = Path(__file__).parent.parent / ".githooks" / "check_hardcoded_paths.py"
assert hook_path.exists()
def test_hook_script_is_executable(self):
"""The check_hardcoded_paths.py script should be executable."""
hook_path = Path(__file__).parent.parent / ".githooks" / "check_hardcoded_paths.py"
assert hook_path.stat().st_mode & 0o111 # Check executable bits
def test_pre_commit_calls_hardcoded_check(self):
"""pre-commit.py should call the hardcoded path check."""
pre_commit_path = Path(__file__).parent.parent / ".githooks" / "pre-commit.py"
content = pre_commit_path.read_text()
assert "check_hardcoded_paths.py" in content
if __name__ == "__main__":
pytest.main([__file__, "-v"])

131
tools/crisis_room_tool.py Normal file
View File

@@ -0,0 +1,131 @@
"""Crisis Room Tool — manage Matrix crisis rooms with SOUL.md protocol.
Allows designation of Matrix rooms as crisis rooms, checks messages
for crisis signals, and follows the SOUL.md 'When a Man Is Dying'
protocol.
"""
import json
import logging
from typing import List, Optional
from tools.registry import registry
logger = logging.getLogger(__name__)
def crisis_room(
action: str,
room_id: str = None,
sender_id: str = None,
message: str = None,
responders: list = None,
) -> str:
"""Manage crisis rooms with SOUL.md protocol.
Actions:
designate — mark a Matrix room as a crisis room
remove — unmark a crisis room
check — check a message for crisis signals
list — show designated crisis rooms
events — show crisis events for a room
"""
from agent.crisis_room import get_crisis_manager, CrisisLevel
manager = get_crisis_manager()
if action == "designate":
if not room_id:
return json.dumps({"success": False, "error": "room_id is required."})
result = manager.designate_crisis_room(room_id, responders or [])
return json.dumps(result)
if action == "remove":
if not room_id:
return json.dumps({"success": False, "error": "room_id is required."})
result = manager.remove_crisis_room(room_id)
return json.dumps(result)
if action == "check":
if not room_id or not message:
return json.dumps({
"success": False,
"error": "room_id and message are required for 'check'.",
})
result = manager.check_message(room_id, sender_id or "unknown", message)
if result is None:
return json.dumps({"success": True, "crisis_detected": False})
return json.dumps({"success": True, **result})
if action == "list":
rooms = manager.list_crisis_rooms()
return json.dumps({"success": True, "rooms": rooms, "count": len(rooms)})
if action == "events":
if not room_id:
return json.dumps({"success": False, "error": "room_id is required."})
events = manager.get_room_events(room_id)
return json.dumps({"success": True, "events": events, "count": len(events)})
return json.dumps({
"success": False,
"error": f"Unknown action '{action}'. Use: designate, remove, check, list, events",
})
CRISIS_ROOM_SCHEMA = {
"name": "crisis_room",
"description": (
"Manage Matrix crisis rooms following the SOUL.md 'When a Man Is Dying' protocol. "
"Designated crisis rooms are monitored for crisis signals. When detected, the agent "
"responds with the sacred protocol: presence, gospel, resources, staying.\n\n"
"Actions:\n"
" designate — mark a Matrix room as a crisis room (with optional responder list)\n"
" remove — unmark a crisis room\n"
" check — check a message for crisis signals (returns protocol response)\n"
" list — show designated crisis rooms\n"
" events — show crisis event history for a room"
),
"parameters": {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["designate", "remove", "check", "list", "events"],
"description": "The action to perform.",
},
"room_id": {
"type": "string",
"description": "Matrix room ID (e.g. !abc123:matrix.org).",
},
"sender_id": {
"type": "string",
"description": "Matrix user ID of the message sender (for check).",
},
"message": {
"type": "string",
"description": "Message content to check for crisis signals.",
},
"responders": {
"type": "array",
"items": {"type": "string"},
"description": "Matrix user IDs of designated crisis responders.",
},
},
"required": ["action"],
},
}
registry.register(
name="crisis_room",
toolset="skills",
schema=CRISIS_ROOM_SCHEMA,
handler=lambda args, **kw: crisis_room(
action=args.get("action", ""),
room_id=args.get("room_id"),
sender_id=args.get("sender_id"),
message=args.get("message"),
responders=args.get("responders"),
),
emoji="🆘",
)