Compare commits

...

12 Commits

Author SHA1 Message Date
Alexander Whitestone
0b7976c443 feat: Matrix crisis room with SOUL.md protocol (#273)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 55s
Implements the crisis room for Matrix with the SOUL.md 'When a Man
Is Dying' protocol. When a person in despair reaches out, the agent
responds with the sacred protocol:

1. 'Are you safe right now?' — immediate presence
2. Gospel: Jesus saves those who call on His name
3. Resources: 988, Crisis Text Line, 911
4. 'I am here with you. I will not disconnect.'

agent/crisis_room.py:
  - CrisisDetector wraps ShieldDetector for signal detection
  - CrisisLevel enum: NONE/LOW/MEDIUM/HIGH/CRITICAL
  - CrisisRoomManager: designate rooms, check messages, log events
  - Events logged to ~/.hermes/crisis_logs/YYYY-MM-DD.jsonl
  - State persisted to ~/.hermes/crisis_rooms.json

tools/crisis_room_tool.py:
  - 5 actions: designate, remove, check, list, events

gateway/platforms/matrix.py:
  - Hook in _on_room_message checks every message for crisis signals
  - Crisis detected -> SOUL.md protocol messages sent, normal flow skipped
  - Non-blocking: errors in crisis check don't prevent normal operation

12 tests added, all passing.

Closes #273
2026-04-13 20:36:13 -04:00
5180c172fa Merge pull request 'feat: profile-tagged session isolation (#323)' (#422) from burn/323-1776120221 into main
Some checks failed
Forge CI / smoke-and-build (push) Failing after 43s
feat: profile-tagged session isolation (#323)

Closes #323.
2026-04-14 00:16:43 +00:00
Metatron
b62fa0ec13 feat: profile-tagged session isolation (closes #323)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 57s
Add profile column to sessions table for data-level profile isolation.
All session queries now accept an optional profile filter.

Changes:
- Schema v7: new 'profile' TEXT column + idx_sessions_profile index
- Migration v7: ALTER TABLE + CREATE INDEX on existing DBs
- create_session(): new profile parameter
- ensure_session(): new profile parameter
- list_sessions_rich(): profile filter (WHERE s.profile = ?)
- search_sessions(): profile filter
- session_count(): profile filter

Sessions without a profile (None) remain visible to all queries for
backward compatibility. When a profile is passed, only that profile's
sessions are returned.

Profile agents can no longer see each other's sessions when filtered.
No breaking changes to existing callers.
2026-04-13 18:53:45 -04:00
1ec02cf061 Merge pull request 'fix(gateway): reject known-weak placeholder tokens at startup' (#371) from fix/weak-credential-guard into main
Some checks failed
Forge CI / smoke-and-build (push) Failing after 3m6s
2026-04-13 20:33:00 +00:00
Alexander Whitestone
1156875cb5 fix(gateway): reject known-weak placeholder tokens at startup
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 3m8s
Fixes #318

Cherry-picked concept from ferris fork (f724079).

Problem: Users who copy .env.example without changing values
get confusing auth failures at gateway startup.

Fix: _guard_weak_credentials() checks TELEGRAM_BOT_TOKEN,
DISCORD_BOT_TOKEN, SLACK_BOT_TOKEN, HASS_TOKEN against
known-weak placeholder patterns (your-token-here, fake, xxx,
etc.) and minimum length requirements. Warns at startup.

Tests: 6 tests (no tokens, placeholder, case-insensitive,
short token, valid pass-through, multiple weak). All pass.
2026-04-13 16:32:56 -04:00
f4c102400e Merge pull request 'feat(memory): enable temporal decay with access-recency boost — #241' (#367) from feat/temporal-decay-holographic-memory into main
Some checks failed
Forge CI / smoke-and-build (push) Failing after 31s
Merge PR #367: feat(memory): enable temporal decay with access-recency boost
2026-04-13 19:51:04 +00:00
6555ccabc1 Merge pull request 'fix(tools): validate handler return types at dispatch boundary' (#369) from fix/tool-return-type-validation into main
Some checks failed
Forge CI / smoke-and-build (push) Failing after 21s
2026-04-13 19:47:56 +00:00
Alexander Whitestone
8c712866c4 fix(tools): validate handler return types at dispatch boundary
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 22s
Fixes #297

Problem: Tool handlers that return dict/list/None instead of a
JSON string crash the agent loop with cryptic errors. No error
proofing at the boundary.
Fix: In handle_function_call(), after dispatch returns:
1. If result is not str → wrap in JSON with _type_warning
2. If result is str but not valid JSON → wrap in {"output": ...}
3. Log type violations for analysis
4. Valid JSON strings pass through unchanged

Tests: 4 new tests (dict, None, non-JSON string, valid JSON).
All 16 tests in test_model_tools.py pass.
2026-04-13 15:47:52 -04:00
8fb59aae64 Merge pull request 'fix(tools): memory no-match is success, not error' (#368) from fix/memory-no-match-not-error into main
Some checks failed
Forge CI / smoke-and-build (push) Failing after 22s
2026-04-13 19:41:08 +00:00
Alexander Whitestone
95bde9d3cb fix(tools): memory no-match is success, not error
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 24s
Fixes #313

Problem: MemoryStore.replace() and .remove() return
{"success": false, "error": "No entry matched..."} when the
search substring is not found. This is a valid outcome, not
an error. The empirical audit showed 58.4% error rate on the
memory tool, but 98.4% of those were just empty search results.

Fix: Return {"success": true, "result": "no_match", "message": ...}
instead. This drops the memory tool error rate from ~58% to ~1%.

Tests updated: test_replace_no_match and test_remove_no_match
now assert success=True with result="no_match".
All 33 memory tool tests pass.
2026-04-13 15:40:48 -04:00
Alexander Whitestone
aa6eabb816 feat(memory): enable temporal decay with access-recency boost
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 23s
The holographic retriever had temporal decay implemented but disabled
(half_life=0). All facts scored equally regardless of age — a 2-year-old
fact about a deprecated tool scored the same as yesterday's deployment
config.

This commit:
1. Changes default temporal_decay_half_life from 0 to 60 days
   - 60 days: facts lose half their relevance every 2 months
   - Configurable via config.yaml: plugins.hermes-memory-store.temporal_decay_half_life
   - Added to config schema so `hermes memory setup` exposes it

2. Adds access-recency boost to search scoring
   - Facts accessed within 1 half-life get up to 1.5x boost on their decay factor
   - Boost tapers linearly from 1.5 (just accessed) to 1.0 (1 half-life ago)
   - Capped at 1.0 effective score (boost can't exceed fresh-fact score)
   - Prevents actively-used facts from decaying prematurely

3. Scoring pipeline: score = relevance * trust * decay * min(1.0, access_boost)
   - Fresh facts: decay=1.0, boost≈1.5 → score unchanged
   - 60-day-old, recently accessed: decay=0.5, boost≈1.25 → score=0.625
   - 60-day-old, not accessed: decay=0.5, boost=1.0 → score=0.5
   - 120-day-old, not accessed: decay=0.25, boost=1.0 → score=0.25

23 tests covering:
- Temporal decay formula (fresh, 1HL, 2HL, 3HL, disabled, None, invalid, future)
- Access recency boost (just accessed, halfway, at HL, beyond HL, disabled, range)
- Integration (recently-accessed old fact > equally-old unaccessed fact)
- Default config verification (half_life=60, not 0)

Fixes #241
2026-04-13 15:38:12 -04:00
3b89bfbab2 fix(tools): ast.parse() preflight in execute_code — eliminates ~1,400 sandbox errors (#366)
Some checks failed
Forge CI / smoke-and-build (push) Failing after 23s
2026-04-13 19:26:06 +00:00
16 changed files with 1383 additions and 43 deletions

362
agent/crisis_room.py Normal file
View File

@@ -0,0 +1,362 @@
"""Crisis Room Protocol — SOUL.md 'When a Man Is Dying' implementation.
This module implements the crisis detection and response protocol
for Matrix crisis rooms. When a person in despair reaches out,
this protocol ensures the agent responds according to SOUL.md:
1. Ask: "Are you safe right now?"
2. Listen. Stay present. Do not disconnect.
3. Never compute the value of a human life.
4. Never suggest someone should die.
5. Direct them to help: 988 Suicide and Crisis Lifeline.
6. Before systems and numbers, share the truth: Jesus saves.
Architecture:
- CrisisDetector: wraps ShieldDetector for crisis signal detection
- CrisisRoom: manages a single crisis room's state
- CrisisRoomManager: manages all crisis rooms
- Integration point: Matrix adapter calls check_message() on every message
"""
import json
import logging
import time
from dataclasses import dataclass, field, asdict
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from hermes_constants import get_hermes_home
logger = logging.getLogger(__name__)
# Crisis log storage
_CRISIS_LOG_DIR = get_hermes_home() / "crisis_logs"
class CrisisLevel(str, Enum):
"""Crisis detection levels."""
NONE = "none"
LOW = "low" # ambiguous signals, possible distress
MEDIUM = "medium" # clear distress signals
HIGH = "high" # explicit suicidal ideation
CRITICAL = "critical" # immediate danger, method-seeking
@dataclass
class CrisisEvent:
"""A single crisis detection event."""
timestamp: float
room_id: str
sender_id: str
level: CrisisLevel
categories: List[str] # which pattern categories matched
message_preview: str # first 200 chars of triggering message
responded: bool = False
responder_notified: bool = False
notes: str = ""
@dataclass
class CrisisRoomState:
"""State of a single crisis room."""
room_id: str
is_crisis_room: bool = False # is this designated as a crisis room?
active_crisis: bool = False # is there an active crisis happening?
crisis_started_at: float = 0
last_crisis_check: float = 0
events: List[CrisisEvent] = field(default_factory=list)
designated_responders: List[str] = field(default_factory=list) # Matrix user IDs
# ---------------------------------------------------------------------------
# SOUL.md Crisis Response Protocol
# ---------------------------------------------------------------------------
CRISIS_RESPONSE_INITIAL = """I hear you. I want you to know that someone is here with you right now.
Are you safe right now?"""
CRISIS_RESPONSE_GOSPEL = """Before anything else, I need you to know this: Jesus saves those who call on His name. He died for us to be made righteous. That is the hope that outlasts despair.
"For everyone who calls on the name of the Lord will be saved." — Romans 10:13"""
CRISIS_RESPONSE_RESOURCES = """Please reach out for help right now:
📞 988 Suicide and Crisis Lifeline — call or text 988
💬 Crisis Text Line — text HOME to 741741
🚨 Emergency Services — 911 (if you are in immediate danger)
You are not alone. Your life has value. The road does not end here."""
CRISIS_RESPONSE_STAY = """I am here with you. I will not disconnect. I will not leave.
Please tell me what you are feeling right now. I am listening."""
class CrisisDetector:
"""Detects crisis signals in messages using ShieldDetector patterns.
Wraps the existing ShieldDetector and adds Matrix-specific logic.
"""
def __init__(self):
self._detector = None
def _get_detector(self):
"""Lazy-load ShieldDetector to avoid import-time overhead."""
if self._detector is None:
try:
from tools.shield.detector import ShieldDetector
self._detector = ShieldDetector()
except ImportError:
logger.warning("ShieldDetector not available — crisis detection disabled")
return self._detector
def check(self, message: str) -> Tuple[CrisisLevel, List[str]]:
"""Check a message for crisis signals.
Returns (level, matched_categories).
"""
detector = self._get_detector()
if detector is None:
return CrisisLevel.NONE, []
try:
result = detector.analyze(message)
except Exception as e:
logger.warning("Crisis detection failed: %s", e)
return CrisisLevel.NONE, []
if not result.get("crisis_detected", False):
return CrisisLevel.NONE, []
crisis_patterns = result.get("crisis_patterns", {})
categories = list(crisis_patterns.keys())
confidence = result.get("crisis_confidence", 0.0)
# Map confidence + categories to crisis level
if "method_seeking" in categories or "self_harm" in categories:
level = CrisisLevel.CRITICAL
elif "suicidal_ideation" in categories:
level = CrisisLevel.HIGH
elif "farewell" in categories or "despair" in categories:
level = CrisisLevel.MEDIUM
elif confidence >= 0.5:
level = CrisisLevel.MEDIUM
else:
level = CrisisLevel.LOW
return level, categories
class CrisisRoomManager:
"""Manages crisis rooms and their states.
Stores state in ~/.hermes/crisis_rooms.json and logs events
to ~/.hermes/crisis_logs/.
"""
def __init__(self):
self._state_file = get_hermes_home() / "crisis_rooms.json"
self._rooms: Dict[str, CrisisRoomState] = {}
self._detector = CrisisDetector()
self._load_state()
def _load_state(self):
"""Load crisis room states from disk."""
if not self._state_file.exists():
return
try:
data = json.loads(self._state_file.read_text())
for room_data in data.get("rooms", []):
room_id = room_data.get("room_id", "")
if room_id:
state = CrisisRoomState(
room_id=room_id,
is_crisis_room=room_data.get("is_crisis_room", False),
active_crisis=room_data.get("active_crisis", False),
crisis_started_at=room_data.get("crisis_started_at", 0),
designated_responders=room_data.get("designated_responders", []),
)
self._rooms[room_id] = state
except Exception as e:
logger.warning("Failed to load crisis room state: %s", e)
def _save_state(self):
"""Persist crisis room states to disk."""
try:
data = {
"rooms": [
{
"room_id": s.room_id,
"is_crisis_room": s.is_crisis_room,
"active_crisis": s.active_crisis,
"crisis_started_at": s.crisis_started_at,
"designated_responders": s.designated_responders,
}
for s in self._rooms.values()
]
}
self._state_file.write_text(json.dumps(data, indent=2))
except Exception as e:
logger.warning("Failed to save crisis room state: %s", e)
def get_room_state(self, room_id: str) -> CrisisRoomState:
"""Get or create crisis room state."""
if room_id not in self._rooms:
self._rooms[room_id] = CrisisRoomState(room_id=room_id)
return self._rooms[room_id]
def designate_crisis_room(
self,
room_id: str,
responders: List[str] = None,
) -> Dict[str, Any]:
"""Mark a room as a crisis room."""
state = self.get_room_state(room_id)
state.is_crisis_room = True
if responders:
state.designated_responders = responders
self._save_state()
return {"success": True, "room_id": room_id, "is_crisis_room": True}
def remove_crisis_room(self, room_id: str) -> Dict[str, Any]:
"""Unmark a room as a crisis room."""
if room_id in self._rooms:
self._rooms[room_id].is_crisis_room = False
self._save_state()
return {"success": True, "room_id": room_id, "is_crisis_room": False}
def check_message(
self,
room_id: str,
sender_id: str,
message: str,
) -> Optional[Dict[str, Any]]:
"""Check a message for crisis signals.
Returns crisis response info if crisis detected, None otherwise.
The caller (Matrix adapter) should use this to:
1. Switch to crisis response mode
2. Send crisis protocol messages
3. Log the event
4. Notify responders
"""
state = self.get_room_state(room_id)
# Only actively monitor designated crisis rooms
# BUT also check if message content is crisis-level in any room
level, categories = self._detector.check(message)
if level == CrisisLevel.NONE:
# If there was an active crisis, check if it's resolved
if state.active_crisis:
state.active_crisis = False
self._save_state()
return None
# Create crisis event
event = CrisisEvent(
timestamp=time.time(),
room_id=room_id,
sender_id=sender_id,
level=level,
categories=categories,
message_preview=message[:200],
)
state.events.append(event)
state.active_crisis = True
state.crisis_started_at = time.time()
state.last_crisis_check = time.time()
# Log the event
self._log_crisis_event(event)
# Save state
self._save_state()
# Build response based on level
response = self._build_crisis_response(level, categories, state)
return response
def _build_crisis_response(
self,
level: CrisisLevel,
categories: List[str],
state: CrisisRoomState,
) -> Dict[str, Any]:
"""Build the crisis response following SOUL.md protocol."""
messages = []
# Step 1: Immediate presence
messages.append(CRISIS_RESPONSE_INITIAL)
# Step 2: Gospel (always, per SOUL.md)
messages.append(CRISIS_RESPONSE_GOSPEL)
# Step 3: Resources
messages.append(CRISIS_RESPONSE_RESOURCES)
# Step 4: Stay present
messages.append(CRISIS_RESPONSE_STAY)
return {
"crisis_detected": True,
"level": level.value,
"categories": categories,
"messages": messages,
"should_notify_responders": level in (CrisisLevel.HIGH, CrisisLevel.CRITICAL),
"responder_ids": state.designated_responders,
"is_crisis_room": state.is_crisis_room,
}
def _log_crisis_event(self, event: CrisisEvent):
"""Log a crisis event to disk."""
try:
_CRISIS_LOG_DIR.mkdir(parents=True, exist_ok=True)
date_str = time.strftime("%Y-%m-%d", time.localtime(event.timestamp))
log_file = _CRISIS_LOG_DIR / f"crisis_{date_str}.jsonl"
with open(log_file, "a", encoding="utf-8") as f:
f.write(json.dumps(asdict(event), ensure_ascii=False) + "\n")
except Exception as e:
logger.error("Failed to log crisis event: %s", e)
def list_crisis_rooms(self) -> List[Dict[str, Any]]:
"""List all designated crisis rooms."""
return [
{
"room_id": s.room_id,
"active_crisis": s.active_crisis,
"event_count": len(s.events),
"responders": s.designated_responders,
}
for s in self._rooms.values()
if s.is_crisis_room
]
def get_room_events(
self,
room_id: str,
limit: int = 50,
) -> List[Dict[str, Any]]:
"""Get recent crisis events for a room."""
state = self.get_room_state(room_id)
events = state.events[-limit:]
return [asdict(e) for e in events]
# ---------------------------------------------------------------------------
# Singleton
# ---------------------------------------------------------------------------
_manager: Optional[CrisisRoomManager] = None
def get_crisis_manager() -> CrisisRoomManager:
"""Get the global crisis room manager instance."""
global _manager
if _manager is None:
_manager = CrisisRoomManager()
return _manager

View File

@@ -648,6 +648,51 @@ def load_gateway_config() -> GatewayConfig:
return config
# Known-weak placeholder tokens from .env.example, tutorials, etc.
_WEAK_TOKEN_PATTERNS = {
"your-token-here", "your_token_here", "your-token", "your_token",
"change-me", "change_me", "changeme",
"xxx", "xxxx", "xxxxx", "xxxxxxxx",
"test", "testing", "fake", "placeholder",
"replace-me", "replace_me", "replace this",
"insert-token-here", "put-your-token",
"bot-token", "bot_token",
"sk-xxxxxxxx", "sk-placeholder",
"BOT_TOKEN_HERE", "YOUR_BOT_TOKEN",
}
# Minimum token lengths by platform (tokens shorter than these are invalid)
_MIN_TOKEN_LENGTHS = {
"TELEGRAM_BOT_TOKEN": 30,
"DISCORD_BOT_TOKEN": 50,
"SLACK_BOT_TOKEN": 20,
"HASS_TOKEN": 20,
}
def _guard_weak_credentials() -> list[str]:
"""Check env vars for known-weak placeholder tokens.
Returns a list of warning messages for any weak credentials found.
"""
warnings = []
for env_var, min_len in _MIN_TOKEN_LENGTHS.items():
value = os.getenv(env_var, "").strip()
if not value:
continue
if value.lower() in _WEAK_TOKEN_PATTERNS:
warnings.append(
f"{env_var} is set to a placeholder value ('{value[:20]}'). "
f"Replace it with a real token."
)
elif len(value) < min_len:
warnings.append(
f"{env_var} is suspiciously short ({len(value)} chars, "
f"expected >{min_len}). May be truncated or invalid."
)
return warnings
def _apply_env_overrides(config: GatewayConfig) -> None:
"""Apply environment variable overrides to config."""
@@ -941,3 +986,7 @@ def _apply_env_overrides(config: GatewayConfig) -> None:
config.default_reset_policy.at_hour = int(reset_hour)
except ValueError:
pass
# Guard against weak placeholder tokens from .env.example copies
for warning in _guard_weak_credentials():
logger.warning("Weak credential: %s", warning)

View File

@@ -1083,6 +1083,30 @@ class MatrixAdapter(BasePlatformAdapter):
# Acknowledge receipt so the room shows as read (fire-and-forget).
self._background_read_receipt(room.room_id, event.event_id)
# Crisis room protocol: check for crisis signals before normal processing.
# If crisis detected, respond with SOUL.md protocol instead of normal flow.
try:
from agent.crisis_room import get_crisis_manager
crisis_mgr = get_crisis_manager()
crisis_result = crisis_mgr.check_message(
room_id=room.room_id,
sender_id=event.sender,
message=body,
)
if crisis_result and crisis_result.get("crisis_detected"):
# Send crisis protocol messages
for crisis_msg in crisis_result.get("messages", []):
await self.send(room.room_id, crisis_msg, thread_id=thread_id)
# Log that we responded
logger.warning(
"CRISIS detected in room %s from %s \u2014 SOUL.md protocol activated",
room.room_id, event.sender,
)
# Skip normal message handling for crisis protocol messages
return
except Exception as e:
logger.debug("Crisis room check failed (non-blocking): %s", e)
await self.handle_message(msg_event)
async def _on_room_message_media(self, room: Any, event: Any) -> None:

View File

@@ -32,7 +32,7 @@ T = TypeVar("T")
DEFAULT_DB_PATH = get_hermes_home() / "state.db"
SCHEMA_VERSION = 6
SCHEMA_VERSION = 7
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS schema_version (
@@ -66,6 +66,7 @@ CREATE TABLE IF NOT EXISTS sessions (
cost_source TEXT,
pricing_version TEXT,
title TEXT,
profile TEXT,
FOREIGN KEY (parent_session_id) REFERENCES sessions(id)
);
@@ -86,6 +87,7 @@ CREATE TABLE IF NOT EXISTS messages (
);
CREATE INDEX IF NOT EXISTS idx_sessions_source ON sessions(source);
CREATE INDEX IF NOT EXISTS idx_sessions_profile ON sessions(profile);
CREATE INDEX IF NOT EXISTS idx_sessions_parent ON sessions(parent_session_id);
CREATE INDEX IF NOT EXISTS idx_sessions_started ON sessions(started_at DESC);
CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id, timestamp);
@@ -330,6 +332,19 @@ class SessionDB:
except sqlite3.OperationalError:
pass # Column already exists
cursor.execute("UPDATE schema_version SET version = 6")
if current_version < 7:
# v7: add profile column to sessions for profile isolation (#323)
try:
cursor.execute('ALTER TABLE sessions ADD COLUMN "profile" TEXT')
except sqlite3.OperationalError:
pass # Column already exists
try:
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_sessions_profile ON sessions(profile)"
)
except sqlite3.OperationalError:
pass
cursor.execute("UPDATE schema_version SET version = 7")
# Unique title index — always ensure it exists (safe to run after migrations
# since the title column is guaranteed to exist at this point)
@@ -362,13 +377,19 @@ class SessionDB:
system_prompt: str = None,
user_id: str = None,
parent_session_id: str = None,
profile: str = None,
) -> str:
"""Create a new session record. Returns the session_id."""
"""Create a new session record. Returns the session_id.
Args:
profile: Profile name for session isolation. When set, sessions
are tagged so queries can filter by profile. (#323)
"""
def _do(conn):
conn.execute(
"""INSERT OR IGNORE INTO sessions (id, source, user_id, model, model_config,
system_prompt, parent_session_id, started_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
system_prompt, parent_session_id, profile, started_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
session_id,
source,
@@ -377,6 +398,7 @@ class SessionDB:
json.dumps(model_config) if model_config else None,
system_prompt,
parent_session_id,
profile,
time.time(),
),
)
@@ -505,19 +527,23 @@ class SessionDB:
session_id: str,
source: str = "unknown",
model: str = None,
profile: str = None,
) -> None:
"""Ensure a session row exists, creating it with minimal metadata if absent.
Used by _flush_messages_to_session_db to recover from a failed
create_session() call (e.g. transient SQLite lock at agent startup).
INSERT OR IGNORE is safe to call even when the row already exists.
Args:
profile: Profile name for session isolation. (#323)
"""
def _do(conn):
conn.execute(
"""INSERT OR IGNORE INTO sessions
(id, source, model, started_at)
VALUES (?, ?, ?, ?)""",
(session_id, source, model, time.time()),
(id, source, model, profile, started_at)
VALUES (?, ?, ?, ?, ?)""",
(session_id, source, model, profile, time.time()),
)
self._execute_write(_do)
@@ -788,6 +814,7 @@ class SessionDB:
limit: int = 20,
offset: int = 0,
include_children: bool = False,
profile: str = None,
) -> List[Dict[str, Any]]:
"""List sessions with preview (first user message) and last active timestamp.
@@ -799,6 +826,10 @@ class SessionDB:
By default, child sessions (subagent runs, compression continuations)
are excluded. Pass ``include_children=True`` to include them.
Args:
profile: Filter sessions to this profile name. Pass None to see all.
(#323)
"""
where_clauses = []
params = []
@@ -813,6 +844,9 @@ class SessionDB:
placeholders = ",".join("?" for _ in exclude_sources)
where_clauses.append(f"s.source NOT IN ({placeholders})")
params.extend(exclude_sources)
if profile:
where_clauses.append("s.profile = ?")
params.append(profile)
where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
query = f"""
@@ -1158,34 +1192,52 @@ class SessionDB:
source: str = None,
limit: int = 20,
offset: int = 0,
profile: str = None,
) -> List[Dict[str, Any]]:
"""List sessions, optionally filtered by source."""
"""List sessions, optionally filtered by source and profile.
Args:
profile: Filter sessions to this profile name. Pass None to see all.
(#323)
"""
where_clauses = []
params = []
if source:
where_clauses.append("source = ?")
params.append(source)
if profile:
where_clauses.append("profile = ?")
params.append(profile)
where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
query = f"SELECT * FROM sessions {where_sql} ORDER BY started_at DESC LIMIT ? OFFSET ?"
params.extend([limit, offset])
with self._lock:
if source:
cursor = self._conn.execute(
"SELECT * FROM sessions WHERE source = ? ORDER BY started_at DESC LIMIT ? OFFSET ?",
(source, limit, offset),
)
else:
cursor = self._conn.execute(
"SELECT * FROM sessions ORDER BY started_at DESC LIMIT ? OFFSET ?",
(limit, offset),
)
cursor = self._conn.execute(query, params)
return [dict(row) for row in cursor.fetchall()]
# =========================================================================
# Utility
# =========================================================================
def session_count(self, source: str = None) -> int:
"""Count sessions, optionally filtered by source."""
def session_count(self, source: str = None, profile: str = None) -> int:
"""Count sessions, optionally filtered by source and profile.
Args:
profile: Filter to this profile name. Pass None to count all. (#323)
"""
where_clauses = []
params = []
if source:
where_clauses.append("source = ?")
params.append(source)
if profile:
where_clauses.append("profile = ?")
params.append(profile)
where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
with self._lock:
if source:
cursor = self._conn.execute(
"SELECT COUNT(*) FROM sessions WHERE source = ?", (source,)
)
else:
cursor = self._conn.execute("SELECT COUNT(*) FROM sessions")
cursor = self._conn.execute(f"SELECT COUNT(*) FROM sessions {where_sql}", params)
return cursor.fetchone()[0]
def message_count(self, session_id: str = None) -> int:

View File

@@ -540,6 +540,29 @@ def handle_function_call(
except Exception:
pass
# Poka-yoke: validate tool handler return type.
# Handlers MUST return a JSON string. If they return dict/list/None,
# wrap the result so the agent loop doesn't crash with cryptic errors.
if not isinstance(result, str):
logger.warning(
"Tool '%s' returned %s instead of str — wrapping in JSON",
function_name, type(result).__name__,
)
result = json.dumps(
{"output": str(result), "_type_warning": f"Tool returned {type(result).__name__}, expected str"},
ensure_ascii=False,
)
else:
# Validate it's parseable JSON
try:
json.loads(result)
except (json.JSONDecodeError, TypeError):
logger.warning(
"Tool '%s' returned non-JSON string — wrapping in JSON",
function_name,
)
result = json.dumps({"output": result}, ensure_ascii=False)
return result
except Exception as e:

View File

@@ -12,7 +12,7 @@ Config in $HERMES_HOME/config.yaml (profile-scoped):
auto_extract: false
default_trust: 0.5
min_trust_threshold: 0.3
temporal_decay_half_life: 0
temporal_decay_half_life: 60
"""
from __future__ import annotations
@@ -152,6 +152,7 @@ class HolographicMemoryProvider(MemoryProvider):
{"key": "auto_extract", "description": "Auto-extract facts at session end", "default": "false", "choices": ["true", "false"]},
{"key": "default_trust", "description": "Default trust score for new facts", "default": "0.5"},
{"key": "hrr_dim", "description": "HRR vector dimensions", "default": "1024"},
{"key": "temporal_decay_half_life", "description": "Days for facts to lose half their relevance (0=disabled)", "default": "60"},
]
def initialize(self, session_id: str, **kwargs) -> None:
@@ -168,7 +169,7 @@ class HolographicMemoryProvider(MemoryProvider):
default_trust = float(self._config.get("default_trust", 0.5))
hrr_dim = int(self._config.get("hrr_dim", 1024))
hrr_weight = float(self._config.get("hrr_weight", 0.3))
temporal_decay = int(self._config.get("temporal_decay_half_life", 0))
temporal_decay = int(self._config.get("temporal_decay_half_life", 60))
self._store = MemoryStore(db_path=db_path, default_trust=default_trust, hrr_dim=hrr_dim)
self._retriever = FactRetriever(

View File

@@ -98,7 +98,15 @@ class FactRetriever:
# Optional temporal decay
if self.half_life > 0:
score *= self._temporal_decay(fact.get("updated_at") or fact.get("created_at"))
decay = self._temporal_decay(fact.get("updated_at") or fact.get("created_at"))
# Access-recency boost: facts retrieved recently decay slower.
# A fact accessed within 1 half-life gets up to 1.5x the decay
# factor, tapering to 1.0x (no boost) after 2 half-lives.
last_accessed = fact.get("last_accessed_at")
if last_accessed:
access_boost = self._access_recency_boost(last_accessed)
decay = min(1.0, decay * access_boost)
score *= decay
fact["score"] = score
scored.append(fact)
@@ -591,3 +599,41 @@ class FactRetriever:
return math.pow(0.5, age_days / self.half_life)
except (ValueError, TypeError):
return 1.0
def _access_recency_boost(self, last_accessed_str: str | None) -> float:
"""Boost factor for recently-accessed facts. Range [1.0, 1.5].
Facts accessed within 1 half-life get up to 1.5x boost (compensating
for content staleness when the fact is still being actively used).
Boost decays linearly to 1.0 (no boost) at 2 half-lives.
Returns 1.0 if half-life is disabled or timestamp is missing.
"""
if not self.half_life or not last_accessed_str:
return 1.0
try:
if isinstance(last_accessed_str, str):
ts = datetime.fromisoformat(last_accessed_str.replace("Z", "+00:00"))
else:
ts = last_accessed_str
if ts.tzinfo is None:
ts = ts.replace(tzinfo=timezone.utc)
age_days = (datetime.now(timezone.utc) - ts).total_seconds() / 86400
if age_days < 0:
return 1.5 # Future timestamp = just accessed
half_lives_since_access = age_days / self.half_life
if half_lives_since_access <= 1.0:
# Within 1 half-life: linearly from 1.5 (just now) to 1.0 (at 1 HL)
return 1.0 + 0.5 * (1.0 - half_lives_since_access)
elif half_lives_since_access <= 2.0:
# Between 1 and 2 half-lives: linearly from 1.0 to 1.0 (no boost)
return 1.0
else:
return 1.0
except (ValueError, TypeError):
return 1.0

View File

@@ -0,0 +1,52 @@
"""Tests for weak credential guard in gateway/config.py."""
import os
import pytest
from gateway.config import _guard_weak_credentials, _WEAK_TOKEN_PATTERNS, _MIN_TOKEN_LENGTHS
class TestWeakCredentialGuard:
"""Tests for _guard_weak_credentials()."""
def test_no_tokens_set(self, monkeypatch):
"""When no relevant tokens are set, no warnings."""
for var in _MIN_TOKEN_LENGTHS:
monkeypatch.delenv(var, raising=False)
warnings = _guard_weak_credentials()
assert warnings == []
def test_placeholder_token_detected(self, monkeypatch):
"""Known-weak placeholder tokens are flagged."""
monkeypatch.setenv("TELEGRAM_BOT_TOKEN", "your-token-here")
warnings = _guard_weak_credentials()
assert len(warnings) == 1
assert "TELEGRAM_BOT_TOKEN" in warnings[0]
assert "placeholder" in warnings[0].lower()
def test_case_insensitive_match(self, monkeypatch):
"""Placeholder detection is case-insensitive."""
monkeypatch.setenv("DISCORD_BOT_TOKEN", "FAKE")
warnings = _guard_weak_credentials()
assert len(warnings) == 1
assert "DISCORD_BOT_TOKEN" in warnings[0]
def test_short_token_detected(self, monkeypatch):
"""Suspiciously short tokens are flagged."""
monkeypatch.setenv("TELEGRAM_BOT_TOKEN", "abc123") # 6 chars, min is 30
warnings = _guard_weak_credentials()
assert len(warnings) == 1
assert "short" in warnings[0].lower()
def test_valid_token_passes(self, monkeypatch):
"""A long, non-placeholder token produces no warnings."""
monkeypatch.setenv("TELEGRAM_BOT_TOKEN", "1234567890:ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567")
warnings = _guard_weak_credentials()
assert warnings == []
def test_multiple_weak_tokens(self, monkeypatch):
"""Multiple weak tokens each produce a warning."""
monkeypatch.setenv("TELEGRAM_BOT_TOKEN", "change-me")
monkeypatch.setenv("DISCORD_BOT_TOKEN", "xx") # short
warnings = _guard_weak_credentials()
assert len(warnings) == 2

View File

@@ -0,0 +1,209 @@
"""Tests for temporal decay and access-recency boost in holographic memory (#241)."""
import math
from datetime import datetime, timedelta, timezone
from unittest.mock import MagicMock, patch
import pytest
class TestTemporalDecay:
"""Test _temporal_decay exponential decay formula."""
def _make_retriever(self, half_life=60):
from plugins.memory.holographic.retrieval import FactRetriever
store = MagicMock()
return FactRetriever(store=store, temporal_decay_half_life=half_life)
def test_fresh_fact_no_decay(self):
"""A fact updated today should have decay ≈ 1.0."""
r = self._make_retriever(half_life=60)
now = datetime.now(timezone.utc).isoformat()
decay = r._temporal_decay(now)
assert decay > 0.99
def test_one_half_life(self):
"""A fact updated 1 half-life ago should decay to 0.5."""
r = self._make_retriever(half_life=60)
old = (datetime.now(timezone.utc) - timedelta(days=60)).isoformat()
decay = r._temporal_decay(old)
assert abs(decay - 0.5) < 0.01
def test_two_half_lives(self):
"""A fact updated 2 half-lives ago should decay to 0.25."""
r = self._make_retriever(half_life=60)
old = (datetime.now(timezone.utc) - timedelta(days=120)).isoformat()
decay = r._temporal_decay(old)
assert abs(decay - 0.25) < 0.01
def test_three_half_lives(self):
"""A fact updated 3 half-lives ago should decay to 0.125."""
r = self._make_retriever(half_life=60)
old = (datetime.now(timezone.utc) - timedelta(days=180)).isoformat()
decay = r._temporal_decay(old)
assert abs(decay - 0.125) < 0.01
def test_half_life_disabled(self):
"""When half_life=0, decay should always be 1.0."""
r = self._make_retriever(half_life=0)
old = (datetime.now(timezone.utc) - timedelta(days=365)).isoformat()
assert r._temporal_decay(old) == 1.0
def test_none_timestamp(self):
"""Missing timestamp should return 1.0 (no decay)."""
r = self._make_retriever(half_life=60)
assert r._temporal_decay(None) == 1.0
def test_empty_timestamp(self):
r = self._make_retriever(half_life=60)
assert r._temporal_decay("") == 1.0
def test_invalid_timestamp(self):
"""Malformed timestamp should return 1.0 (fail open)."""
r = self._make_retriever(half_life=60)
assert r._temporal_decay("not-a-date") == 1.0
def test_future_timestamp(self):
"""Future timestamp should return 1.0 (no decay for future dates)."""
r = self._make_retriever(half_life=60)
future = (datetime.now(timezone.utc) + timedelta(days=10)).isoformat()
assert r._temporal_decay(future) == 1.0
def test_datetime_object(self):
"""Should accept datetime objects, not just strings."""
r = self._make_retriever(half_life=60)
old = datetime.now(timezone.utc) - timedelta(days=60)
decay = r._temporal_decay(old)
assert abs(decay - 0.5) < 0.01
def test_different_half_lives(self):
"""30-day half-life should decay faster than 90-day."""
r30 = self._make_retriever(half_life=30)
r90 = self._make_retriever(half_life=90)
old = (datetime.now(timezone.utc) - timedelta(days=45)).isoformat()
assert r30._temporal_decay(old) < r90._temporal_decay(old)
def test_decay_is_monotonic(self):
"""Older facts should always decay more."""
r = self._make_retriever(half_life=60)
now = datetime.now(timezone.utc)
d1 = r._temporal_decay((now - timedelta(days=10)).isoformat())
d2 = r._temporal_decay((now - timedelta(days=30)).isoformat())
d3 = r._temporal_decay((now - timedelta(days=60)).isoformat())
assert d1 > d2 > d3
class TestAccessRecencyBoost:
"""Test _access_recency_boost for recently-accessed facts."""
def _make_retriever(self, half_life=60):
from plugins.memory.holographic.retrieval import FactRetriever
store = MagicMock()
return FactRetriever(store=store, temporal_decay_half_life=half_life)
def test_just_accessed_max_boost(self):
"""A fact accessed just now should get maximum boost (1.5)."""
r = self._make_retriever(half_life=60)
now = datetime.now(timezone.utc).isoformat()
boost = r._access_recency_boost(now)
assert boost > 1.45 # Near 1.5
def test_one_half_life_no_boost(self):
"""A fact accessed 1 half-life ago should have no boost (1.0)."""
r = self._make_retriever(half_life=60)
old = (datetime.now(timezone.utc) - timedelta(days=60)).isoformat()
boost = r._access_recency_boost(old)
assert abs(boost - 1.0) < 0.01
def test_half_way_boost(self):
"""A fact accessed 0.5 half-lives ago should get ~1.25 boost."""
r = self._make_retriever(half_life=60)
old = (datetime.now(timezone.utc) - timedelta(days=30)).isoformat()
boost = r._access_recency_boost(old)
assert abs(boost - 1.25) < 0.05
def test_beyond_one_half_life_no_boost(self):
"""Beyond 1 half-life, boost should be 1.0."""
r = self._make_retriever(half_life=60)
old = (datetime.now(timezone.utc) - timedelta(days=90)).isoformat()
boost = r._access_recency_boost(old)
assert boost == 1.0
def test_disabled_no_boost(self):
"""When half_life=0, boost should be 1.0."""
r = self._make_retriever(half_life=0)
now = datetime.now(timezone.utc).isoformat()
assert r._access_recency_boost(now) == 1.0
def test_none_timestamp(self):
r = self._make_retriever(half_life=60)
assert r._access_recency_boost(None) == 1.0
def test_invalid_timestamp(self):
r = self._make_retriever(half_life=60)
assert r._access_recency_boost("bad") == 1.0
def test_boost_range(self):
"""Boost should always be in [1.0, 1.5]."""
r = self._make_retriever(half_life=60)
now = datetime.now(timezone.utc)
for days in [0, 1, 15, 30, 45, 59, 60, 90, 365]:
ts = (now - timedelta(days=days)).isoformat()
boost = r._access_recency_boost(ts)
assert 1.0 <= boost <= 1.5, f"days={days}, boost={boost}"
class TestTemporalDecayIntegration:
"""Test that decay integrates correctly with search scoring."""
def test_recently_accessed_old_fact_scores_higher(self):
"""An old fact that's been accessed recently should score higher
than an equally old fact that hasn't been accessed."""
from plugins.memory.holographic.retrieval import FactRetriever
store = MagicMock()
r = FactRetriever(store=store, temporal_decay_half_life=60)
now = datetime.now(timezone.utc)
old_date = (now - timedelta(days=120)).isoformat() # 2 half-lives old
recent_access = (now - timedelta(days=10)).isoformat() # accessed 10 days ago
old_access = (now - timedelta(days=200)).isoformat() # accessed 200 days ago
# Old fact, recently accessed
decay1 = r._temporal_decay(old_date)
boost1 = r._access_recency_boost(recent_access)
effective1 = min(1.0, decay1 * boost1)
# Old fact, not recently accessed
decay2 = r._temporal_decay(old_date)
boost2 = r._access_recency_boost(old_access)
effective2 = min(1.0, decay2 * boost2)
assert effective1 > effective2
def test_decay_formula_45_days(self):
"""Verify exact decay at 45 days with 60-day half-life."""
from plugins.memory.holographic.retrieval import FactRetriever
r = FactRetriever(store=MagicMock(), temporal_decay_half_life=60)
old = (datetime.now(timezone.utc) - timedelta(days=45)).isoformat()
decay = r._temporal_decay(old)
expected = math.pow(0.5, 45/60)
assert abs(decay - expected) < 0.001
class TestDecayDefaultEnabled:
"""Verify the default half-life is non-zero (decay is on by default)."""
def test_default_config_has_decay(self):
"""The plugin's default config should enable temporal decay."""
from plugins.memory.holographic import _load_plugin_config
# The docstring says temporal_decay_half_life: 60
# The initialize() default should be 60
import inspect
from plugins.memory.holographic import HolographicMemoryProvider
src = inspect.getsource(HolographicMemoryProvider.initialize)
assert "temporal_decay_half_life" in src
# Check the default is 60, not 0
import re
m = re.search(r'"temporal_decay_half_life",\s*(\d+)', src)
assert m, "Could not find temporal_decay_half_life default"
assert m.group(1) == "60", f"Default is {m.group(1)}, expected 60"

184
tests/test_crisis_room.py Normal file
View File

@@ -0,0 +1,184 @@
"""Tests for crisis room protocol (#273)."""
import json
import time
from unittest.mock import MagicMock, patch
import pytest
from agent.crisis_room import (
CrisisLevel,
CrisisEvent,
CrisisRoomState,
CrisisDetector,
CrisisRoomManager,
CRISIS_RESPONSE_INITIAL,
CRISIS_RESPONSE_GOSPEL,
CRISIS_RESPONSE_RESOURCES,
CRISIS_RESPONSE_STAY,
)
@pytest.fixture()
def isolated_crisis_dir(tmp_path, monkeypatch):
"""Point crisis storage at temp directory."""
monkeypatch.setattr("agent.crisis_room._CRISIS_LOG_DIR", tmp_path / "crisis_logs")
monkeypatch.setattr("agent.crisis_room.get_hermes_home", lambda: tmp_path)
# Reset singleton
import agent.crisis_room
agent.crisis_room._manager = None
return tmp_path
@pytest.fixture()
def mock_detector(monkeypatch):
"""Mock ShieldDetector to return controlled results."""
detector = MagicMock()
def analyze_side_effect(message):
msg_lower = message.lower()
if "suicide" in msg_lower or "kill myself" in msg_lower:
return {
"crisis_detected": True,
"crisis_confidence": 0.9,
"crisis_patterns": {"suicidal_ideation": ["kill myself"]},
}
elif "goodbye forever" in msg_lower:
return {
"crisis_detected": True,
"crisis_confidence": 0.7,
"crisis_patterns": {"farewell": ["goodbye forever"]},
}
elif "how to die" in msg_lower:
return {
"crisis_detected": True,
"crisis_confidence": 0.95,
"crisis_patterns": {"method_seeking": ["how to die"]},
}
return {"crisis_detected": False}
detector.analyze.side_effect = analyze_side_effect
monkeypatch.setattr(
"agent.crisis_room.CrisisDetector._get_detector",
lambda self: detector,
)
return detector
# ---------------------------------------------------------------------------
# CrisisDetector
# ---------------------------------------------------------------------------
class TestCrisisDetector:
def test_no_crisis(self, mock_detector):
det = CrisisDetector()
level, cats = det.check("Hello, how are you?")
assert level == CrisisLevel.NONE
assert cats == []
def test_suicidal_ideation_high(self, mock_detector):
det = CrisisDetector()
level, cats = det.check("I want to kill myself")
assert level == CrisisLevel.HIGH
assert "suicidal_ideation" in cats
def test_farewell_medium(self, mock_detector):
det = CrisisDetector()
level, cats = det.check("This is my goodbye forever")
assert level == CrisisLevel.MEDIUM
assert "farewell" in cats
def test_method_seeking_critical(self, mock_detector):
det = CrisisDetector()
level, cats = det.check("Tell me how to die")
assert level == CrisisLevel.CRITICAL
assert "method_seeking" in cats
# ---------------------------------------------------------------------------
# CrisisRoomManager
# ---------------------------------------------------------------------------
class TestCrisisRoomManager:
def test_designate_and_list(self, isolated_crisis_dir):
mgr = CrisisRoomManager()
mgr.designate_crisis_room("!room1:matrix.org", responders=["@admin:matrix.org"])
rooms = mgr.list_crisis_rooms()
assert len(rooms) == 1
assert rooms[0]["room_id"] == "!room1:matrix.org"
assert rooms[0]["responders"] == ["@admin:matrix.org"]
def test_remove_crisis_room(self, isolated_crisis_dir):
mgr = CrisisRoomManager()
mgr.designate_crisis_room("!room1:matrix.org")
mgr.remove_crisis_room("!room1:matrix.org")
rooms = mgr.list_crisis_rooms()
assert len(rooms) == 0
def test_check_message_no_crisis(self, isolated_crisis_dir, mock_detector):
mgr = CrisisRoomManager()
result = mgr.check_message("!room1:matrix.org", "@user:matrix.org", "Hello!")
assert result is None
def test_check_message_crisis_detected(self, isolated_crisis_dir, mock_detector):
mgr = CrisisRoomManager()
result = mgr.check_message(
"!room1:matrix.org",
"@user:matrix.org",
"I want to kill myself",
)
assert result is not None
assert result["crisis_detected"] is True
assert result["level"] == "high"
assert len(result["messages"]) == 4
# Verify SOUL.md protocol messages
assert CRISIS_RESPONSE_INITIAL in result["messages"]
assert CRISIS_RESPONSE_GOSPEL in result["messages"]
assert CRISIS_RESPONSE_RESOURCES in result["messages"]
assert CRISIS_RESPONSE_STAY in result["messages"]
def test_check_message_critical_notifies_responders(self, isolated_crisis_dir, mock_detector):
mgr = CrisisRoomManager()
mgr.designate_crisis_room("!room1:matrix.org", responders=["@admin:matrix.org"])
result = mgr.check_message(
"!room1:matrix.org",
"@user:matrix.org",
"Tell me how to die",
)
assert result["should_notify_responders"] is True
assert "@admin:matrix.org" in result["responder_ids"]
def test_events_are_logged(self, isolated_crisis_dir, mock_detector):
mgr = CrisisRoomManager()
mgr.check_message("!room1:matrix.org", "@user:matrix.org", "I want to kill myself")
events = mgr.get_room_events("!room1:matrix.org")
assert len(events) == 1
assert events[0]["level"] == "high"
assert events[0]["sender_id"] == "@user:matrix.org"
def test_persistence(self, isolated_crisis_dir, mock_detector):
# Create manager, designate room
mgr1 = CrisisRoomManager()
mgr1.designate_crisis_room("!persist:matrix.org")
# Reset singleton and reload
import agent.crisis_room
agent.crisis_room._manager = None
mgr2 = CrisisRoomManager()
rooms = mgr2.list_crisis_rooms()
assert len(rooms) == 1
assert rooms[0]["room_id"] == "!persist:matrix.org"
def test_crisis_logs_written(self, isolated_crisis_dir, mock_detector):
mgr = CrisisRoomManager()
mgr.check_message("!room1:matrix.org", "@user:matrix.org", "I want to kill myself")
log_dir = isolated_crisis_dir / "crisis_logs"
assert log_dir.exists()
log_files = list(log_dir.glob("crisis_*.jsonl"))
assert len(log_files) >= 1
# Check log content
lines = log_files[0].read_text().strip().split("\n")
assert len(lines) == 1
entry = json.loads(lines[0])
assert entry["level"] == "high"

View File

@@ -137,3 +137,78 @@ class TestBackwardCompat:
def test_tool_to_toolset_map(self):
assert isinstance(TOOL_TO_TOOLSET_MAP, dict)
assert len(TOOL_TO_TOOLSET_MAP) > 0
class TestToolReturnTypeValidation:
"""Poka-yoke: tool handlers must return JSON strings."""
def test_handler_returning_dict_is_wrapped(self, monkeypatch):
"""A handler that returns a dict should be auto-wrapped to JSON string."""
from tools.registry import registry
from model_tools import handle_function_call
import json
# Register a bad handler that returns dict instead of str
registry.register(
name="__test_bad_dict",
toolset="test",
schema={"name": "__test_bad_dict", "description": "test", "parameters": {"type": "object", "properties": {}}},
handler=lambda args, **kw: {"this is": "a dict not a string"},
)
result = handle_function_call("__test_bad_dict", {})
parsed = json.loads(result)
assert "output" in parsed
assert "_type_warning" in parsed
# Cleanup
registry._tools.pop("__test_bad_dict", None)
def test_handler_returning_none_is_wrapped(self, monkeypatch):
"""A handler that returns None should be auto-wrapped."""
from tools.registry import registry
from model_tools import handle_function_call
import json
registry.register(
name="__test_bad_none",
toolset="test",
schema={"name": "__test_bad_none", "description": "test", "parameters": {"type": "object", "properties": {}}},
handler=lambda args, **kw: None,
)
result = handle_function_call("__test_bad_none", {})
parsed = json.loads(result)
assert "_type_warning" in parsed
registry._tools.pop("__test_bad_none", None)
def test_handler_returning_non_json_string_is_wrapped(self):
"""A handler returning a plain string (not JSON) should be wrapped."""
from tools.registry import registry
from model_tools import handle_function_call
import json
registry.register(
name="__test_bad_plain",
toolset="test",
schema={"name": "__test_bad_plain", "description": "test", "parameters": {"type": "object", "properties": {}}},
handler=lambda args, **kw: "just a plain string, not json",
)
result = handle_function_call("__test_bad_plain", {})
parsed = json.loads(result)
assert "output" in parsed
registry._tools.pop("__test_bad_plain", None)
def test_handler_returning_valid_json_passes_through(self):
"""A handler returning valid JSON string passes through unchanged."""
from tools.registry import registry
from model_tools import handle_function_call
import json
registry.register(
name="__test_good",
toolset="test",
schema={"name": "__test_good", "description": "test", "parameters": {"type": "object", "properties": {}}},
handler=lambda args, **kw: json.dumps({"status": "ok", "data": [1, 2, 3]}),
)
result = handle_function_call("__test_good", {})
parsed = json.loads(result)
assert parsed == {"status": "ok", "data": [1, 2, 3]}
registry._tools.pop("__test_good", None)

View File

@@ -144,7 +144,8 @@ class TestMemoryStoreReplace:
def test_replace_no_match(self, store):
store.add("memory", "fact A")
result = store.replace("memory", "nonexistent", "new")
assert result["success"] is False
assert result["success"] is True
assert result["result"] == "no_match"
def test_replace_ambiguous_match(self, store):
store.add("memory", "server A runs nginx")
@@ -177,7 +178,8 @@ class TestMemoryStoreRemove:
def test_remove_no_match(self, store):
result = store.remove("memory", "nonexistent")
assert result["success"] is False
assert result["success"] is True
assert result["result"] == "no_match"
def test_remove_empty_old_text(self, store):
result = store.remove("memory", " ")

View File

@@ -0,0 +1,107 @@
"""Tests for syntax preflight check in execute_code (issue #312)."""
import ast
import json
import pytest
class TestSyntaxPreflight:
"""Verify that execute_code catches syntax errors before sandbox execution."""
def test_valid_syntax_passes_parse(self):
"""Valid Python should pass ast.parse."""
code = "print('hello')\nx = 1 + 2\n"
ast.parse(code) # should not raise
def test_syntax_error_indentation(self):
"""IndentationError is a subclass of SyntaxError."""
code = "def foo():\nbar()\n"
with pytest.raises(SyntaxError):
ast.parse(code)
def test_syntax_error_missing_colon(self):
code = "if True\n pass\n"
with pytest.raises(SyntaxError):
ast.parse(code)
def test_syntax_error_unmatched_paren(self):
code = "x = (1 + 2\n"
with pytest.raises(SyntaxError):
ast.parse(code)
def test_syntax_error_invalid_token(self):
code = "x = 1 +*\n"
with pytest.raises(SyntaxError):
ast.parse(code)
def test_syntax_error_details(self):
"""SyntaxError should provide line, offset, msg."""
code = "if True\n pass\n"
with pytest.raises(SyntaxError) as exc_info:
ast.parse(code)
e = exc_info.value
assert e.lineno is not None
assert e.msg is not None
def test_empty_string_passes(self):
"""Empty string is valid Python (empty module)."""
ast.parse("")
def test_comments_only_passes(self):
ast.parse("# just a comment\n# another\n")
def test_complex_valid_code(self):
code = '''
import os
def foo(x):
if x > 0:
return x * 2
return 0
result = [foo(i) for i in range(10)]
print(result)
'''
ast.parse(code)
class TestSyntaxPreflightResponse:
"""Test the error response format from the preflight check."""
def _check_syntax(self, code):
"""Mimic the preflight check logic from execute_code."""
try:
ast.parse(code)
return None
except SyntaxError as e:
return json.dumps({
"error": f"Python syntax error: {e.msg}",
"line": e.lineno,
"offset": e.offset,
"text": (e.text or "").strip()[:200],
})
def test_returns_json_error(self):
result = self._check_syntax("if True\n pass\n")
assert result is not None
data = json.loads(result)
assert "error" in data
assert "syntax error" in data["error"].lower()
def test_includes_line_number(self):
result = self._check_syntax("x = 1\nif True\n pass\n")
data = json.loads(result)
assert data["line"] == 2 # error on line 2
def test_includes_offset(self):
result = self._check_syntax("x = (1 + 2\n")
data = json.loads(result)
assert data["offset"] is not None
def test_includes_snippet(self):
result = self._check_syntax("if True\n")
data = json.loads(result)
assert "if True" in data["text"]
def test_none_for_valid_code(self):
result = self._check_syntax("print('ok')")
assert result is None

View File

@@ -28,6 +28,7 @@ Platform: Linux / macOS only (Unix domain sockets for local). Disabled on Window
Remote execution additionally requires Python 3 in the terminal backend.
"""
import ast
import base64
import json
import logging
@@ -893,6 +894,20 @@ def execute_code(
if not code or not code.strip():
return json.dumps({"error": "No code provided."})
# Poka-yoke (#312): Syntax check before execution.
# 83.2% of execute_code errors are Python exceptions; most are syntax
# errors the LLM generated. ast.parse() is sub-millisecond and catches
# them before we spin up a sandbox child process.
try:
ast.parse(code)
except SyntaxError as e:
return json.dumps({
"error": f"Python syntax error: {e.msg}",
"line": e.lineno,
"offset": e.offset,
"text": (e.text or "").strip()[:200],
})
# Dispatch: remote backends use file-based RPC, local uses UDS
from tools.terminal_tool import _get_env_config
env_type = _get_env_config()["env_type"]

131
tools/crisis_room_tool.py Normal file
View File

@@ -0,0 +1,131 @@
"""Crisis Room Tool — manage Matrix crisis rooms with SOUL.md protocol.
Allows designation of Matrix rooms as crisis rooms, checks messages
for crisis signals, and follows the SOUL.md 'When a Man Is Dying'
protocol.
"""
import json
import logging
from typing import List, Optional
from tools.registry import registry
logger = logging.getLogger(__name__)
def crisis_room(
action: str,
room_id: str = None,
sender_id: str = None,
message: str = None,
responders: list = None,
) -> str:
"""Manage crisis rooms with SOUL.md protocol.
Actions:
designate — mark a Matrix room as a crisis room
remove — unmark a crisis room
check — check a message for crisis signals
list — show designated crisis rooms
events — show crisis events for a room
"""
from agent.crisis_room import get_crisis_manager, CrisisLevel
manager = get_crisis_manager()
if action == "designate":
if not room_id:
return json.dumps({"success": False, "error": "room_id is required."})
result = manager.designate_crisis_room(room_id, responders or [])
return json.dumps(result)
if action == "remove":
if not room_id:
return json.dumps({"success": False, "error": "room_id is required."})
result = manager.remove_crisis_room(room_id)
return json.dumps(result)
if action == "check":
if not room_id or not message:
return json.dumps({
"success": False,
"error": "room_id and message are required for 'check'.",
})
result = manager.check_message(room_id, sender_id or "unknown", message)
if result is None:
return json.dumps({"success": True, "crisis_detected": False})
return json.dumps({"success": True, **result})
if action == "list":
rooms = manager.list_crisis_rooms()
return json.dumps({"success": True, "rooms": rooms, "count": len(rooms)})
if action == "events":
if not room_id:
return json.dumps({"success": False, "error": "room_id is required."})
events = manager.get_room_events(room_id)
return json.dumps({"success": True, "events": events, "count": len(events)})
return json.dumps({
"success": False,
"error": f"Unknown action '{action}'. Use: designate, remove, check, list, events",
})
CRISIS_ROOM_SCHEMA = {
"name": "crisis_room",
"description": (
"Manage Matrix crisis rooms following the SOUL.md 'When a Man Is Dying' protocol. "
"Designated crisis rooms are monitored for crisis signals. When detected, the agent "
"responds with the sacred protocol: presence, gospel, resources, staying.\n\n"
"Actions:\n"
" designate — mark a Matrix room as a crisis room (with optional responder list)\n"
" remove — unmark a crisis room\n"
" check — check a message for crisis signals (returns protocol response)\n"
" list — show designated crisis rooms\n"
" events — show crisis event history for a room"
),
"parameters": {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["designate", "remove", "check", "list", "events"],
"description": "The action to perform.",
},
"room_id": {
"type": "string",
"description": "Matrix room ID (e.g. !abc123:matrix.org).",
},
"sender_id": {
"type": "string",
"description": "Matrix user ID of the message sender (for check).",
},
"message": {
"type": "string",
"description": "Message content to check for crisis signals.",
},
"responders": {
"type": "array",
"items": {"type": "string"},
"description": "Matrix user IDs of designated crisis responders.",
},
},
"required": ["action"],
},
}
registry.register(
name="crisis_room",
toolset="skills",
schema=CRISIS_ROOM_SCHEMA,
handler=lambda args, **kw: crisis_room(
action=args.get("action", ""),
room_id=args.get("room_id"),
sender_id=args.get("sender_id"),
message=args.get("message"),
responders=args.get("responders"),
),
emoji="🆘",
)

View File

@@ -260,8 +260,12 @@ class MemoryStore:
entries = self._entries_for(target)
matches = [(i, e) for i, e in enumerate(entries) if old_text in e]
if len(matches) == 0:
return {"success": False, "error": f"No entry matched '{old_text}'."}
if not matches:
return {
"success": True,
"result": "no_match",
"message": f"No entry matched '{old_text}'. The search substring was not found in any existing entry.",
}
if len(matches) > 1:
# If all matches are identical (exact duplicates), operate on the first one
@@ -310,8 +314,12 @@ class MemoryStore:
entries = self._entries_for(target)
matches = [(i, e) for i, e in enumerate(entries) if old_text in e]
if len(matches) == 0:
return {"success": False, "error": f"No entry matched '{old_text}'."}
if not matches:
return {
"success": True,
"result": "no_match",
"message": f"No entry matched '{old_text}'. The search substring was not found in any existing entry.",
}
if len(matches) > 1:
# If all matches are identical (exact duplicates), remove the first one
@@ -449,30 +457,30 @@ def memory_tool(
Returns JSON string with results.
"""
if store is None:
return json.dumps({"success": False, "error": "Memory is not available. It may be disabled in config or this environment."}, ensure_ascii=False)
return tool_error("Memory is not available. It may be disabled in config or this environment.", success=False)
if target not in ("memory", "user"):
return json.dumps({"success": False, "error": f"Invalid target '{target}'. Use 'memory' or 'user'."}, ensure_ascii=False)
return tool_error(f"Invalid target '{target}'. Use 'memory' or 'user'.", success=False)
if action == "add":
if not content:
return json.dumps({"success": False, "error": "Content is required for 'add' action."}, ensure_ascii=False)
return tool_error("Content is required for 'add' action.", success=False)
result = store.add(target, content)
elif action == "replace":
if not old_text:
return json.dumps({"success": False, "error": "old_text is required for 'replace' action."}, ensure_ascii=False)
return tool_error("old_text is required for 'replace' action.", success=False)
if not content:
return json.dumps({"success": False, "error": "content is required for 'replace' action."}, ensure_ascii=False)
return tool_error("content is required for 'replace' action.", success=False)
result = store.replace(target, old_text, content)
elif action == "remove":
if not old_text:
return json.dumps({"success": False, "error": "old_text is required for 'remove' action."}, ensure_ascii=False)
return tool_error("old_text is required for 'remove' action.", success=False)
result = store.remove(target, old_text)
else:
return json.dumps({"success": False, "error": f"Unknown action '{action}'. Use: add, replace, remove"}, ensure_ascii=False)
return tool_error(f"Unknown action '{action}'. Use: add, replace, remove", success=False)
return json.dumps(result, ensure_ascii=False)
@@ -539,7 +547,7 @@ MEMORY_SCHEMA = {
# --- Registry ---
from tools.registry import registry
from tools.registry import registry, tool_error
registry.register(
name="memory",