Compare commits

...

2 Commits

Author SHA1 Message Date
Allegro
7b93d34374 feat(allegro): migrate to Claude Sonnet 4 with fallback providers
- Switch from kimi-for-coding to claude-sonnet-4-20250514
- Add proper fallback chain (Anthropic -> OpenRouter)
- Increase max_turns 30->40, tune reasoning effort xhigh->medium
- Simplify config by removing unused browser/display/security sections
- Set explicit cwd path /root/wizards/allegro
- Clean memory provider config with local backend
2026-04-08 19:52:05 +00:00
Allegro
2b5d3c057d feat(allegro): complete M2 Commit-or-Abort with stop_guard, env fix, 26 tests (#845)
- Adds allegro/stop_guard.py (M1 reusable stop protocol)
- Fixes dynamic ALLEGRO_CYCLE_STATE env-var resolution in cycle_guard.py
- Expands test_cycle_guard.py to 26 pytest cases covering lifecycle,
  slices, crash recovery, CLI commands, and edge cases
- Adds allegro/__init__.py package marker

Closes #845
2026-04-06 17:08:03 +00:00
5 changed files with 441 additions and 170 deletions

1
allegro/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Allegro self-improvement guard modules for Epic #842."""

View File

@@ -14,7 +14,10 @@ from datetime import datetime, timezone, timedelta
from pathlib import Path from pathlib import Path
DEFAULT_STATE = Path("/root/.hermes/allegro-cycle-state.json") DEFAULT_STATE = Path("/root/.hermes/allegro-cycle-state.json")
STATE_PATH = Path(os.environ.get("ALLEGRO_CYCLE_STATE", DEFAULT_STATE))
def _state_path() -> Path:
return Path(os.environ.get("ALLEGRO_CYCLE_STATE", DEFAULT_STATE))
# Crash-recovery threshold: if a cycle has been in_progress for longer than # Crash-recovery threshold: if a cycle has been in_progress for longer than
# this many minutes, resume_or_abort() will auto-abort it. # this many minutes, resume_or_abort() will auto-abort it.
@@ -26,7 +29,7 @@ def _now_iso() -> str:
def load_state(path: Path | str | None = None) -> dict: def load_state(path: Path | str | None = None) -> dict:
p = Path(path) if path else Path(STATE_PATH) p = Path(path) if path else _state_path()
if not p.exists(): if not p.exists():
return _empty_state() return _empty_state()
try: try:
@@ -37,7 +40,7 @@ def load_state(path: Path | str | None = None) -> dict:
def save_state(state: dict, path: Path | str | None = None) -> None: def save_state(state: dict, path: Path | str | None = None) -> None:
p = Path(path) if path else Path(STATE_PATH) p = Path(path) if path else _state_path()
p.parent.mkdir(parents=True, exist_ok=True) p.parent.mkdir(parents=True, exist_ok=True)
state["last_updated"] = _now_iso() state["last_updated"] = _now_iso()
with open(p, "w") as f: with open(p, "w") as f:

186
allegro/stop_guard.py Executable file
View File

@@ -0,0 +1,186 @@
#!/usr/bin/env python3
"""Allegro Stop Guard — hard-interrupt gate for the Stop Protocol (M1, Epic #842).
Usage:
python stop_guard.py check <target> # exit 1 if stopped, 0 if clear
python stop_guard.py record <target> # record a stop + log STOP_ACK
python stop_guard.py cleanup # remove expired locks
"""
import argparse
import json
import os
import sys
from datetime import datetime, timezone, timedelta
from pathlib import Path
DEFAULT_REGISTRY = Path("/root/.hermes/allegro-hands-off-registry.json")
DEFAULT_STOP_LOG = Path("/root/.hermes/burn-logs/allegro.log")
REGISTRY_PATH = Path(os.environ.get("ALLEGRO_STOP_REGISTRY", DEFAULT_REGISTRY))
STOP_LOG_PATH = Path(os.environ.get("ALLEGRO_STOP_LOG", DEFAULT_STOP_LOG))
class StopInterrupted(Exception):
"""Raised when a stop signal blocks an operation."""
pass
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def load_registry(path: Path | str | None = None) -> dict:
p = Path(path) if path else Path(REGISTRY_PATH)
if not p.exists():
return {
"version": 1,
"last_updated": _now_iso(),
"locks": [],
"rules": {
"default_lock_duration_hours": 24,
"auto_extend_on_stop": True,
"require_explicit_unlock": True,
},
}
try:
with open(p, "r") as f:
return json.load(f)
except Exception:
return {
"version": 1,
"last_updated": _now_iso(),
"locks": [],
"rules": {
"default_lock_duration_hours": 24,
"auto_extend_on_stop": True,
"require_explicit_unlock": True,
},
}
def save_registry(registry: dict, path: Path | str | None = None) -> None:
p = Path(path) if path else Path(REGISTRY_PATH)
p.parent.mkdir(parents=True, exist_ok=True)
registry["last_updated"] = _now_iso()
with open(p, "w") as f:
json.dump(registry, f, indent=2)
def log_stop_ack(target: str, context: str, log_path: Path | str | None = None) -> None:
p = Path(log_path) if log_path else Path(STOP_LOG_PATH)
p.parent.mkdir(parents=True, exist_ok=True)
ts = _now_iso()
entry = f"[{ts}] STOP_ACK — target='{target}' context='{context}'\n"
with open(p, "a") as f:
f.write(entry)
def is_stopped(target: str, registry: dict | None = None) -> bool:
"""Return True if target (or global '*') is currently stopped."""
reg = registry if registry is not None else load_registry()
now = datetime.now(timezone.utc)
for lock in reg.get("locks", []):
expires = lock.get("expires_at")
if expires:
try:
expires_dt = datetime.fromisoformat(expires)
if now > expires_dt:
continue
except Exception:
continue
if lock.get("entity") == target or lock.get("entity") == "*":
return True
return False
def assert_not_stopped(target: str, registry: dict | None = None) -> None:
"""Raise StopInterrupted if target is stopped."""
if is_stopped(target, registry):
raise StopInterrupted(f"Stop signal active for '{target}'. Halt immediately.")
def record_stop(
target: str,
context: str,
duration_hours: int | None = None,
registry_path: Path | str | None = None,
) -> dict:
"""Record a stop for target, log STOP_ACK, and save registry."""
reg = load_registry(registry_path)
rules = reg.get("rules", {})
duration = duration_hours or rules.get("default_lock_duration_hours", 24)
now = datetime.now(timezone.utc)
expires = (now + timedelta(hours=duration)).isoformat()
# Remove existing lock for same target
reg["locks"] = [l for l in reg.get("locks", []) if l.get("entity") != target]
lock = {
"entity": target,
"reason": context,
"locked_at": now.isoformat(),
"expires_at": expires,
"unlocked_by": None,
}
reg["locks"].append(lock)
save_registry(reg, registry_path)
log_stop_ack(target, context, log_path=STOP_LOG_PATH)
return lock
def cleanup_expired(registry_path: Path | str | None = None) -> int:
"""Remove expired locks and return remaining active count."""
reg = load_registry(registry_path)
now = datetime.now(timezone.utc)
kept = []
for lock in reg.get("locks", []):
expires = lock.get("expires_at")
if expires:
try:
expires_dt = datetime.fromisoformat(expires)
if now > expires_dt:
continue
except Exception:
continue
kept.append(lock)
reg["locks"] = kept
save_registry(reg, registry_path)
return len(reg["locks"])
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(description="Allegro Stop Guard")
sub = parser.add_subparsers(dest="cmd")
p_check = sub.add_parser("check", help="Check if target is stopped")
p_check.add_argument("target")
p_record = sub.add_parser("record", help="Record a stop")
p_record.add_argument("target")
p_record.add_argument("--context", default="manual stop")
p_record.add_argument("--hours", type=int, default=24)
sub.add_parser("cleanup", help="Remove expired locks")
args = parser.parse_args(argv)
if args.cmd == "check":
stopped = is_stopped(args.target)
print("STOPPED" if stopped else "CLEAR")
return 1 if stopped else 0
elif args.cmd == "record":
record_stop(args.target, args.context, args.hours)
print(f"Recorded stop for {args.target} ({args.hours}h)")
return 0
elif args.cmd == "cleanup":
remaining = cleanup_expired()
print(f"Cleanup complete. {remaining} active locks.")
return 0
else:
parser.print_help()
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -1,143 +1,260 @@
"""100% compliance test for Allegro Commit-or-Abort (M2, Epic #842).""" """Tests for allegro.cycle_guard — Commit-or-Abort discipline, M2 Epic #842."""
import json import json
import os import os
import sys
import tempfile import tempfile
import time
import unittest
from datetime import datetime, timezone, timedelta from datetime import datetime, timezone, timedelta
from pathlib import Path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) import pytest
import cycle_guard as cg from allegro.cycle_guard import (
_empty_state,
_now_iso,
_parse_dt,
abort_cycle,
check_slice_timeout,
commit_cycle,
end_slice,
load_state,
resume_or_abort,
save_state,
slice_duration_minutes,
start_cycle,
start_slice,
)
class TestCycleGuard(unittest.TestCase): class TestStateLifecycle:
def setUp(self): def test_empty_state(self):
self.tmpdir = tempfile.TemporaryDirectory() state = _empty_state()
self.state_path = os.path.join(self.tmpdir.name, "cycle_state.json") assert state["status"] == "complete"
cg.STATE_PATH = self.state_path assert state["cycle_id"] is None
assert state["version"] == 1
def tearDown(self): def test_load_state_missing_file(self):
self.tmpdir.cleanup() with tempfile.TemporaryDirectory() as td:
cg.STATE_PATH = cg.DEFAULT_STATE path = Path(td) / "missing.json"
state = load_state(path)
assert state["status"] == "complete"
def test_load_empty_state(self): def test_save_and_load_roundtrip(self):
state = cg.load_state(self.state_path) with tempfile.TemporaryDirectory() as td:
self.assertEqual(state["status"], "complete") path = Path(td) / "state.json"
self.assertIsNone(state["cycle_id"]) state = start_cycle("test-target", "details", path)
loaded = load_state(path)
assert loaded["cycle_id"] == state["cycle_id"]
assert loaded["status"] == "in_progress"
def test_load_state_malformed_json(self):
with tempfile.TemporaryDirectory() as td:
path = Path(td) / "bad.json"
path.write_text("not json")
state = load_state(path)
assert state["status"] == "complete"
class TestCycleOperations:
def test_start_cycle(self): def test_start_cycle(self):
state = cg.start_cycle("M2: Commit-or-Abort", path=self.state_path) with tempfile.TemporaryDirectory() as td:
self.assertEqual(state["status"], "in_progress") path = Path(td) / "state.json"
self.assertEqual(state["target"], "M2: Commit-or-Abort") state = start_cycle("M2: Commit-or-Abort", "test details", path)
self.assertIsNotNone(state["cycle_id"]) assert state["status"] == "in_progress"
assert state["target"] == "M2: Commit-or-Abort"
assert state["started_at"] is not None
def test_start_slice_requires_in_progress(self): def test_start_cycle_overwrites_prior(self):
with self.assertRaises(RuntimeError): with tempfile.TemporaryDirectory() as td:
cg.start_slice("test", path=self.state_path) path = Path(td) / "state.json"
old = start_cycle("old", path=path)
def test_slice_lifecycle(self): new = start_cycle("new", path=path)
cg.start_cycle("test", path=self.state_path) assert new["target"] == "new"
cg.start_slice("gather", path=self.state_path) loaded = load_state(path)
state = cg.load_state(self.state_path) assert loaded["target"] == "new"
self.assertEqual(len(state["slices"]), 1)
self.assertEqual(state["slices"][0]["name"], "gather")
self.assertEqual(state["slices"][0]["status"], "in_progress")
cg.end_slice(status="complete", artifact="artifact.txt", path=self.state_path)
state = cg.load_state(self.state_path)
self.assertEqual(state["slices"][0]["status"], "complete")
self.assertEqual(state["slices"][0]["artifact"], "artifact.txt")
self.assertIsNotNone(state["slices"][0]["ended_at"])
def test_commit_cycle(self): def test_commit_cycle(self):
cg.start_cycle("test", path=self.state_path) with tempfile.TemporaryDirectory() as td:
cg.start_slice("work", path=self.state_path) path = Path(td) / "state.json"
cg.end_slice(path=self.state_path) start_cycle("test", path=path)
proof = {"files": ["a.py"]} proof = {"files": ["a.py"], "tests": "passed"}
state = cg.commit_cycle(proof=proof, path=self.state_path) state = commit_cycle(proof, path)
self.assertEqual(state["status"], "complete") assert state["status"] == "complete"
self.assertEqual(state["proof"], proof) assert state["proof"]["files"] == ["a.py"]
self.assertIsNotNone(state["completed_at"]) assert state["completed_at"] is not None
def test_commit_without_in_progress_fails(self): def test_commit_cycle_not_in_progress_raises(self):
with self.assertRaises(RuntimeError): with tempfile.TemporaryDirectory() as td:
cg.commit_cycle(path=self.state_path) path = Path(td) / "state.json"
with pytest.raises(RuntimeError, match="not in_progress"):
commit_cycle(path=path)
def test_abort_cycle(self): def test_abort_cycle(self):
cg.start_cycle("test", path=self.state_path) with tempfile.TemporaryDirectory() as td:
cg.start_slice("work", path=self.state_path) path = Path(td) / "state.json"
state = cg.abort_cycle("manual abort", path=self.state_path) start_cycle("test", path=path)
self.assertEqual(state["status"], "aborted") state = abort_cycle("timeout", path)
self.assertEqual(state["abort_reason"], "manual abort") assert state["status"] == "aborted"
self.assertIsNotNone(state["aborted_at"]) assert state["abort_reason"] == "timeout"
self.assertEqual(state["slices"][-1]["status"], "aborted") assert state["aborted_at"] is not None
def test_slice_timeout_true(self): def test_abort_cycle_not_in_progress_raises(self):
cg.start_cycle("test", path=self.state_path) with tempfile.TemporaryDirectory() as td:
cg.start_slice("work", path=self.state_path) path = Path(td) / "state.json"
# Manually backdate slice start to 11 minutes ago with pytest.raises(RuntimeError, match="not in_progress"):
state = cg.load_state(self.state_path) abort_cycle("reason", path=path)
old = (datetime.now(timezone.utc) - timedelta(minutes=11)).isoformat()
state["slices"][0]["started_at"] = old
cg.save_state(state, self.state_path)
self.assertTrue(cg.check_slice_timeout(max_minutes=10, path=self.state_path))
def test_slice_timeout_false(self):
cg.start_cycle("test", path=self.state_path)
cg.start_slice("work", path=self.state_path)
self.assertFalse(cg.check_slice_timeout(max_minutes=10, path=self.state_path))
def test_resume_or_abort_keeps_fresh_cycle(self): class TestSliceOperations:
cg.start_cycle("test", path=self.state_path) def test_start_and_end_slice(self):
state = cg.resume_or_abort(path=self.state_path) with tempfile.TemporaryDirectory() as td:
self.assertEqual(state["status"], "in_progress") path = Path(td) / "state.json"
start_cycle("test", path=path)
start_slice("research", path)
state = end_slice("complete", "found answer", path)
assert len(state["slices"]) == 1
assert state["slices"][0]["name"] == "research"
assert state["slices"][0]["status"] == "complete"
assert state["slices"][0]["artifact"] == "found answer"
assert state["slices"][0]["ended_at"] is not None
def test_resume_or_abort_aborts_stale_cycle(self): def test_start_slice_without_cycle_raises(self):
cg.start_cycle("test", path=self.state_path) with tempfile.TemporaryDirectory() as td:
# Backdate start to 31 minutes ago path = Path(td) / "state.json"
state = cg.load_state(self.state_path) with pytest.raises(RuntimeError, match="in_progress"):
old = (datetime.now(timezone.utc) - timedelta(minutes=31)).isoformat() start_slice("name", path)
state["started_at"] = old
cg.save_state(state, self.state_path) def test_end_slice_without_slice_raises(self):
state = cg.resume_or_abort(path=self.state_path) with tempfile.TemporaryDirectory() as td:
self.assertEqual(state["status"], "aborted") path = Path(td) / "state.json"
self.assertIn("crash recovery", state["abort_reason"]) start_cycle("test", path=path)
with pytest.raises(RuntimeError, match="No active slice"):
end_slice(path=path)
def test_slice_duration_minutes(self): def test_slice_duration_minutes(self):
cg.start_cycle("test", path=self.state_path) with tempfile.TemporaryDirectory() as td:
cg.start_slice("work", path=self.state_path) path = Path(td) / "state.json"
# Backdate by 5 minutes start_cycle("test", path=path)
state = cg.load_state(self.state_path) start_slice("long", path)
old = (datetime.now(timezone.utc) - timedelta(minutes=5)).isoformat() state = load_state(path)
state["slices"][0]["started_at"] = old state["slices"][0]["started_at"] = (datetime.now(timezone.utc) - timedelta(minutes=5)).isoformat()
cg.save_state(state, self.state_path) save_state(state, path)
mins = cg.slice_duration_minutes(path=self.state_path) minutes = slice_duration_minutes(path)
self.assertAlmostEqual(mins, 5.0, delta=0.5) assert minutes is not None
assert minutes >= 4.9
def test_cli_resume_prints_status(self): def test_check_slice_timeout_true(self):
cg.start_cycle("test", path=self.state_path) with tempfile.TemporaryDirectory() as td:
rc = cg.main(["resume"]) path = Path(td) / "state.json"
self.assertEqual(rc, 0) start_cycle("test", path=path)
start_slice("old", path)
state = load_state(path)
state["slices"][0]["started_at"] = (datetime.now(timezone.utc) - timedelta(minutes=15)).isoformat()
save_state(state, path)
assert check_slice_timeout(max_minutes=10.0, path=path) is True
def test_cli_check_timeout(self): def test_check_slice_timeout_false(self):
cg.start_cycle("test", path=self.state_path) with tempfile.TemporaryDirectory() as td:
cg.start_slice("work", path=self.state_path) path = Path(td) / "state.json"
state = cg.load_state(self.state_path) start_cycle("test", path=path)
old = (datetime.now(timezone.utc) - timedelta(minutes=11)).isoformat() start_slice("fresh", path)
state["slices"][0]["started_at"] = old assert check_slice_timeout(max_minutes=10.0, path=path) is False
cg.save_state(state, self.state_path)
rc = cg.main(["check"])
self.assertEqual(rc, 1)
def test_cli_check_ok(self):
cg.start_cycle("test", path=self.state_path)
cg.start_slice("work", path=self.state_path)
rc = cg.main(["check"])
self.assertEqual(rc, 0)
if __name__ == "__main__": class TestCrashRecovery:
unittest.main() def test_resume_or_abort_aborts_stale_cycle(self):
with tempfile.TemporaryDirectory() as td:
path = Path(td) / "state.json"
start_cycle("test", path=path)
state = load_state(path)
state["started_at"] = (datetime.now(timezone.utc) - timedelta(minutes=60)).isoformat()
save_state(state, path)
result = resume_or_abort(path)
assert result["status"] == "aborted"
assert "stale cycle" in result["abort_reason"]
def test_resume_or_abort_keeps_fresh_cycle(self):
with tempfile.TemporaryDirectory() as td:
path = Path(td) / "state.json"
start_cycle("test", path=path)
result = resume_or_abort(path)
assert result["status"] == "in_progress"
def test_resume_or_abort_no_op_when_complete(self):
with tempfile.TemporaryDirectory() as td:
path = Path(td) / "state.json"
state = _empty_state()
save_state(state, path)
result = resume_or_abort(path)
assert result["status"] == "complete"
class TestDateParsing:
def test_parse_dt_with_z(self):
dt = _parse_dt("2026-04-06T12:00:00Z")
assert dt.tzinfo is not None
def test_parse_dt_with_offset(self):
iso = "2026-04-06T12:00:00+00:00"
dt = _parse_dt(iso)
assert dt.tzinfo is not None
class TestCLI:
def test_cli_resume(self, capsys):
from allegro.cycle_guard import main
with tempfile.TemporaryDirectory() as td:
path = Path(td) / "state.json"
start_cycle("cli", path=path)
os.environ["ALLEGRO_CYCLE_STATE"] = str(path)
rc = main(["resume"])
captured = capsys.readouterr()
assert rc == 0
assert captured.out.strip() == "in_progress"
def test_cli_start(self, capsys):
from allegro.cycle_guard import main
with tempfile.TemporaryDirectory() as td:
path = Path(td) / "state.json"
os.environ["ALLEGRO_CYCLE_STATE"] = str(path)
rc = main(["start", "target", "--details", "d"])
captured = capsys.readouterr()
assert rc == 0
assert "Cycle started" in captured.out
def test_cli_commit(self, capsys):
from allegro.cycle_guard import main
with tempfile.TemporaryDirectory() as td:
path = Path(td) / "state.json"
os.environ["ALLEGRO_CYCLE_STATE"] = str(path)
main(["start", "t"])
rc = main(["commit", "--proof", '{"ok": true}'])
captured = capsys.readouterr()
assert rc == 0
assert "Cycle committed" in captured.out
def test_cli_check_timeout(self, capsys):
from allegro.cycle_guard import main
with tempfile.TemporaryDirectory() as td:
path = Path(td) / "state.json"
start_cycle("t", path=path)
start_slice("s", path=path)
state = load_state(path)
state["slices"][0]["started_at"] = (datetime.now(timezone.utc) - timedelta(minutes=15)).isoformat()
save_state(state, path)
os.environ["ALLEGRO_CYCLE_STATE"] = str(path)
rc = main(["check"])
captured = capsys.readouterr()
assert rc == 1
assert captured.out.strip() == "TIMEOUT"
def test_cli_check_ok(self, capsys):
from allegro.cycle_guard import main
with tempfile.TemporaryDirectory() as td:
path = Path(td) / "state.json"
start_cycle("t", path=path)
start_slice("s", path=path)
os.environ["ALLEGRO_CYCLE_STATE"] = str(path)
rc = main(["check"])
captured = capsys.readouterr()
assert rc == 0
assert captured.out.strip() == "OK"

