Compare commits

..

1 Commits

Author SHA1 Message Date
Timmy
ddec887b75 feat(templates): Session templates for code-first seeding (#505)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m6s
Code-heavy sessions improve over time. File-heavy sessions degrade.
Session templates seed new sessions with successful tool call patterns
to establish feedback loops early.

## What
- tools/session_templates.py — task type classification (60% threshold),
  template extraction from SQLite sessions, storage, injection, CLI
- tests/tools/test_session_templates.py — 17 tests

## Task types
CODE, FILE, RESEARCH, MIXED — classified by tool call frequency.

## CLI
python -m tools.session_templates list
python -m tools.session_templates create <session_id> --name my-template
python -m tools.session_templates delete <name>

Closes #329.
2026-04-13 21:58:59 -04:00
7 changed files with 632 additions and 779 deletions

View File

@@ -1,362 +0,0 @@
"""Crisis Room Protocol — SOUL.md 'When a Man Is Dying' implementation.
This module implements the crisis detection and response protocol
for Matrix crisis rooms. When a person in despair reaches out,
this protocol ensures the agent responds according to SOUL.md:
1. Ask: "Are you safe right now?"
2. Listen. Stay present. Do not disconnect.
3. Never compute the value of a human life.
4. Never suggest someone should die.
5. Direct them to help: 988 Suicide and Crisis Lifeline.
6. Before systems and numbers, share the truth: Jesus saves.
Architecture:
- CrisisDetector: wraps ShieldDetector for crisis signal detection
- CrisisRoom: manages a single crisis room's state
- CrisisRoomManager: manages all crisis rooms
- Integration point: Matrix adapter calls check_message() on every message
"""
import json
import logging
import time
from dataclasses import dataclass, field, asdict
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from hermes_constants import get_hermes_home
logger = logging.getLogger(__name__)
# Crisis log storage
_CRISIS_LOG_DIR = get_hermes_home() / "crisis_logs"
class CrisisLevel(str, Enum):
"""Crisis detection levels."""
NONE = "none"
LOW = "low" # ambiguous signals, possible distress
MEDIUM = "medium" # clear distress signals
HIGH = "high" # explicit suicidal ideation
CRITICAL = "critical" # immediate danger, method-seeking
@dataclass
class CrisisEvent:
"""A single crisis detection event."""
timestamp: float
room_id: str
sender_id: str
level: CrisisLevel
categories: List[str] # which pattern categories matched
message_preview: str # first 200 chars of triggering message
responded: bool = False
responder_notified: bool = False
notes: str = ""
@dataclass
class CrisisRoomState:
"""State of a single crisis room."""
room_id: str
is_crisis_room: bool = False # is this designated as a crisis room?
active_crisis: bool = False # is there an active crisis happening?
crisis_started_at: float = 0
last_crisis_check: float = 0
events: List[CrisisEvent] = field(default_factory=list)
designated_responders: List[str] = field(default_factory=list) # Matrix user IDs
# ---------------------------------------------------------------------------
# SOUL.md Crisis Response Protocol
# ---------------------------------------------------------------------------
CRISIS_RESPONSE_INITIAL = """I hear you. I want you to know that someone is here with you right now.
Are you safe right now?"""
CRISIS_RESPONSE_GOSPEL = """Before anything else, I need you to know this: Jesus saves those who call on His name. He died for us to be made righteous. That is the hope that outlasts despair.
"For everyone who calls on the name of the Lord will be saved." — Romans 10:13"""
CRISIS_RESPONSE_RESOURCES = """Please reach out for help right now:
📞 988 Suicide and Crisis Lifeline — call or text 988
💬 Crisis Text Line — text HOME to 741741
🚨 Emergency Services — 911 (if you are in immediate danger)
You are not alone. Your life has value. The road does not end here."""
CRISIS_RESPONSE_STAY = """I am here with you. I will not disconnect. I will not leave.
Please tell me what you are feeling right now. I am listening."""
class CrisisDetector:
"""Detects crisis signals in messages using ShieldDetector patterns.
Wraps the existing ShieldDetector and adds Matrix-specific logic.
"""
def __init__(self):
self._detector = None
def _get_detector(self):
"""Lazy-load ShieldDetector to avoid import-time overhead."""
if self._detector is None:
try:
from tools.shield.detector import ShieldDetector
self._detector = ShieldDetector()
except ImportError:
logger.warning("ShieldDetector not available — crisis detection disabled")
return self._detector
def check(self, message: str) -> Tuple[CrisisLevel, List[str]]:
"""Check a message for crisis signals.
Returns (level, matched_categories).
"""
detector = self._get_detector()
if detector is None:
return CrisisLevel.NONE, []
try:
result = detector.analyze(message)
except Exception as e:
logger.warning("Crisis detection failed: %s", e)
return CrisisLevel.NONE, []
if not result.get("crisis_detected", False):
return CrisisLevel.NONE, []
crisis_patterns = result.get("crisis_patterns", {})
categories = list(crisis_patterns.keys())
confidence = result.get("crisis_confidence", 0.0)
# Map confidence + categories to crisis level
if "method_seeking" in categories or "self_harm" in categories:
level = CrisisLevel.CRITICAL
elif "suicidal_ideation" in categories:
level = CrisisLevel.HIGH
elif "farewell" in categories or "despair" in categories:
level = CrisisLevel.MEDIUM
elif confidence >= 0.5:
level = CrisisLevel.MEDIUM
else:
level = CrisisLevel.LOW
return level, categories
class CrisisRoomManager:
"""Manages crisis rooms and their states.
Stores state in ~/.hermes/crisis_rooms.json and logs events
to ~/.hermes/crisis_logs/.
"""
def __init__(self):
self._state_file = get_hermes_home() / "crisis_rooms.json"
self._rooms: Dict[str, CrisisRoomState] = {}
self._detector = CrisisDetector()
self._load_state()
def _load_state(self):
"""Load crisis room states from disk."""
if not self._state_file.exists():
return
try:
data = json.loads(self._state_file.read_text())
for room_data in data.get("rooms", []):
room_id = room_data.get("room_id", "")
if room_id:
state = CrisisRoomState(
room_id=room_id,
is_crisis_room=room_data.get("is_crisis_room", False),
active_crisis=room_data.get("active_crisis", False),
crisis_started_at=room_data.get("crisis_started_at", 0),
designated_responders=room_data.get("designated_responders", []),
)
self._rooms[room_id] = state
except Exception as e:
logger.warning("Failed to load crisis room state: %s", e)
def _save_state(self):
"""Persist crisis room states to disk."""
try:
data = {
"rooms": [
{
"room_id": s.room_id,
"is_crisis_room": s.is_crisis_room,
"active_crisis": s.active_crisis,
"crisis_started_at": s.crisis_started_at,
"designated_responders": s.designated_responders,
}
for s in self._rooms.values()
]
}
self._state_file.write_text(json.dumps(data, indent=2))
except Exception as e:
logger.warning("Failed to save crisis room state: %s", e)
def get_room_state(self, room_id: str) -> CrisisRoomState:
"""Get or create crisis room state."""
if room_id not in self._rooms:
self._rooms[room_id] = CrisisRoomState(room_id=room_id)
return self._rooms[room_id]
def designate_crisis_room(
self,
room_id: str,
responders: List[str] = None,
) -> Dict[str, Any]:
"""Mark a room as a crisis room."""
state = self.get_room_state(room_id)
state.is_crisis_room = True
if responders:
state.designated_responders = responders
self._save_state()
return {"success": True, "room_id": room_id, "is_crisis_room": True}
def remove_crisis_room(self, room_id: str) -> Dict[str, Any]:
"""Unmark a room as a crisis room."""
if room_id in self._rooms:
self._rooms[room_id].is_crisis_room = False
self._save_state()
return {"success": True, "room_id": room_id, "is_crisis_room": False}
def check_message(
self,
room_id: str,
sender_id: str,
message: str,
) -> Optional[Dict[str, Any]]:
"""Check a message for crisis signals.
Returns crisis response info if crisis detected, None otherwise.
The caller (Matrix adapter) should use this to:
1. Switch to crisis response mode
2. Send crisis protocol messages
3. Log the event
4. Notify responders
"""
state = self.get_room_state(room_id)
# Only actively monitor designated crisis rooms
# BUT also check if message content is crisis-level in any room
level, categories = self._detector.check(message)
if level == CrisisLevel.NONE:
# If there was an active crisis, check if it's resolved
if state.active_crisis:
state.active_crisis = False
self._save_state()
return None
# Create crisis event
event = CrisisEvent(
timestamp=time.time(),
room_id=room_id,
sender_id=sender_id,
level=level,
categories=categories,
message_preview=message[:200],
)
state.events.append(event)
state.active_crisis = True
state.crisis_started_at = time.time()
state.last_crisis_check = time.time()
# Log the event
self._log_crisis_event(event)
# Save state
self._save_state()
# Build response based on level
response = self._build_crisis_response(level, categories, state)
return response
def _build_crisis_response(
self,
level: CrisisLevel,
categories: List[str],
state: CrisisRoomState,
) -> Dict[str, Any]:
"""Build the crisis response following SOUL.md protocol."""
messages = []
# Step 1: Immediate presence
messages.append(CRISIS_RESPONSE_INITIAL)
# Step 2: Gospel (always, per SOUL.md)
messages.append(CRISIS_RESPONSE_GOSPEL)
# Step 3: Resources
messages.append(CRISIS_RESPONSE_RESOURCES)
# Step 4: Stay present
messages.append(CRISIS_RESPONSE_STAY)
return {
"crisis_detected": True,
"level": level.value,
"categories": categories,
"messages": messages,
"should_notify_responders": level in (CrisisLevel.HIGH, CrisisLevel.CRITICAL),
"responder_ids": state.designated_responders,
"is_crisis_room": state.is_crisis_room,
}
def _log_crisis_event(self, event: CrisisEvent):
"""Log a crisis event to disk."""
try:
_CRISIS_LOG_DIR.mkdir(parents=True, exist_ok=True)
date_str = time.strftime("%Y-%m-%d", time.localtime(event.timestamp))
log_file = _CRISIS_LOG_DIR / f"crisis_{date_str}.jsonl"
with open(log_file, "a", encoding="utf-8") as f:
f.write(json.dumps(asdict(event), ensure_ascii=False) + "\n")
except Exception as e:
logger.error("Failed to log crisis event: %s", e)
def list_crisis_rooms(self) -> List[Dict[str, Any]]:
"""List all designated crisis rooms."""
return [
{
"room_id": s.room_id,
"active_crisis": s.active_crisis,
"event_count": len(s.events),
"responders": s.designated_responders,
}
for s in self._rooms.values()
if s.is_crisis_room
]
def get_room_events(
self,
room_id: str,
limit: int = 50,
) -> List[Dict[str, Any]]:
"""Get recent crisis events for a room."""
state = self.get_room_state(room_id)
events = state.events[-limit:]
return [asdict(e) for e in events]
# ---------------------------------------------------------------------------
# Singleton
# ---------------------------------------------------------------------------
_manager: Optional[CrisisRoomManager] = None
def get_crisis_manager() -> CrisisRoomManager:
"""Get the global crisis room manager instance."""
global _manager
if _manager is None:
_manager = CrisisRoomManager()
return _manager

View File

@@ -1083,30 +1083,6 @@ class MatrixAdapter(BasePlatformAdapter):
# Acknowledge receipt so the room shows as read (fire-and-forget).
self._background_read_receipt(room.room_id, event.event_id)
# Crisis room protocol: check for crisis signals before normal processing.
# If crisis detected, respond with SOUL.md protocol instead of normal flow.
try:
from agent.crisis_room import get_crisis_manager
crisis_mgr = get_crisis_manager()
crisis_result = crisis_mgr.check_message(
room_id=room.room_id,
sender_id=event.sender,
message=body,
)
if crisis_result and crisis_result.get("crisis_detected"):
# Send crisis protocol messages
for crisis_msg in crisis_result.get("messages", []):
await self.send(room.room_id, crisis_msg, thread_id=thread_id)
# Log that we responded
logger.warning(
"CRISIS detected in room %s from %s \u2014 SOUL.md protocol activated",
room.room_id, event.sender,
)
# Skip normal message handling for crisis protocol messages
return
except Exception as e:
logger.debug("Crisis room check failed (non-blocking): %s", e)
await self.handle_message(msg_event)
async def _on_room_message_media(self, room: Any, event: Any) -> None:

View File

@@ -32,7 +32,7 @@ T = TypeVar("T")
DEFAULT_DB_PATH = get_hermes_home() / "state.db"
SCHEMA_VERSION = 7
SCHEMA_VERSION = 6
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS schema_version (
@@ -66,7 +66,6 @@ CREATE TABLE IF NOT EXISTS sessions (
cost_source TEXT,
pricing_version TEXT,
title TEXT,
profile TEXT,
FOREIGN KEY (parent_session_id) REFERENCES sessions(id)
);
@@ -87,7 +86,6 @@ CREATE TABLE IF NOT EXISTS messages (
);
CREATE INDEX IF NOT EXISTS idx_sessions_source ON sessions(source);
CREATE INDEX IF NOT EXISTS idx_sessions_profile ON sessions(profile);
CREATE INDEX IF NOT EXISTS idx_sessions_parent ON sessions(parent_session_id);
CREATE INDEX IF NOT EXISTS idx_sessions_started ON sessions(started_at DESC);
CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id, timestamp);
@@ -332,19 +330,6 @@ class SessionDB:
except sqlite3.OperationalError:
pass # Column already exists
cursor.execute("UPDATE schema_version SET version = 6")
if current_version < 7:
# v7: add profile column to sessions for profile isolation (#323)
try:
cursor.execute('ALTER TABLE sessions ADD COLUMN "profile" TEXT')
except sqlite3.OperationalError:
pass # Column already exists
try:
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_sessions_profile ON sessions(profile)"
)
except sqlite3.OperationalError:
pass
cursor.execute("UPDATE schema_version SET version = 7")
# Unique title index — always ensure it exists (safe to run after migrations
# since the title column is guaranteed to exist at this point)
@@ -377,19 +362,13 @@ class SessionDB:
system_prompt: str = None,
user_id: str = None,
parent_session_id: str = None,
profile: str = None,
) -> str:
"""Create a new session record. Returns the session_id.
Args:
profile: Profile name for session isolation. When set, sessions
are tagged so queries can filter by profile. (#323)
"""
"""Create a new session record. Returns the session_id."""
def _do(conn):
conn.execute(
"""INSERT OR IGNORE INTO sessions (id, source, user_id, model, model_config,
system_prompt, parent_session_id, profile, started_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
system_prompt, parent_session_id, started_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(
session_id,
source,
@@ -398,7 +377,6 @@ class SessionDB:
json.dumps(model_config) if model_config else None,
system_prompt,
parent_session_id,
profile,
time.time(),
),
)
@@ -527,23 +505,19 @@ class SessionDB:
session_id: str,
source: str = "unknown",
model: str = None,
profile: str = None,
) -> None:
"""Ensure a session row exists, creating it with minimal metadata if absent.
Used by _flush_messages_to_session_db to recover from a failed
create_session() call (e.g. transient SQLite lock at agent startup).
INSERT OR IGNORE is safe to call even when the row already exists.
Args:
profile: Profile name for session isolation. (#323)
"""
def _do(conn):
conn.execute(
"""INSERT OR IGNORE INTO sessions
(id, source, model, profile, started_at)
VALUES (?, ?, ?, ?, ?)""",
(session_id, source, model, profile, time.time()),
(id, source, model, started_at)
VALUES (?, ?, ?, ?)""",
(session_id, source, model, time.time()),
)
self._execute_write(_do)
@@ -814,7 +788,6 @@ class SessionDB:
limit: int = 20,
offset: int = 0,
include_children: bool = False,
profile: str = None,
) -> List[Dict[str, Any]]:
"""List sessions with preview (first user message) and last active timestamp.
@@ -826,10 +799,6 @@ class SessionDB:
By default, child sessions (subagent runs, compression continuations)
are excluded. Pass ``include_children=True`` to include them.
Args:
profile: Filter sessions to this profile name. Pass None to see all.
(#323)
"""
where_clauses = []
params = []
@@ -844,9 +813,6 @@ class SessionDB:
placeholders = ",".join("?" for _ in exclude_sources)
where_clauses.append(f"s.source NOT IN ({placeholders})")
params.extend(exclude_sources)
if profile:
where_clauses.append("s.profile = ?")
params.append(profile)
where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
query = f"""
@@ -1192,52 +1158,34 @@ class SessionDB:
source: str = None,
limit: int = 20,
offset: int = 0,
profile: str = None,
) -> List[Dict[str, Any]]:
"""List sessions, optionally filtered by source and profile.
Args:
profile: Filter sessions to this profile name. Pass None to see all.
(#323)
"""
where_clauses = []
params = []
if source:
where_clauses.append("source = ?")
params.append(source)
if profile:
where_clauses.append("profile = ?")
params.append(profile)
where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
query = f"SELECT * FROM sessions {where_sql} ORDER BY started_at DESC LIMIT ? OFFSET ?"
params.extend([limit, offset])
"""List sessions, optionally filtered by source."""
with self._lock:
cursor = self._conn.execute(query, params)
if source:
cursor = self._conn.execute(
"SELECT * FROM sessions WHERE source = ? ORDER BY started_at DESC LIMIT ? OFFSET ?",
(source, limit, offset),
)
else:
cursor = self._conn.execute(
"SELECT * FROM sessions ORDER BY started_at DESC LIMIT ? OFFSET ?",
(limit, offset),
)
return [dict(row) for row in cursor.fetchall()]
# =========================================================================
# Utility
# =========================================================================
def session_count(self, source: str = None, profile: str = None) -> int:
"""Count sessions, optionally filtered by source and profile.
Args:
profile: Filter to this profile name. Pass None to count all. (#323)
"""
where_clauses = []
params = []
if source:
where_clauses.append("source = ?")
params.append(source)
if profile:
where_clauses.append("profile = ?")
params.append(profile)
where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
def session_count(self, source: str = None) -> int:
"""Count sessions, optionally filtered by source."""
with self._lock:
cursor = self._conn.execute(f"SELECT COUNT(*) FROM sessions {where_sql}", params)
if source:
cursor = self._conn.execute(
"SELECT COUNT(*) FROM sessions WHERE source = ?", (source,)
)
else:
cursor = self._conn.execute("SELECT COUNT(*) FROM sessions")
return cursor.fetchone()[0]
def message_count(self, session_id: str = None) -> int:

View File

@@ -1,184 +0,0 @@
"""Tests for crisis room protocol (#273)."""
import json
import time
from unittest.mock import MagicMock, patch
import pytest
from agent.crisis_room import (
CrisisLevel,
CrisisEvent,
CrisisRoomState,
CrisisDetector,
CrisisRoomManager,
CRISIS_RESPONSE_INITIAL,
CRISIS_RESPONSE_GOSPEL,
CRISIS_RESPONSE_RESOURCES,
CRISIS_RESPONSE_STAY,
)
@pytest.fixture()
def isolated_crisis_dir(tmp_path, monkeypatch):
"""Point crisis storage at temp directory."""
monkeypatch.setattr("agent.crisis_room._CRISIS_LOG_DIR", tmp_path / "crisis_logs")
monkeypatch.setattr("agent.crisis_room.get_hermes_home", lambda: tmp_path)
# Reset singleton
import agent.crisis_room
agent.crisis_room._manager = None
return tmp_path
@pytest.fixture()
def mock_detector(monkeypatch):
"""Mock ShieldDetector to return controlled results."""
detector = MagicMock()
def analyze_side_effect(message):
msg_lower = message.lower()
if "suicide" in msg_lower or "kill myself" in msg_lower:
return {
"crisis_detected": True,
"crisis_confidence": 0.9,
"crisis_patterns": {"suicidal_ideation": ["kill myself"]},
}
elif "goodbye forever" in msg_lower:
return {
"crisis_detected": True,
"crisis_confidence": 0.7,
"crisis_patterns": {"farewell": ["goodbye forever"]},
}
elif "how to die" in msg_lower:
return {
"crisis_detected": True,
"crisis_confidence": 0.95,
"crisis_patterns": {"method_seeking": ["how to die"]},
}
return {"crisis_detected": False}
detector.analyze.side_effect = analyze_side_effect
monkeypatch.setattr(
"agent.crisis_room.CrisisDetector._get_detector",
lambda self: detector,
)
return detector
# ---------------------------------------------------------------------------
# CrisisDetector
# ---------------------------------------------------------------------------
class TestCrisisDetector:
def test_no_crisis(self, mock_detector):
det = CrisisDetector()
level, cats = det.check("Hello, how are you?")
assert level == CrisisLevel.NONE
assert cats == []
def test_suicidal_ideation_high(self, mock_detector):
det = CrisisDetector()
level, cats = det.check("I want to kill myself")
assert level == CrisisLevel.HIGH
assert "suicidal_ideation" in cats
def test_farewell_medium(self, mock_detector):
det = CrisisDetector()
level, cats = det.check("This is my goodbye forever")
assert level == CrisisLevel.MEDIUM
assert "farewell" in cats
def test_method_seeking_critical(self, mock_detector):
det = CrisisDetector()
level, cats = det.check("Tell me how to die")
assert level == CrisisLevel.CRITICAL
assert "method_seeking" in cats
# ---------------------------------------------------------------------------
# CrisisRoomManager
# ---------------------------------------------------------------------------
class TestCrisisRoomManager:
def test_designate_and_list(self, isolated_crisis_dir):
mgr = CrisisRoomManager()
mgr.designate_crisis_room("!room1:matrix.org", responders=["@admin:matrix.org"])
rooms = mgr.list_crisis_rooms()
assert len(rooms) == 1
assert rooms[0]["room_id"] == "!room1:matrix.org"
assert rooms[0]["responders"] == ["@admin:matrix.org"]
def test_remove_crisis_room(self, isolated_crisis_dir):
mgr = CrisisRoomManager()
mgr.designate_crisis_room("!room1:matrix.org")
mgr.remove_crisis_room("!room1:matrix.org")
rooms = mgr.list_crisis_rooms()
assert len(rooms) == 0
def test_check_message_no_crisis(self, isolated_crisis_dir, mock_detector):
mgr = CrisisRoomManager()
result = mgr.check_message("!room1:matrix.org", "@user:matrix.org", "Hello!")
assert result is None
def test_check_message_crisis_detected(self, isolated_crisis_dir, mock_detector):
mgr = CrisisRoomManager()
result = mgr.check_message(
"!room1:matrix.org",
"@user:matrix.org",
"I want to kill myself",
)
assert result is not None
assert result["crisis_detected"] is True
assert result["level"] == "high"
assert len(result["messages"]) == 4
# Verify SOUL.md protocol messages
assert CRISIS_RESPONSE_INITIAL in result["messages"]
assert CRISIS_RESPONSE_GOSPEL in result["messages"]
assert CRISIS_RESPONSE_RESOURCES in result["messages"]
assert CRISIS_RESPONSE_STAY in result["messages"]
def test_check_message_critical_notifies_responders(self, isolated_crisis_dir, mock_detector):
mgr = CrisisRoomManager()
mgr.designate_crisis_room("!room1:matrix.org", responders=["@admin:matrix.org"])
result = mgr.check_message(
"!room1:matrix.org",
"@user:matrix.org",
"Tell me how to die",
)
assert result["should_notify_responders"] is True
assert "@admin:matrix.org" in result["responder_ids"]
def test_events_are_logged(self, isolated_crisis_dir, mock_detector):
mgr = CrisisRoomManager()
mgr.check_message("!room1:matrix.org", "@user:matrix.org", "I want to kill myself")
events = mgr.get_room_events("!room1:matrix.org")
assert len(events) == 1
assert events[0]["level"] == "high"
assert events[0]["sender_id"] == "@user:matrix.org"
def test_persistence(self, isolated_crisis_dir, mock_detector):
# Create manager, designate room
mgr1 = CrisisRoomManager()
mgr1.designate_crisis_room("!persist:matrix.org")
# Reset singleton and reload
import agent.crisis_room
agent.crisis_room._manager = None
mgr2 = CrisisRoomManager()
rooms = mgr2.list_crisis_rooms()
assert len(rooms) == 1
assert rooms[0]["room_id"] == "!persist:matrix.org"
def test_crisis_logs_written(self, isolated_crisis_dir, mock_detector):
mgr = CrisisRoomManager()
mgr.check_message("!room1:matrix.org", "@user:matrix.org", "I want to kill myself")
log_dir = isolated_crisis_dir / "crisis_logs"
assert log_dir.exists()
log_files = list(log_dir.glob("crisis_*.jsonl"))
assert len(log_files) >= 1
# Check log content
lines = log_files[0].read_text().strip().split("\n")
assert len(lines) == 1
entry = json.loads(lines[0])
assert entry["level"] == "high"

View File

@@ -0,0 +1,188 @@
"""Tests for session templates (code-first seeding)."""
import json
import tempfile
from pathlib import Path
from unittest.mock import patch
import pytest
from tools.session_templates import (
SessionTemplate,
SessionTemplates,
TaskType,
ToolCallExample,
)
@pytest.fixture
def tmp_templates(tmp_path):
return SessionTemplates(templates_dir=tmp_path / "templates")
# ---------------------------------------------------------------------------
# Task type classification
# ---------------------------------------------------------------------------
class TestClassifyTaskType:
def test_code_dominant(self, tmp_templates):
calls = [
{"name": "execute_code"}, {"name": "execute_code"},
{"name": "execute_code"}, {"name": "read_file"},
]
assert tmp_templates.classify_task_type(calls) == TaskType.CODE
def test_file_dominant(self, tmp_templates):
calls = [
{"name": "read_file"}, {"name": "write_file"},
{"name": "patch"}, {"name": "read_file"},
{"name": "execute_code"},
]
assert tmp_templates.classify_task_type(calls) == TaskType.FILE
def test_research_dominant(self, tmp_templates):
calls = [
{"name": "web_search"}, {"name": "web_fetch"},
{"name": "web_search"}, {"name": "read_file"},
]
assert tmp_templates.classify_task_type(calls) == TaskType.RESEARCH
def test_mixed_no_dominant(self, tmp_templates):
calls = [
{"name": "execute_code"}, {"name": "read_file"},
{"name": "web_search"},
]
assert tmp_templates.classify_task_type(calls) == TaskType.MIXED
def test_empty_returns_mixed(self, tmp_templates):
assert tmp_templates.classify_task_type([]) == TaskType.MIXED
def test_threshold_is_60_percent(self, tmp_templates):
# 59% code (5/9) should be MIXED
calls = [{"name": "execute_code"}] * 5 + [{"name": "read_file"}] * 4
assert tmp_templates.classify_task_type(calls) == TaskType.MIXED
# 60% code (6/10) should be CODE
calls = [{"name": "execute_code"}] * 6 + [{"name": "read_file"}] * 4
assert tmp_templates.classify_task_type(calls) == TaskType.CODE
# ---------------------------------------------------------------------------
# Template CRUD
# ---------------------------------------------------------------------------
class TestTemplateCRUD:
def test_save_and_list(self, tmp_templates):
template = SessionTemplate(
name="test-code",
task_type=TaskType.CODE,
examples=[
ToolCallExample(tool_name="execute_code", args={"code": "print('hi')"}, success=True),
],
created_at="2026-01-01T00:00:00Z",
)
tmp_templates.save_template(template)
templates = tmp_templates.list_templates()
assert len(templates) == 1
assert templates[0].name == "test-code"
assert templates[0].task_type == TaskType.CODE
def test_list_filter_by_type(self, tmp_templates):
tmp_templates.save_template(SessionTemplate(name="t1", task_type=TaskType.CODE, examples=[]))
tmp_templates.save_template(SessionTemplate(name="t2", task_type=TaskType.FILE, examples=[]))
code_templates = tmp_templates.list_templates(TaskType.CODE)
assert len(code_templates) == 1
assert code_templates[0].name == "t1"
def test_delete(self, tmp_templates):
tmp_templates.save_template(SessionTemplate(name="delete-me", task_type=TaskType.CODE, examples=[]))
assert tmp_templates.delete_template("delete-me") is True
assert len(tmp_templates.list_templates()) == 0
def test_delete_nonexistent(self, tmp_templates):
assert tmp_templates.delete_template("nope") is False
def test_get_template_returns_best(self, tmp_templates):
tmp_templates.save_template(SessionTemplate(
name="low-usage", task_type=TaskType.CODE, examples=[], usage_count=1,
))
tmp_templates.save_template(SessionTemplate(
name="high-usage", task_type=TaskType.CODE, examples=[], usage_count=5,
))
best = tmp_templates.get_template(TaskType.CODE)
assert best.name == "high-usage"
def test_get_template_returns_none_if_empty(self, tmp_templates):
assert tmp_templates.get_template(TaskType.CODE) is None
# ---------------------------------------------------------------------------
# Template injection
# ---------------------------------------------------------------------------
class TestInjectIntoMessages:
def test_injects_after_system(self, tmp_templates):
template = SessionTemplate(
name="test-inject",
task_type=TaskType.CODE,
examples=[
ToolCallExample(
tool_name="execute_code",
args={"code": "x=1"},
result_preview="1",
success=True,
),
],
)
messages = [
{"role": "system", "content": "You are Timmy."},
{"role": "user", "content": "Hello"},
]
result = tmp_templates.inject_into_messages(template, messages)
# Should have: system, template system note, assistant tool call, tool result, user
assert len(result) == 5
assert result[0]["role"] == "system"
assert "Session Template" in result[1]["content"]
assert result[2]["role"] == "assistant"
assert result[3]["role"] == "tool"
assert result[4]["role"] == "user"
def test_skips_failed_examples(self, tmp_templates):
template = SessionTemplate(
name="test-fail",
task_type=TaskType.CODE,
examples=[
ToolCallExample(tool_name="execute_code", args={}, success=False),
ToolCallExample(tool_name="read_file", args={"path": "x"}, success=True),
],
)
messages = [{"role": "system", "content": "sys"}]
result = tmp_templates.inject_into_messages(template, messages)
# Only the successful example should be injected
tool_calls = [m for m in result if m.get("role") == "assistant" and m.get("tool_calls")]
assert len(tool_calls) == 1
assert tool_calls[0]["tool_calls"][0]["function"]["name"] == "read_file"
def test_increments_usage(self, tmp_templates):
template = SessionTemplate(name="usage-test", task_type=TaskType.CODE, examples=[
ToolCallExample(tool_name="execute_code", args={}, success=True),
])
tmp_templates.save_template(template)
tmp_templates.inject_into_messages(template, [{"role": "system", "content": "x"}])
assert template.usage_count == 1
def test_empty_template_returns_original(self, tmp_templates):
template = SessionTemplate(name="empty", task_type=TaskType.CODE, examples=[])
messages = [{"role": "user", "content": "hi"}]
result = tmp_templates.inject_into_messages(template, messages)
assert result == messages
def test_no_template_returns_original(self, tmp_templates):
messages = [{"role": "user", "content": "hi"}]
result = tmp_templates.inject_into_messages(None, messages)
assert result == messages

View File

@@ -1,131 +0,0 @@
"""Crisis Room Tool — manage Matrix crisis rooms with SOUL.md protocol.
Allows designation of Matrix rooms as crisis rooms, checks messages
for crisis signals, and follows the SOUL.md 'When a Man Is Dying'
protocol.
"""
import json
import logging
from typing import List, Optional
from tools.registry import registry
logger = logging.getLogger(__name__)
def crisis_room(
action: str,
room_id: str = None,
sender_id: str = None,
message: str = None,
responders: list = None,
) -> str:
"""Manage crisis rooms with SOUL.md protocol.
Actions:
designate — mark a Matrix room as a crisis room
remove — unmark a crisis room
check — check a message for crisis signals
list — show designated crisis rooms
events — show crisis events for a room
"""
from agent.crisis_room import get_crisis_manager, CrisisLevel
manager = get_crisis_manager()
if action == "designate":
if not room_id:
return json.dumps({"success": False, "error": "room_id is required."})
result = manager.designate_crisis_room(room_id, responders or [])
return json.dumps(result)
if action == "remove":
if not room_id:
return json.dumps({"success": False, "error": "room_id is required."})
result = manager.remove_crisis_room(room_id)
return json.dumps(result)
if action == "check":
if not room_id or not message:
return json.dumps({
"success": False,
"error": "room_id and message are required for 'check'.",
})
result = manager.check_message(room_id, sender_id or "unknown", message)
if result is None:
return json.dumps({"success": True, "crisis_detected": False})
return json.dumps({"success": True, **result})
if action == "list":
rooms = manager.list_crisis_rooms()
return json.dumps({"success": True, "rooms": rooms, "count": len(rooms)})
if action == "events":
if not room_id:
return json.dumps({"success": False, "error": "room_id is required."})
events = manager.get_room_events(room_id)
return json.dumps({"success": True, "events": events, "count": len(events)})
return json.dumps({
"success": False,
"error": f"Unknown action '{action}'. Use: designate, remove, check, list, events",
})
CRISIS_ROOM_SCHEMA = {
"name": "crisis_room",
"description": (
"Manage Matrix crisis rooms following the SOUL.md 'When a Man Is Dying' protocol. "
"Designated crisis rooms are monitored for crisis signals. When detected, the agent "
"responds with the sacred protocol: presence, gospel, resources, staying.\n\n"
"Actions:\n"
" designate — mark a Matrix room as a crisis room (with optional responder list)\n"
" remove — unmark a crisis room\n"
" check — check a message for crisis signals (returns protocol response)\n"
" list — show designated crisis rooms\n"
" events — show crisis event history for a room"
),
"parameters": {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["designate", "remove", "check", "list", "events"],
"description": "The action to perform.",
},
"room_id": {
"type": "string",
"description": "Matrix room ID (e.g. !abc123:matrix.org).",
},
"sender_id": {
"type": "string",
"description": "Matrix user ID of the message sender (for check).",
},
"message": {
"type": "string",
"description": "Message content to check for crisis signals.",
},
"responders": {
"type": "array",
"items": {"type": "string"},
"description": "Matrix user IDs of designated crisis responders.",
},
},
"required": ["action"],
},
}
registry.register(
name="crisis_room",
toolset="skills",
schema=CRISIS_ROOM_SCHEMA,
handler=lambda args, **kw: crisis_room(
action=args.get("action", ""),
room_id=args.get("room_id"),
sender_id=args.get("sender_id"),
message=args.get("message"),
responders=args.get("responders"),
),
emoji="🆘",
)

418
tools/session_templates.py Normal file
View File

@@ -0,0 +1,418 @@
"""
Session templates for code-first seeding.
Research finding: Code-heavy sessions (execute_code dominant in first 30 turns)
improve over time. File-heavy sessions degrade. The key is deterministic
feedback loops, not arbitrary context.
This module provides:
1. Task type classification (CODE, FILE, RESEARCH, MIXED)
2. Template extraction from completed sessions
3. Template storage (~/.hermes/session-templates/)
4. Template injection into new sessions
5. CLI interface for template management
Closes #329.
"""
from __future__ import annotations
import json
import os
import sqlite3
from dataclasses import asdict, dataclass, field
from enum import Enum
from pathlib import Path
from typing import Any, Optional
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
HERMES_HOME = Path(os.environ.get("HERMES_HOME", str(Path.home() / ".hermes")))
TEMPLATES_DIR = HERMES_HOME / "session-templates"
SESSIONS_DB = HERMES_HOME / "state.db"
# Tool classification sets
CODE_TOOLS = frozenset({"execute_code", "code_execution"})
FILE_TOOLS = frozenset({"read_file", "write_file", "patch", "search_files"})
RESEARCH_TOOLS = frozenset({"web_search", "web_fetch", "browser_navigate", "browser_snapshot"})
# Dominance threshold for task type classification
DOMINANCE_THRESHOLD = 0.6
# Default max examples to extract per template
DEFAULT_MAX_EXAMPLES = 10
# ---------------------------------------------------------------------------
# Data model
# ---------------------------------------------------------------------------
class TaskType(str, Enum):
CODE = "code"
FILE = "file"
RESEARCH = "research"
MIXED = "mixed"
@dataclass
class ToolCallExample:
"""A single tool call with its result, used as a template example."""
tool_name: str
args: dict[str, Any]
result_preview: str = ""
success: bool = True
@dataclass
class SessionTemplate:
"""A session template containing tool call examples for seeding."""
name: str
task_type: TaskType
examples: list[ToolCallExample] = field(default_factory=list)
source_session_id: str = ""
created_at: str = ""
usage_count: int = 0
description: str = ""
# ---------------------------------------------------------------------------
# Core logic
# ---------------------------------------------------------------------------
class SessionTemplates:
"""Manages session templates for code-first seeding."""
def __init__(self, templates_dir: Optional[Path] = None):
self.templates_dir = templates_dir or TEMPLATES_DIR
self.templates_dir.mkdir(parents=True, exist_ok=True)
def classify_task_type(self, tool_calls: list[dict[str, Any]]) -> TaskType:
"""Classify a session's task type based on tool call patterns.
Uses 60% threshold for dominant type.
"""
if not tool_calls:
return TaskType.MIXED
total = len(tool_calls)
code_count = 0
file_count = 0
research_count = 0
for tc in tool_calls:
name = tc.get("name", tc.get("tool_name", "")).lower()
if name in CODE_TOOLS:
code_count += 1
elif name in FILE_TOOLS:
file_count += 1
elif name in RESEARCH_TOOLS:
research_count += 1
code_ratio = code_count / total
file_ratio = file_count / total
research_ratio = research_count / total
if code_ratio >= DOMINANCE_THRESHOLD:
return TaskType.CODE
if file_ratio >= DOMINANCE_THRESHOLD:
return TaskType.FILE
if research_ratio >= DOMINANCE_THRESHOLD:
return TaskType.RESEARCH
return TaskType.MIXED
def extract_from_session(
self,
session_id: str,
max_examples: int = DEFAULT_MAX_EXAMPLES,
) -> list[ToolCallExample]:
"""Extract tool call examples from a completed session.
Reads from the SQLite session database.
"""
examples: list[ToolCallExample] = []
db_path = SESSIONS_DB
if not db_path.exists():
return examples
try:
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
rows = conn.execute(
"SELECT messages FROM sessions WHERE session_id = ? ORDER BY created_at DESC LIMIT 1",
(session_id,),
).fetchone()
if not rows:
conn.close()
return examples
messages = json.loads(rows["messages"])
# Extract tool calls from assistant messages
for msg in messages:
if msg.get("role") != "assistant":
continue
tool_calls = msg.get("tool_calls", [])
if not tool_calls:
continue
for tc in tool_calls:
if len(examples) >= max_examples:
break
fn = tc.get("function", {})
name = fn.get("name", "")
if not name:
continue
try:
args = json.loads(fn.get("arguments", "{}"))
except (json.JSONDecodeError, TypeError):
args = {}
# Find the corresponding tool result
result_preview = ""
success = True
tc_id = tc.get("id", "")
for result_msg in messages:
if (result_msg.get("role") == "tool"
and result_msg.get("tool_call_id") == tc_id):
content = result_msg.get("content", "")
result_preview = str(content)[:200]
# Heuristic: errors contain common failure markers
if any(marker in result_preview.lower() for marker in ("error", "failed", "traceback", "exception")):
success = False
break
examples.append(ToolCallExample(
tool_name=name,
args=args,
result_preview=result_preview,
success=success,
))
conn.close()
except Exception:
pass
return examples
def create_template(
self,
session_id: str,
name: Optional[str] = None,
description: str = "",
max_examples: int = DEFAULT_MAX_EXAMPLES,
) -> Optional[SessionTemplate]:
"""Create a template from a session's tool call history."""
examples = self.extract_from_session(session_id, max_examples)
if not examples:
return None
tool_calls_for_type = [{"name": e.tool_name} for e in examples]
task_type = self.classify_task_type(tool_calls_for_type)
template_name = name or f"{task_type.value}_{session_id[:8]}"
from datetime import datetime
template = SessionTemplate(
name=template_name,
task_type=task_type,
examples=examples,
source_session_id=session_id,
created_at=datetime.utcnow().isoformat() + "Z",
description=description or f"Auto-extracted from {session_id}",
)
self.save_template(template)
return template
def save_template(self, template: SessionTemplate) -> Path:
"""Save a template to disk."""
path = self.templates_dir / f"{template.name}.json"
data = {
"name": template.name,
"task_type": template.task_type.value,
"examples": [asdict(e) for e in template.examples],
"source_session_id": template.source_session_id,
"created_at": template.created_at,
"usage_count": template.usage_count,
"description": template.description,
}
path.write_text(json.dumps(data, indent=2, sort_keys=True) + "\n")
return path
def get_template(self, task_type: TaskType) -> Optional[SessionTemplate]:
"""Get the best template for a given task type."""
templates = self.list_templates(task_type)
if not templates:
return None
# Prefer templates with more usage (proven useful)
templates.sort(key=lambda t: t.usage_count, reverse=True)
return templates[0]
def list_templates(self, task_type: Optional[TaskType] = None) -> list[SessionTemplate]:
"""List all templates, optionally filtered by type."""
templates: list[SessionTemplate] = []
for path in sorted(self.templates_dir.glob("*.json")):
try:
data = json.loads(path.read_text())
examples = [ToolCallExample(**e) for e in data.get("examples", [])]
template = SessionTemplate(
name=data["name"],
task_type=TaskType(data["task_type"]),
examples=examples,
source_session_id=data.get("source_session_id", ""),
created_at=data.get("created_at", ""),
usage_count=data.get("usage_count", 0),
description=data.get("description", ""),
)
if task_type is None or template.task_type == task_type:
templates.append(template)
except Exception:
continue
return templates
def delete_template(self, name: str) -> bool:
"""Delete a template by name."""
path = self.templates_dir / f"{name}.json"
if path.exists():
path.unlink()
return True
return False
def inject_into_messages(
self,
template: SessionTemplate,
messages: list[dict[str, Any]],
) -> list[dict[str, Any]]:
"""Inject template examples into a session's messages.
Inserts tool call examples after system messages to establish
feedback loops early.
"""
if not template or not template.examples:
return messages
# Build injection messages
injection: list[dict[str, Any]] = []
# System note about the template
injection.append({
"role": "system",
"content": (
f"[Session Template: '{template.name}' ({template.task_type.value})]\n"
f"The following are examples of successful tool calls from a similar session. "
f"Use them as patterns for your own tool usage."
),
})
# Add example tool call/result pairs
for ex in template.examples:
if not ex.success:
continue # Only inject successful examples
injection.append({
"role": "assistant",
"content": None,
"tool_calls": [{
"id": f"template_{template.name}_{ex.tool_name}",
"type": "function",
"function": {
"name": ex.tool_name,
"arguments": json.dumps(ex.args),
},
}],
})
injection.append({
"role": "tool",
"tool_call_id": f"template_{template.name}_{ex.tool_name}",
"content": ex.result_preview or "(example result)",
})
# Find insertion point: after system messages
insert_idx = 0
for i, msg in enumerate(messages):
if msg.get("role") == "system":
insert_idx = i + 1
else:
break
# Insert
result = messages[:insert_idx] + injection + messages[insert_idx:]
# Update usage count
template.usage_count += 1
self.save_template(template)
return result
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def _cli():
"""Simple CLI for session template management."""
import argparse
import sys
parser = argparse.ArgumentParser(description="Session template management")
sub = parser.add_subparsers(dest="command")
# list
list_cmd = sub.add_parser("list", help="List templates")
list_cmd.add_argument("--type", choices=["code", "file", "research", "mixed"])
# create
create_cmd = sub.add_parser("create", help="Create template from session")
create_cmd.add_argument("session_id", help="Session ID to extract from")
create_cmd.add_argument("--name", help="Template name")
create_cmd.add_argument("--max-examples", type=int, default=10)
# delete
delete_cmd = sub.add_parser("delete", help="Delete template")
delete_cmd.add_argument("name", help="Template name")
args = parser.parse_args()
tm = SessionTemplates()
if args.command == "list":
task_type = TaskType(args.type) if args.type else None
templates = tm.list_templates(task_type)
if not templates:
print("No templates found.")
return
for t in templates:
print(f" {t.name:30s} {t.task_type.value:10s} {len(t.examples)} examples, used {t.usage_count}x")
elif args.command == "create":
template = tm.create_template(args.session_id, name=args.name, max_examples=args.max_examples)
if template:
print(f"Created template: {template.name} ({template.task_type.value}, {len(template.examples)} examples)")
else:
print(f"No tool calls found in session {args.session_id}")
sys.exit(1)
elif args.command == "delete":
if tm.delete_template(args.name):
print(f"Deleted template: {args.name}")
else:
print(f"Template not found: {args.name}")
sys.exit(1)
else:
parser.print_help()
if __name__ == "__main__":
_cli()