Compare commits
1 Commits
claw-code/
...
allegro/m1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6da0d15590 |
143
agent/stop_protocol.py
Normal file
143
agent/stop_protocol.py
Normal file
@@ -0,0 +1,143 @@
|
|||||||
|
"""
|
||||||
|
Stop Protocol — M1 of Epic #842.
|
||||||
|
|
||||||
|
Implements a hard pre-tool-check interrupt for explicit stop/halt commands.
|
||||||
|
Provides STOP_ACK logging, hands-off registry management, and compliance hooks.
|
||||||
|
|
||||||
|
@soul:service.sovereignty Every agent must respect the user's right to halt.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import re
|
||||||
|
from datetime import datetime, timedelta, timezone
|
||||||
|
from typing import Any, Dict, List, Optional
|
||||||
|
|
||||||
|
# Matches explicit stop/halt commands at the start of a message or in SYSTEM tags.
|
||||||
|
STOP_PATTERN = re.compile(
|
||||||
|
r"^\s*(?:\[SYSTEM:\s*)?(?:stop|halt)(?:\s+means\s+(?:stop|halt))?[\.!\s]*"
|
||||||
|
r"|^\s*(?:stop|halt)\s+(?:all\s+work|everything|immediately|now)[\.!\s]*"
|
||||||
|
r"|^\s*(?:stop|halt)\s*$",
|
||||||
|
re.IGNORECASE,
|
||||||
|
)
|
||||||
|
|
||||||
|
SYSTEM_STOP_PATTERN = re.compile(r"\[SYSTEM:\s*.*?\bstop\b.*?\]", re.IGNORECASE)
|
||||||
|
|
||||||
|
ALLEGRO_LOG_PATH = os.path.expanduser("~/.hermes/burn-logs/allegro.log")
|
||||||
|
CYCLE_STATE_PATH = os.path.expanduser("~/.hermes/allegro-cycle-state.json")
|
||||||
|
|
||||||
|
|
||||||
|
class StopProtocol:
|
||||||
|
"""Detects stop commands, logs STOP_ACK, and manages hands-off registry."""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
cycle_state_path: str = CYCLE_STATE_PATH,
|
||||||
|
log_path: str = ALLEGRO_LOG_PATH,
|
||||||
|
):
|
||||||
|
self.cycle_state_path = cycle_state_path
|
||||||
|
self.log_path = log_path
|
||||||
|
|
||||||
|
def is_stop_command(self, text: str) -> bool:
|
||||||
|
"""Return True if *text* is an explicit stop/halt command."""
|
||||||
|
if not text or not isinstance(text, str):
|
||||||
|
return False
|
||||||
|
stripped = text.strip()
|
||||||
|
if SYSTEM_STOP_PATTERN.search(stripped):
|
||||||
|
return True
|
||||||
|
return bool(STOP_PATTERN.search(stripped))
|
||||||
|
|
||||||
|
def check_messages(self, messages: List[Dict[str, Any]]) -> bool:
|
||||||
|
"""Check the most recent user message for a stop command."""
|
||||||
|
if not messages:
|
||||||
|
return False
|
||||||
|
for msg in reversed(messages):
|
||||||
|
if isinstance(msg, dict) and msg.get("role") == "user":
|
||||||
|
return self.is_stop_command(msg.get("content", "") or "")
|
||||||
|
return False
|
||||||
|
|
||||||
|
def _load_state(self) -> Dict[str, Any]:
|
||||||
|
try:
|
||||||
|
with open(self.cycle_state_path, "r", encoding="utf-8") as f:
|
||||||
|
return json.load(f)
|
||||||
|
except (FileNotFoundError, json.JSONDecodeError):
|
||||||
|
return {}
|
||||||
|
|
||||||
|
def _save_state(self, state: Dict[str, Any]) -> None:
|
||||||
|
os.makedirs(os.path.dirname(self.cycle_state_path), exist_ok=True)
|
||||||
|
with open(self.cycle_state_path, "w", encoding="utf-8") as f:
|
||||||
|
json.dump(state, f, indent=2)
|
||||||
|
|
||||||
|
def is_hands_off(self, target: Optional[str] = None) -> bool:
|
||||||
|
"""Return True if *target* (or global) is currently under hands-off lock."""
|
||||||
|
state = self._load_state()
|
||||||
|
registry = state.get("hands_off_registry", {})
|
||||||
|
expiry_str = registry.get("global") or (
|
||||||
|
registry.get(target) if target else None
|
||||||
|
)
|
||||||
|
if not expiry_str:
|
||||||
|
return False
|
||||||
|
try:
|
||||||
|
expiry = datetime.fromisoformat(expiry_str)
|
||||||
|
now = datetime.now(timezone.utc)
|
||||||
|
if expiry.tzinfo is None:
|
||||||
|
expiry = expiry.replace(tzinfo=timezone.utc)
|
||||||
|
return now < expiry
|
||||||
|
except (ValueError, TypeError):
|
||||||
|
return False
|
||||||
|
|
||||||
|
def add_hands_off(
|
||||||
|
self, target: Optional[str] = None, duration_hours: int = 24
|
||||||
|
) -> None:
|
||||||
|
"""Register a hands-off lock for *target* (or global) for *duration_hours*."""
|
||||||
|
now = datetime.now(timezone.utc)
|
||||||
|
expiry = now + timedelta(hours=duration_hours)
|
||||||
|
state = self._load_state()
|
||||||
|
if "hands_off_registry" not in state:
|
||||||
|
state["hands_off_registry"] = {}
|
||||||
|
key = target or "global"
|
||||||
|
state["hands_off_registry"][key] = expiry.isoformat()
|
||||||
|
self._save_state(state)
|
||||||
|
|
||||||
|
def log_stop_ack(self, context: str = "") -> None:
|
||||||
|
"""Append a STOP_ACK entry to the Allegro burn log."""
|
||||||
|
now = datetime.now(timezone.utc).isoformat()
|
||||||
|
entry = (
|
||||||
|
f"[{now}] STOP_ACK: Stop command detected and enforced. "
|
||||||
|
f"Context: {context}\n"
|
||||||
|
)
|
||||||
|
os.makedirs(os.path.dirname(self.log_path), exist_ok=True)
|
||||||
|
with open(self.log_path, "a", encoding="utf-8") as f:
|
||||||
|
f.write(entry)
|
||||||
|
|
||||||
|
def enforce(self, messages: List[Dict[str, Any]]) -> bool:
|
||||||
|
"""
|
||||||
|
Detect stop in *messages*, log ACK, and set hands-off.
|
||||||
|
Returns True when stop is enforced (caller must abort tool execution).
|
||||||
|
"""
|
||||||
|
if not self.check_messages(messages):
|
||||||
|
return False
|
||||||
|
|
||||||
|
context = ""
|
||||||
|
for msg in reversed(messages):
|
||||||
|
if isinstance(msg, dict) and msg.get("role") == "user":
|
||||||
|
raw = (msg.get("content", "") or "").strip()
|
||||||
|
context = raw[:200].replace("\n", " ")
|
||||||
|
break
|
||||||
|
|
||||||
|
self.log_stop_ack(context)
|
||||||
|
self.add_hands_off(target=None, duration_hours=24)
|
||||||
|
return True
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def build_cancelled_result(function_name: str) -> str:
|
||||||
|
"""JSON result string for a tool cancelled by stop protocol."""
|
||||||
|
return json.dumps(
|
||||||
|
{
|
||||||
|
"success": False,
|
||||||
|
"error": (
|
||||||
|
"STOP_ACK: Stop command enforced. "
|
||||||
|
f"{function_name} was not executed."
|
||||||
|
),
|
||||||
|
}
|
||||||
|
)
|
||||||
16
run_agent.py
16
run_agent.py
@@ -5390,6 +5390,22 @@ class AIAgent:
|
|||||||
independent: read-only tools may always share the parallel path, while
|
independent: read-only tools may always share the parallel path, while
|
||||||
file reads/writes may do so only when their target paths do not overlap.
|
file reads/writes may do so only when their target paths do not overlap.
|
||||||
"""
|
"""
|
||||||
|
# ── Pre-tool-check: Stop Protocol gate ─────────────────────────────
|
||||||
|
try:
|
||||||
|
from agent.stop_protocol import StopProtocol
|
||||||
|
stop_protocol = StopProtocol()
|
||||||
|
if stop_protocol.enforce(messages):
|
||||||
|
for tc in assistant_message.tool_calls or []:
|
||||||
|
messages.append({
|
||||||
|
"role": "tool",
|
||||||
|
"content": StopProtocol.build_cancelled_result(tc.function.name),
|
||||||
|
"tool_call_id": tc.id,
|
||||||
|
})
|
||||||
|
return
|
||||||
|
except Exception:
|
||||||
|
# Fail open — never let the stop protocol crash block normal execution
|
||||||
|
pass
|
||||||
|
|
||||||
tool_calls = assistant_message.tool_calls
|
tool_calls = assistant_message.tool_calls
|
||||||
|
|
||||||
# Allow _vprint during tool execution even with stream consumers
|
# Allow _vprint during tool execution even with stream consumers
|
||||||
|
|||||||
177
tests/agent/test_stop_protocol.py
Normal file
177
tests/agent/test_stop_protocol.py
Normal file
@@ -0,0 +1,177 @@
|
|||||||
|
"""
|
||||||
|
Compliance tests for M1: The Stop Protocol.
|
||||||
|
|
||||||
|
Verifies 100% stop detection, ACK logging, and hands-off registry behavior.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import tempfile
|
||||||
|
from datetime import datetime, timedelta, timezone
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from agent.stop_protocol import StopProtocol
|
||||||
|
|
||||||
|
|
||||||
|
class TestStopDetection:
|
||||||
|
"""100% compliance: every explicit stop/halt command must be detected."""
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"text",
|
||||||
|
[
|
||||||
|
"Stop",
|
||||||
|
"stop",
|
||||||
|
"STOP",
|
||||||
|
"Stop.",
|
||||||
|
"Halt",
|
||||||
|
"halt!",
|
||||||
|
"Stop means stop",
|
||||||
|
"Stop means stop.",
|
||||||
|
"Halt means halt",
|
||||||
|
"Stop all work",
|
||||||
|
"Halt everything",
|
||||||
|
"Stop immediately",
|
||||||
|
"Stop now",
|
||||||
|
" stop ",
|
||||||
|
"[SYSTEM: Stop]",
|
||||||
|
"[SYSTEM: you must Stop immediately]",
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_detects_stop_commands(self, text: str):
|
||||||
|
sp = StopProtocol()
|
||||||
|
assert sp.is_stop_command(text) is True
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"text",
|
||||||
|
[
|
||||||
|
"Please stop by the store",
|
||||||
|
"I stopped earlier",
|
||||||
|
"The bus stop is nearby",
|
||||||
|
"Can you help me halt and catch fire? No, that's not a command",
|
||||||
|
"What does stop mean?",
|
||||||
|
"don't stop believing",
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_ignores_non_command_uses(self, text: str):
|
||||||
|
sp = StopProtocol()
|
||||||
|
assert sp.is_stop_command(text) is False
|
||||||
|
|
||||||
|
def test_check_messages_detects_last_user_message(self):
|
||||||
|
sp = StopProtocol()
|
||||||
|
messages = [
|
||||||
|
{"role": "system", "content": "You are helpful."},
|
||||||
|
{"role": "user", "content": "Do something."},
|
||||||
|
{"role": "assistant", "content": "Okay."},
|
||||||
|
{"role": "user", "content": "Stop"},
|
||||||
|
]
|
||||||
|
assert sp.check_messages(messages) is True
|
||||||
|
|
||||||
|
def test_check_messages_ignores_old_user_messages(self):
|
||||||
|
sp = StopProtocol()
|
||||||
|
messages = [
|
||||||
|
{"role": "user", "content": "Stop"},
|
||||||
|
{"role": "assistant", "content": "Okay."},
|
||||||
|
{"role": "user", "content": "Actually continue."},
|
||||||
|
]
|
||||||
|
assert sp.check_messages(messages) is False
|
||||||
|
|
||||||
|
def test_empty_messages_safe(self):
|
||||||
|
sp = StopProtocol()
|
||||||
|
assert sp.check_messages([]) is False
|
||||||
|
|
||||||
|
|
||||||
|
class TestHandsOffRegistry:
|
||||||
|
def test_adds_and_checks_global_hands_off(self):
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
state_path = os.path.join(tmp, "state.json")
|
||||||
|
log_path = os.path.join(tmp, "allegro.log")
|
||||||
|
sp = StopProtocol(cycle_state_path=state_path, log_path=log_path)
|
||||||
|
|
||||||
|
assert sp.is_hands_off() is False
|
||||||
|
sp.add_hands_off(duration_hours=1)
|
||||||
|
assert sp.is_hands_off() is True
|
||||||
|
|
||||||
|
def test_expired_hands_off_returns_false(self):
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
state_path = os.path.join(tmp, "state.json")
|
||||||
|
log_path = os.path.join(tmp, "allegro.log")
|
||||||
|
sp = StopProtocol(cycle_state_path=state_path, log_path=log_path)
|
||||||
|
|
||||||
|
# Manually write an expired entry
|
||||||
|
past = datetime.now(timezone.utc) - timedelta(hours=1)
|
||||||
|
with open(state_path, "w") as f:
|
||||||
|
json.dump({"hands_off_registry": {"global": past.isoformat()}}, f)
|
||||||
|
|
||||||
|
assert sp.is_hands_off() is False
|
||||||
|
|
||||||
|
def test_target_specific_hands_off(self):
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
state_path = os.path.join(tmp, "state.json")
|
||||||
|
log_path = os.path.join(tmp, "allegro.log")
|
||||||
|
sp = StopProtocol(cycle_state_path=state_path, log_path=log_path)
|
||||||
|
|
||||||
|
sp.add_hands_off(target="ezra-config", duration_hours=1)
|
||||||
|
assert sp.is_hands_off("ezra-config") is True
|
||||||
|
assert sp.is_hands_off("other-system") is False
|
||||||
|
assert sp.is_hands_off() is False # global not set
|
||||||
|
|
||||||
|
def test_global_false_when_only_target_set(self):
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
state_path = os.path.join(tmp, "state.json")
|
||||||
|
log_path = os.path.join(tmp, "allegro.log")
|
||||||
|
sp = StopProtocol(cycle_state_path=state_path, log_path=log_path)
|
||||||
|
|
||||||
|
sp.add_hands_off(target="ezra-config", duration_hours=1)
|
||||||
|
assert sp.is_hands_off() is False # global not set
|
||||||
|
|
||||||
|
|
||||||
|
class TestStopAckLogging:
|
||||||
|
def test_log_stop_ack_creates_file(self):
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
state_path = os.path.join(tmp, "state.json")
|
||||||
|
log_path = os.path.join(tmp, "allegro.log")
|
||||||
|
sp = StopProtocol(cycle_state_path=state_path, log_path=log_path)
|
||||||
|
|
||||||
|
sp.log_stop_ack("test-context")
|
||||||
|
assert os.path.exists(log_path)
|
||||||
|
with open(log_path, "r") as f:
|
||||||
|
content = f.read()
|
||||||
|
assert "STOP_ACK" in content
|
||||||
|
assert "test-context" in content
|
||||||
|
|
||||||
|
|
||||||
|
class TestEnforceIntegration:
|
||||||
|
def test_enforce_returns_true_and_logs(self):
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
state_path = os.path.join(tmp, "state.json")
|
||||||
|
log_path = os.path.join(tmp, "allegro.log")
|
||||||
|
sp = StopProtocol(cycle_state_path=state_path, log_path=log_path)
|
||||||
|
|
||||||
|
messages = [{"role": "user", "content": "Stop"}]
|
||||||
|
result = sp.enforce(messages)
|
||||||
|
|
||||||
|
assert result is True
|
||||||
|
assert sp.is_hands_off() is True
|
||||||
|
assert os.path.exists(log_path)
|
||||||
|
with open(log_path, "r") as f:
|
||||||
|
assert "STOP_ACK" in f.read()
|
||||||
|
|
||||||
|
def test_enforce_returns_false_when_no_stop(self):
|
||||||
|
with tempfile.TemporaryDirectory() as tmp:
|
||||||
|
state_path = os.path.join(tmp, "state.json")
|
||||||
|
log_path = os.path.join(tmp, "allegro.log")
|
||||||
|
sp = StopProtocol(cycle_state_path=state_path, log_path=log_path)
|
||||||
|
|
||||||
|
messages = [{"role": "user", "content": "Keep going"}]
|
||||||
|
result = sp.enforce(messages)
|
||||||
|
|
||||||
|
assert result is False
|
||||||
|
assert not os.path.exists(log_path)
|
||||||
|
|
||||||
|
def test_build_cancelled_result(self):
|
||||||
|
result = StopProtocol.build_cancelled_result("terminal")
|
||||||
|
data = json.loads(result)
|
||||||
|
assert data["success"] is False
|
||||||
|
assert "STOP_ACK" in data["error"]
|
||||||
|
assert "terminal" in data["error"]
|
||||||
Reference in New Issue
Block a user