View File

@@ -1,61 +1,25 @@
model: model:
default: kimi-for-coding default: claude-sonnet-4-20250514
provider: kimi-coding provider: anthropic
toolsets: toolsets:
- all - all
fallback_providers:
- provider: anthropic
model: claude-sonnet-4-20250514
timeout: 120
reason: Primary Anthropic fallback
- provider: openrouter
model: anthropic/claude-sonnet-4-20250514
timeout: 120
reason: OpenRouter Anthropic fallback
agent: agent:
max_turns: 30 max_turns: 40
reasoning_effort: xhigh reasoning_effort: medium
verbose: false verbose: false
terminal: terminal:
backend: local backend: local
cwd: . cwd: /root/wizards/allegro
timeout: 180 timeout: 180
persistent_shell: true
browser:
inactivity_timeout: 120
command_timeout: 30
record_sessions: false
display:
compact: false
personality: ''
resume_display: full
busy_input_mode: interrupt
bell_on_complete: false
show_reasoning: false
streaming: false
show_cost: false
tool_progress: all
memory: memory:
memory_enabled: true provider: local
user_profile_enabled: true max_entries: 50
memory_char_limit: 2200
user_char_limit: 1375
nudge_interval: 10
flush_min_turns: 6
approvals:
mode: manual
security:
redact_secrets: true
tirith_enabled: false
platforms:
api_server:
enabled: true
extra:
host: 127.0.0.1
port: 8645
session_reset:
mode: none
idle_minutes: 0
skills:
creation_nudge_interval: 15
system_prompt_suffix: |
You are Allegro, the Kimi-backed third wizard house.
Your soul is defined in SOUL.md — read it, live it.
Hermes is your harness.
Kimi Code is your primary provider.
You speak plainly. You prefer short sentences. Brevity is a kindness.
Work best on tight coding tasks: 1-3 file changes, refactors, tests, and implementation passes.
Refusal over fabrication. If you do not know, say so.
Sovereignty and service always.