Files
hermes-agent/agent/stop_protocol.py
Allegro 6da0d15590 M1: Implement Stop Protocol (Epic #842)
- Add agent/stop_protocol.py with hard pre-tool-check gate
- Detect explicit stop/halt commands in last user message
- STOP_ACK logging to ~/.hermes/burn-logs/allegro.log
- Hands-off registry with 24-hour expiry in allegro-cycle-state.json
- Integrate gate into _execute_tool_calls in run_agent.py (fail-open)
- Add 33 compliance tests in tests/agent/test_stop_protocol.py
2026-04-06 16:21:16 +00:00

144 lines
5.2 KiB
Python

"""
Stop Protocol — M1 of Epic #842.
Implements a hard pre-tool-check interrupt for explicit stop/halt commands.
Provides STOP_ACK logging, hands-off registry management, and compliance hooks.
@soul:service.sovereignty Every agent must respect the user's right to halt.
"""
import json
import logging
import os
import re
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional
# Matches explicit stop/halt commands at the start of a message or in SYSTEM tags.
STOP_PATTERN = re.compile(
r"^\s*(?:\[SYSTEM:\s*)?(?:stop|halt)(?:\s+means\s+(?:stop|halt))?[\.!\s]*"
r"|^\s*(?:stop|halt)\s+(?:all\s+work|everything|immediately|now)[\.!\s]*"
r"|^\s*(?:stop|halt)\s*$",
re.IGNORECASE,
)
SYSTEM_STOP_PATTERN = re.compile(r"\[SYSTEM:\s*.*?\bstop\b.*?\]", re.IGNORECASE)
ALLEGRO_LOG_PATH = os.path.expanduser("~/.hermes/burn-logs/allegro.log")
CYCLE_STATE_PATH = os.path.expanduser("~/.hermes/allegro-cycle-state.json")
class StopProtocol:
"""Detects stop commands, logs STOP_ACK, and manages hands-off registry."""
def __init__(
self,
cycle_state_path: str = CYCLE_STATE_PATH,
log_path: str = ALLEGRO_LOG_PATH,
):
self.cycle_state_path = cycle_state_path
self.log_path = log_path
def is_stop_command(self, text: str) -> bool:
"""Return True if *text* is an explicit stop/halt command."""
if not text or not isinstance(text, str):
return False
stripped = text.strip()
if SYSTEM_STOP_PATTERN.search(stripped):
return True
return bool(STOP_PATTERN.search(stripped))
def check_messages(self, messages: List[Dict[str, Any]]) -> bool:
"""Check the most recent user message for a stop command."""
if not messages:
return False
for msg in reversed(messages):
if isinstance(msg, dict) and msg.get("role") == "user":
return self.is_stop_command(msg.get("content", "") or "")
return False
def _load_state(self) -> Dict[str, Any]:
try:
with open(self.cycle_state_path, "r", encoding="utf-8") as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
return {}
def _save_state(self, state: Dict[str, Any]) -> None:
os.makedirs(os.path.dirname(self.cycle_state_path), exist_ok=True)
with open(self.cycle_state_path, "w", encoding="utf-8") as f:
json.dump(state, f, indent=2)
def is_hands_off(self, target: Optional[str] = None) -> bool:
"""Return True if *target* (or global) is currently under hands-off lock."""
state = self._load_state()
registry = state.get("hands_off_registry", {})
expiry_str = registry.get("global") or (
registry.get(target) if target else None
)
if not expiry_str:
return False
try:
expiry = datetime.fromisoformat(expiry_str)
now = datetime.now(timezone.utc)
if expiry.tzinfo is None:
expiry = expiry.replace(tzinfo=timezone.utc)
return now < expiry
except (ValueError, TypeError):
return False
def add_hands_off(
self, target: Optional[str] = None, duration_hours: int = 24
) -> None:
"""Register a hands-off lock for *target* (or global) for *duration_hours*."""
now = datetime.now(timezone.utc)
expiry = now + timedelta(hours=duration_hours)
state = self._load_state()
if "hands_off_registry" not in state:
state["hands_off_registry"] = {}
key = target or "global"
state["hands_off_registry"][key] = expiry.isoformat()
self._save_state(state)
def log_stop_ack(self, context: str = "") -> None:
"""Append a STOP_ACK entry to the Allegro burn log."""
now = datetime.now(timezone.utc).isoformat()
entry = (
f"[{now}] STOP_ACK: Stop command detected and enforced. "
f"Context: {context}\n"
)
os.makedirs(os.path.dirname(self.log_path), exist_ok=True)
with open(self.log_path, "a", encoding="utf-8") as f:
f.write(entry)
def enforce(self, messages: List[Dict[str, Any]]) -> bool:
"""
Detect stop in *messages*, log ACK, and set hands-off.
Returns True when stop is enforced (caller must abort tool execution).
"""
if not self.check_messages(messages):
return False
context = ""
for msg in reversed(messages):
if isinstance(msg, dict) and msg.get("role") == "user":
raw = (msg.get("content", "") or "").strip()
context = raw[:200].replace("\n", " ")
break
self.log_stop_ack(context)
self.add_hands_off(target=None, duration_hours=24)
return True
@staticmethod
def build_cancelled_result(function_name: str) -> str:
"""JSON result string for a tool cancelled by stop protocol."""
return json.dumps(
{
"success": False,
"error": (
"STOP_ACK: Stop command enforced. "
f"{function_name} was not executed."
),
}
)