Compare commits
2 Commits
timmy/dead
...
allegro/m2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7b93d34374 | ||
|
|
2b5d3c057d |
1
allegro/__init__.py
Normal file
1
allegro/__init__.py
Normal file
@@ -0,0 +1 @@
|
|||||||
|
"""Allegro self-improvement guard modules for Epic #842."""
|
||||||
@@ -14,7 +14,10 @@ from datetime import datetime, timezone, timedelta
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
DEFAULT_STATE = Path("/root/.hermes/allegro-cycle-state.json")
|
DEFAULT_STATE = Path("/root/.hermes/allegro-cycle-state.json")
|
||||||
STATE_PATH = Path(os.environ.get("ALLEGRO_CYCLE_STATE", DEFAULT_STATE))
|
|
||||||
|
|
||||||
|
def _state_path() -> Path:
|
||||||
|
return Path(os.environ.get("ALLEGRO_CYCLE_STATE", DEFAULT_STATE))
|
||||||
|
|
||||||
# Crash-recovery threshold: if a cycle has been in_progress for longer than
|
# Crash-recovery threshold: if a cycle has been in_progress for longer than
|
||||||
# this many minutes, resume_or_abort() will auto-abort it.
|
# this many minutes, resume_or_abort() will auto-abort it.
|
||||||
@@ -26,7 +29,7 @@ def _now_iso() -> str:
|
|||||||
|
|
||||||
|
|
||||||
def load_state(path: Path | str | None = None) -> dict:
|
def load_state(path: Path | str | None = None) -> dict:
|
||||||
p = Path(path) if path else Path(STATE_PATH)
|
p = Path(path) if path else _state_path()
|
||||||
if not p.exists():
|
if not p.exists():
|
||||||
return _empty_state()
|
return _empty_state()
|
||||||
try:
|
try:
|
||||||
@@ -37,7 +40,7 @@ def load_state(path: Path | str | None = None) -> dict:
|
|||||||
|
|
||||||
|
|
||||||
def save_state(state: dict, path: Path | str | None = None) -> None:
|
def save_state(state: dict, path: Path | str | None = None) -> None:
|
||||||
p = Path(path) if path else Path(STATE_PATH)
|
p = Path(path) if path else _state_path()
|
||||||
p.parent.mkdir(parents=True, exist_ok=True)
|
p.parent.mkdir(parents=True, exist_ok=True)
|
||||||
state["last_updated"] = _now_iso()
|
state["last_updated"] = _now_iso()
|
||||||
with open(p, "w") as f:
|
with open(p, "w") as f:
|
||||||
|
|||||||
186
allegro/stop_guard.py
Executable file
186
allegro/stop_guard.py
Executable file
@@ -0,0 +1,186 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Allegro Stop Guard — hard-interrupt gate for the Stop Protocol (M1, Epic #842).
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
python stop_guard.py check <target> # exit 1 if stopped, 0 if clear
|
||||||
|
python stop_guard.py record <target> # record a stop + log STOP_ACK
|
||||||
|
python stop_guard.py cleanup # remove expired locks
|
||||||
|
"""
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
from datetime import datetime, timezone, timedelta
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
DEFAULT_REGISTRY = Path("/root/.hermes/allegro-hands-off-registry.json")
|
||||||
|
DEFAULT_STOP_LOG = Path("/root/.hermes/burn-logs/allegro.log")
|
||||||
|
|
||||||
|
REGISTRY_PATH = Path(os.environ.get("ALLEGRO_STOP_REGISTRY", DEFAULT_REGISTRY))
|
||||||
|
STOP_LOG_PATH = Path(os.environ.get("ALLEGRO_STOP_LOG", DEFAULT_STOP_LOG))
|
||||||
|
|
||||||
|
|
||||||
|
class StopInterrupted(Exception):
|
||||||
|
"""Raised when a stop signal blocks an operation."""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def _now_iso() -> str:
|
||||||
|
return datetime.now(timezone.utc).isoformat()
|
||||||
|
|
||||||
|
|
||||||
|
def load_registry(path: Path | str | None = None) -> dict:
|
||||||
|
p = Path(path) if path else Path(REGISTRY_PATH)
|
||||||
|
if not p.exists():
|
||||||
|
return {
|
||||||
|
"version": 1,
|
||||||
|
"last_updated": _now_iso(),
|
||||||
|
"locks": [],
|
||||||
|
"rules": {
|
||||||
|
"default_lock_duration_hours": 24,
|
||||||
|
"auto_extend_on_stop": True,
|
||||||
|
"require_explicit_unlock": True,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
try:
|
||||||
|
with open(p, "r") as f:
|
||||||
|
return json.load(f)
|
||||||
|
except Exception:
|
||||||
|
return {
|
||||||
|
"version": 1,
|
||||||
|
"last_updated": _now_iso(),
|
||||||
|
"locks": [],
|
||||||
|
"rules": {
|
||||||
|
"default_lock_duration_hours": 24,
|
||||||
|
"auto_extend_on_stop": True,
|
||||||
|
"require_explicit_unlock": True,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def save_registry(registry: dict, path: Path | str | None = None) -> None:
|
||||||
|
p = Path(path) if path else Path(REGISTRY_PATH)
|
||||||
|
p.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
registry["last_updated"] = _now_iso()
|
||||||
|
with open(p, "w") as f:
|
||||||
|
json.dump(registry, f, indent=2)
|
||||||
|
|
||||||
|
|
||||||
|
def log_stop_ack(target: str, context: str, log_path: Path | str | None = None) -> None:
|
||||||
|
p = Path(log_path) if log_path else Path(STOP_LOG_PATH)
|
||||||
|
p.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
ts = _now_iso()
|
||||||
|
entry = f"[{ts}] STOP_ACK — target='{target}' context='{context}'\n"
|
||||||
|
with open(p, "a") as f:
|
||||||
|
f.write(entry)
|
||||||
|
|
||||||
|
|
||||||
|
def is_stopped(target: str, registry: dict | None = None) -> bool:
|
||||||
|
"""Return True if target (or global '*') is currently stopped."""
|
||||||
|
reg = registry if registry is not None else load_registry()
|
||||||
|
now = datetime.now(timezone.utc)
|
||||||
|
for lock in reg.get("locks", []):
|
||||||
|
expires = lock.get("expires_at")
|
||||||
|
if expires:
|
||||||
|
try:
|
||||||
|
expires_dt = datetime.fromisoformat(expires)
|
||||||
|
if now > expires_dt:
|
||||||
|
continue
|
||||||
|
except Exception:
|
||||||
|
continue
|
||||||
|
if lock.get("entity") == target or lock.get("entity") == "*":
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def assert_not_stopped(target: str, registry: dict | None = None) -> None:
|
||||||
|
"""Raise StopInterrupted if target is stopped."""
|
||||||
|
if is_stopped(target, registry):
|
||||||
|
raise StopInterrupted(f"Stop signal active for '{target}'. Halt immediately.")
|
||||||
|
|
||||||
|
|
||||||
|
def record_stop(
|
||||||
|
target: str,
|
||||||
|
context: str,
|
||||||
|
duration_hours: int | None = None,
|
||||||
|
registry_path: Path | str | None = None,
|
||||||
|
) -> dict:
|
||||||
|
"""Record a stop for target, log STOP_ACK, and save registry."""
|
||||||
|
reg = load_registry(registry_path)
|
||||||
|
rules = reg.get("rules", {})
|
||||||
|
duration = duration_hours or rules.get("default_lock_duration_hours", 24)
|
||||||
|
now = datetime.now(timezone.utc)
|
||||||
|
expires = (now + timedelta(hours=duration)).isoformat()
|
||||||
|
|
||||||
|
# Remove existing lock for same target
|
||||||
|
reg["locks"] = [l for l in reg.get("locks", []) if l.get("entity") != target]
|
||||||
|
|
||||||
|
lock = {
|
||||||
|
"entity": target,
|
||||||
|
"reason": context,
|
||||||
|
"locked_at": now.isoformat(),
|
||||||
|
"expires_at": expires,
|
||||||
|
"unlocked_by": None,
|
||||||
|
}
|
||||||
|
reg["locks"].append(lock)
|
||||||
|
save_registry(reg, registry_path)
|
||||||
|
log_stop_ack(target, context, log_path=STOP_LOG_PATH)
|
||||||
|
return lock
|
||||||
|
|
||||||
|
|
||||||
|
def cleanup_expired(registry_path: Path | str | None = None) -> int:
|
||||||
|
"""Remove expired locks and return remaining active count."""
|
||||||
|
reg = load_registry(registry_path)
|
||||||
|
now = datetime.now(timezone.utc)
|
||||||
|
kept = []
|
||||||
|
for lock in reg.get("locks", []):
|
||||||
|
expires = lock.get("expires_at")
|
||||||
|
if expires:
|
||||||
|
try:
|
||||||
|
expires_dt = datetime.fromisoformat(expires)
|
||||||
|
if now > expires_dt:
|
||||||
|
continue
|
||||||
|
except Exception:
|
||||||
|
continue
|
||||||
|
kept.append(lock)
|
||||||
|
reg["locks"] = kept
|
||||||
|
save_registry(reg, registry_path)
|
||||||
|
return len(reg["locks"])
|
||||||
|
|
||||||
|
|
||||||
|
def main(argv: list[str] | None = None) -> int:
|
||||||
|
parser = argparse.ArgumentParser(description="Allegro Stop Guard")
|
||||||
|
sub = parser.add_subparsers(dest="cmd")
|
||||||
|
|
||||||
|
p_check = sub.add_parser("check", help="Check if target is stopped")
|
||||||
|
p_check.add_argument("target")
|
||||||
|
|
||||||
|
p_record = sub.add_parser("record", help="Record a stop")
|
||||||
|
p_record.add_argument("target")
|
||||||
|
p_record.add_argument("--context", default="manual stop")
|
||||||
|
p_record.add_argument("--hours", type=int, default=24)
|
||||||
|
|
||||||
|
sub.add_parser("cleanup", help="Remove expired locks")
|
||||||
|
|
||||||
|
args = parser.parse_args(argv)
|
||||||
|
|
||||||
|
if args.cmd == "check":
|
||||||
|
stopped = is_stopped(args.target)
|
||||||
|
print("STOPPED" if stopped else "CLEAR")
|
||||||
|
return 1 if stopped else 0
|
||||||
|
elif args.cmd == "record":
|
||||||
|
record_stop(args.target, args.context, args.hours)
|
||||||
|
print(f"Recorded stop for {args.target} ({args.hours}h)")
|
||||||
|
return 0
|
||||||
|
elif args.cmd == "cleanup":
|
||||||
|
remaining = cleanup_expired()
|
||||||
|
print(f"Cleanup complete. {remaining} active locks.")
|
||||||
|
return 0
|
||||||
|
else:
|
||||||
|
parser.print_help()
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
sys.exit(main())
|
||||||
@@ -1,143 +1,260 @@
|
|||||||
"""100% compliance test for Allegro Commit-or-Abort (M2, Epic #842)."""
|
"""Tests for allegro.cycle_guard — Commit-or-Abort discipline, M2 Epic #842."""
|
||||||
|
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
import sys
|
|
||||||
import tempfile
|
import tempfile
|
||||||
import time
|
|
||||||
import unittest
|
|
||||||
from datetime import datetime, timezone, timedelta
|
from datetime import datetime, timezone, timedelta
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
import pytest
|
||||||
|
|
||||||
import cycle_guard as cg
|
from allegro.cycle_guard import (
|
||||||
|
_empty_state,
|
||||||
|
_now_iso,
|
||||||
|
_parse_dt,
|
||||||
|
abort_cycle,
|
||||||
|
check_slice_timeout,
|
||||||
|
commit_cycle,
|
||||||
|
end_slice,
|
||||||
|
load_state,
|
||||||
|
resume_or_abort,
|
||||||
|
save_state,
|
||||||
|
slice_duration_minutes,
|
||||||
|
start_cycle,
|
||||||
|
start_slice,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class TestCycleGuard(unittest.TestCase):
|
class TestStateLifecycle:
|
||||||
def setUp(self):
|
def test_empty_state(self):
|
||||||
self.tmpdir = tempfile.TemporaryDirectory()
|
state = _empty_state()
|
||||||
self.state_path = os.path.join(self.tmpdir.name, "cycle_state.json")
|
assert state["status"] == "complete"
|
||||||
cg.STATE_PATH = self.state_path
|
assert state["cycle_id"] is None
|
||||||
|
assert state["version"] == 1
|
||||||
|
|
||||||
def tearDown(self):
|
def test_load_state_missing_file(self):
|
||||||
self.tmpdir.cleanup()
|
with tempfile.TemporaryDirectory() as td:
|
||||||
cg.STATE_PATH = cg.DEFAULT_STATE
|
path = Path(td) / "missing.json"
|
||||||
|
state = load_state(path)
|
||||||
|
assert state["status"] == "complete"
|
||||||
|
|
||||||
def test_load_empty_state(self):
|
def test_save_and_load_roundtrip(self):
|
||||||
state = cg.load_state(self.state_path)
|
with tempfile.TemporaryDirectory() as td:
|
||||||
self.assertEqual(state["status"], "complete")
|
path = Path(td) / "state.json"
|
||||||
self.assertIsNone(state["cycle_id"])
|
state = start_cycle("test-target", "details", path)
|
||||||
|
loaded = load_state(path)
|
||||||
|
assert loaded["cycle_id"] == state["cycle_id"]
|
||||||
|
assert loaded["status"] == "in_progress"
|
||||||
|
|
||||||
|
def test_load_state_malformed_json(self):
|
||||||
|
with tempfile.TemporaryDirectory() as td:
|
||||||
|
path = Path(td) / "bad.json"
|
||||||
|
path.write_text("not json")
|
||||||
|
state = load_state(path)
|
||||||
|
assert state["status"] == "complete"
|
||||||
|
|
||||||
|
|
||||||
|
class TestCycleOperations:
|
||||||
def test_start_cycle(self):
|
def test_start_cycle(self):
|
||||||
state = cg.start_cycle("M2: Commit-or-Abort", path=self.state_path)
|
with tempfile.TemporaryDirectory() as td:
|
||||||
self.assertEqual(state["status"], "in_progress")
|
path = Path(td) / "state.json"
|
||||||
self.assertEqual(state["target"], "M2: Commit-or-Abort")
|
state = start_cycle("M2: Commit-or-Abort", "test details", path)
|
||||||
self.assertIsNotNone(state["cycle_id"])
|
assert state["status"] == "in_progress"
|
||||||
|
assert state["target"] == "M2: Commit-or-Abort"
|
||||||
|
assert state["started_at"] is not None
|
||||||
|
|
||||||
def test_start_slice_requires_in_progress(self):
|
def test_start_cycle_overwrites_prior(self):
|
||||||
with self.assertRaises(RuntimeError):
|
with tempfile.TemporaryDirectory() as td:
|
||||||
cg.start_slice("test", path=self.state_path)
|
path = Path(td) / "state.json"
|
||||||
|
old = start_cycle("old", path=path)
|
||||||
def test_slice_lifecycle(self):
|
new = start_cycle("new", path=path)
|
||||||
cg.start_cycle("test", path=self.state_path)
|
assert new["target"] == "new"
|
||||||
cg.start_slice("gather", path=self.state_path)
|
loaded = load_state(path)
|
||||||
state = cg.load_state(self.state_path)
|
assert loaded["target"] == "new"
|
||||||
self.assertEqual(len(state["slices"]), 1)
|
|
||||||
self.assertEqual(state["slices"][0]["name"], "gather")
|
|
||||||
self.assertEqual(state["slices"][0]["status"], "in_progress")
|
|
||||||
|
|
||||||
cg.end_slice(status="complete", artifact="artifact.txt", path=self.state_path)
|
|
||||||
state = cg.load_state(self.state_path)
|
|
||||||
self.assertEqual(state["slices"][0]["status"], "complete")
|
|
||||||
self.assertEqual(state["slices"][0]["artifact"], "artifact.txt")
|
|
||||||
self.assertIsNotNone(state["slices"][0]["ended_at"])
|
|
||||||
|
|
||||||
def test_commit_cycle(self):
|
def test_commit_cycle(self):
|
||||||
cg.start_cycle("test", path=self.state_path)
|
with tempfile.TemporaryDirectory() as td:
|
||||||
cg.start_slice("work", path=self.state_path)
|
path = Path(td) / "state.json"
|
||||||
cg.end_slice(path=self.state_path)
|
start_cycle("test", path=path)
|
||||||
proof = {"files": ["a.py"]}
|
proof = {"files": ["a.py"], "tests": "passed"}
|
||||||
state = cg.commit_cycle(proof=proof, path=self.state_path)
|
state = commit_cycle(proof, path)
|
||||||
self.assertEqual(state["status"], "complete")
|
assert state["status"] == "complete"
|
||||||
self.assertEqual(state["proof"], proof)
|
assert state["proof"]["files"] == ["a.py"]
|
||||||
self.assertIsNotNone(state["completed_at"])
|
assert state["completed_at"] is not None
|
||||||
|
|
||||||
def test_commit_without_in_progress_fails(self):
|
def test_commit_cycle_not_in_progress_raises(self):
|
||||||
with self.assertRaises(RuntimeError):
|
with tempfile.TemporaryDirectory() as td:
|
||||||
cg.commit_cycle(path=self.state_path)
|
path = Path(td) / "state.json"
|
||||||
|
with pytest.raises(RuntimeError, match="not in_progress"):
|
||||||
|
commit_cycle(path=path)
|
||||||
|
|
||||||
def test_abort_cycle(self):
|
def test_abort_cycle(self):
|
||||||
cg.start_cycle("test", path=self.state_path)
|
with tempfile.TemporaryDirectory() as td:
|
||||||
cg.start_slice("work", path=self.state_path)
|
path = Path(td) / "state.json"
|
||||||
state = cg.abort_cycle("manual abort", path=self.state_path)
|
start_cycle("test", path=path)
|
||||||
self.assertEqual(state["status"], "aborted")
|
state = abort_cycle("timeout", path)
|
||||||
self.assertEqual(state["abort_reason"], "manual abort")
|
assert state["status"] == "aborted"
|
||||||
self.assertIsNotNone(state["aborted_at"])
|
assert state["abort_reason"] == "timeout"
|
||||||
self.assertEqual(state["slices"][-1]["status"], "aborted")
|
assert state["aborted_at"] is not None
|
||||||
|
|
||||||
def test_slice_timeout_true(self):
|
def test_abort_cycle_not_in_progress_raises(self):
|
||||||
cg.start_cycle("test", path=self.state_path)
|
with tempfile.TemporaryDirectory() as td:
|
||||||
cg.start_slice("work", path=self.state_path)
|
path = Path(td) / "state.json"
|
||||||
# Manually backdate slice start to 11 minutes ago
|
with pytest.raises(RuntimeError, match="not in_progress"):
|
||||||
state = cg.load_state(self.state_path)
|
abort_cycle("reason", path=path)
|
||||||
old = (datetime.now(timezone.utc) - timedelta(minutes=11)).isoformat()
|
|
||||||
state["slices"][0]["started_at"] = old
|
|
||||||
cg.save_state(state, self.state_path)
|
|
||||||
self.assertTrue(cg.check_slice_timeout(max_minutes=10, path=self.state_path))
|
|
||||||
|
|
||||||
def test_slice_timeout_false(self):
|
|
||||||
cg.start_cycle("test", path=self.state_path)
|
|
||||||
cg.start_slice("work", path=self.state_path)
|
|
||||||
self.assertFalse(cg.check_slice_timeout(max_minutes=10, path=self.state_path))
|
|
||||||
|
|
||||||
def test_resume_or_abort_keeps_fresh_cycle(self):
|
class TestSliceOperations:
|
||||||
cg.start_cycle("test", path=self.state_path)
|
def test_start_and_end_slice(self):
|
||||||
state = cg.resume_or_abort(path=self.state_path)
|
with tempfile.TemporaryDirectory() as td:
|
||||||
self.assertEqual(state["status"], "in_progress")
|
path = Path(td) / "state.json"
|
||||||
|
start_cycle("test", path=path)
|
||||||
|
start_slice("research", path)
|
||||||
|
state = end_slice("complete", "found answer", path)
|
||||||
|
assert len(state["slices"]) == 1
|
||||||
|
assert state["slices"][0]["name"] == "research"
|
||||||
|
assert state["slices"][0]["status"] == "complete"
|
||||||
|
assert state["slices"][0]["artifact"] == "found answer"
|
||||||
|
assert state["slices"][0]["ended_at"] is not None
|
||||||
|
|
||||||
def test_resume_or_abort_aborts_stale_cycle(self):
|
def test_start_slice_without_cycle_raises(self):
|
||||||
cg.start_cycle("test", path=self.state_path)
|
with tempfile.TemporaryDirectory() as td:
|
||||||
# Backdate start to 31 minutes ago
|
path = Path(td) / "state.json"
|
||||||
state = cg.load_state(self.state_path)
|
with pytest.raises(RuntimeError, match="in_progress"):
|
||||||
old = (datetime.now(timezone.utc) - timedelta(minutes=31)).isoformat()
|
start_slice("name", path)
|
||||||
state["started_at"] = old
|
|
||||||
cg.save_state(state, self.state_path)
|
def test_end_slice_without_slice_raises(self):
|
||||||
state = cg.resume_or_abort(path=self.state_path)
|
with tempfile.TemporaryDirectory() as td:
|
||||||
self.assertEqual(state["status"], "aborted")
|
path = Path(td) / "state.json"
|
||||||
self.assertIn("crash recovery", state["abort_reason"])
|
start_cycle("test", path=path)
|
||||||
|
with pytest.raises(RuntimeError, match="No active slice"):
|
||||||
|
end_slice(path=path)
|
||||||
|
|
||||||
def test_slice_duration_minutes(self):
|
def test_slice_duration_minutes(self):
|
||||||
cg.start_cycle("test", path=self.state_path)
|
with tempfile.TemporaryDirectory() as td:
|
||||||
cg.start_slice("work", path=self.state_path)
|
path = Path(td) / "state.json"
|
||||||
# Backdate by 5 minutes
|
start_cycle("test", path=path)
|
||||||
state = cg.load_state(self.state_path)
|
start_slice("long", path)
|
||||||
old = (datetime.now(timezone.utc) - timedelta(minutes=5)).isoformat()
|
state = load_state(path)
|
||||||
state["slices"][0]["started_at"] = old
|
state["slices"][0]["started_at"] = (datetime.now(timezone.utc) - timedelta(minutes=5)).isoformat()
|
||||||
cg.save_state(state, self.state_path)
|
save_state(state, path)
|
||||||
mins = cg.slice_duration_minutes(path=self.state_path)
|
minutes = slice_duration_minutes(path)
|
||||||
self.assertAlmostEqual(mins, 5.0, delta=0.5)
|
assert minutes is not None
|
||||||
|
assert minutes >= 4.9
|
||||||
|
|
||||||
def test_cli_resume_prints_status(self):
|
def test_check_slice_timeout_true(self):
|
||||||
cg.start_cycle("test", path=self.state_path)
|
with tempfile.TemporaryDirectory() as td:
|
||||||
rc = cg.main(["resume"])
|
path = Path(td) / "state.json"
|
||||||
self.assertEqual(rc, 0)
|
start_cycle("test", path=path)
|
||||||
|
start_slice("old", path)
|
||||||
|
state = load_state(path)
|
||||||
|
state["slices"][0]["started_at"] = (datetime.now(timezone.utc) - timedelta(minutes=15)).isoformat()
|
||||||
|
save_state(state, path)
|
||||||
|
assert check_slice_timeout(max_minutes=10.0, path=path) is True
|
||||||
|
|
||||||
def test_cli_check_timeout(self):
|
def test_check_slice_timeout_false(self):
|
||||||
cg.start_cycle("test", path=self.state_path)
|
with tempfile.TemporaryDirectory() as td:
|
||||||
cg.start_slice("work", path=self.state_path)
|
path = Path(td) / "state.json"
|
||||||
state = cg.load_state(self.state_path)
|
start_cycle("test", path=path)
|
||||||
old = (datetime.now(timezone.utc) - timedelta(minutes=11)).isoformat()
|
start_slice("fresh", path)
|
||||||
state["slices"][0]["started_at"] = old
|
assert check_slice_timeout(max_minutes=10.0, path=path) is False
|
||||||
cg.save_state(state, self.state_path)
|
|
||||||
rc = cg.main(["check"])
|
|
||||||
self.assertEqual(rc, 1)
|
|
||||||
|
|
||||||
def test_cli_check_ok(self):
|
|
||||||
cg.start_cycle("test", path=self.state_path)
|
|
||||||
cg.start_slice("work", path=self.state_path)
|
|
||||||
rc = cg.main(["check"])
|
|
||||||
self.assertEqual(rc, 0)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
class TestCrashRecovery:
|
||||||
unittest.main()
|
def test_resume_or_abort_aborts_stale_cycle(self):
|
||||||
|
with tempfile.TemporaryDirectory() as td:
|
||||||
|
path = Path(td) / "state.json"
|
||||||
|
start_cycle("test", path=path)
|
||||||
|
state = load_state(path)
|
||||||
|
state["started_at"] = (datetime.now(timezone.utc) - timedelta(minutes=60)).isoformat()
|
||||||
|
save_state(state, path)
|
||||||
|
result = resume_or_abort(path)
|
||||||
|
assert result["status"] == "aborted"
|
||||||
|
assert "stale cycle" in result["abort_reason"]
|
||||||
|
|
||||||
|
def test_resume_or_abort_keeps_fresh_cycle(self):
|
||||||
|
with tempfile.TemporaryDirectory() as td:
|
||||||
|
path = Path(td) / "state.json"
|
||||||
|
start_cycle("test", path=path)
|
||||||
|
result = resume_or_abort(path)
|
||||||
|
assert result["status"] == "in_progress"
|
||||||
|
|
||||||
|
def test_resume_or_abort_no_op_when_complete(self):
|
||||||
|
with tempfile.TemporaryDirectory() as td:
|
||||||
|
path = Path(td) / "state.json"
|
||||||
|
state = _empty_state()
|
||||||
|
save_state(state, path)
|
||||||
|
result = resume_or_abort(path)
|
||||||
|
assert result["status"] == "complete"
|
||||||
|
|
||||||
|
|
||||||
|
class TestDateParsing:
|
||||||
|
def test_parse_dt_with_z(self):
|
||||||
|
dt = _parse_dt("2026-04-06T12:00:00Z")
|
||||||
|
assert dt.tzinfo is not None
|
||||||
|
|
||||||
|
def test_parse_dt_with_offset(self):
|
||||||
|
iso = "2026-04-06T12:00:00+00:00"
|
||||||
|
dt = _parse_dt(iso)
|
||||||
|
assert dt.tzinfo is not None
|
||||||
|
|
||||||
|
|
||||||
|
class TestCLI:
|
||||||
|
def test_cli_resume(self, capsys):
|
||||||
|
from allegro.cycle_guard import main
|
||||||
|
with tempfile.TemporaryDirectory() as td:
|
||||||
|
path = Path(td) / "state.json"
|
||||||
|
start_cycle("cli", path=path)
|
||||||
|
os.environ["ALLEGRO_CYCLE_STATE"] = str(path)
|
||||||
|
rc = main(["resume"])
|
||||||
|
captured = capsys.readouterr()
|
||||||
|
assert rc == 0
|
||||||
|
assert captured.out.strip() == "in_progress"
|
||||||
|
|
||||||
|
def test_cli_start(self, capsys):
|
||||||
|
from allegro.cycle_guard import main
|
||||||
|
with tempfile.TemporaryDirectory() as td:
|
||||||
|
path = Path(td) / "state.json"
|
||||||
|
os.environ["ALLEGRO_CYCLE_STATE"] = str(path)
|
||||||
|
rc = main(["start", "target", "--details", "d"])
|
||||||
|
captured = capsys.readouterr()
|
||||||
|
assert rc == 0
|
||||||
|
assert "Cycle started" in captured.out
|
||||||
|
|
||||||
|
def test_cli_commit(self, capsys):
|
||||||
|
from allegro.cycle_guard import main
|
||||||
|
with tempfile.TemporaryDirectory() as td:
|
||||||
|
path = Path(td) / "state.json"
|
||||||
|
os.environ["ALLEGRO_CYCLE_STATE"] = str(path)
|
||||||
|
main(["start", "t"])
|
||||||
|
rc = main(["commit", "--proof", '{"ok": true}'])
|
||||||
|
captured = capsys.readouterr()
|
||||||
|
assert rc == 0
|
||||||
|
assert "Cycle committed" in captured.out
|
||||||
|
|
||||||
|
def test_cli_check_timeout(self, capsys):
|
||||||
|
from allegro.cycle_guard import main
|
||||||
|
with tempfile.TemporaryDirectory() as td:
|
||||||
|
path = Path(td) / "state.json"
|
||||||
|
start_cycle("t", path=path)
|
||||||
|
start_slice("s", path=path)
|
||||||
|
state = load_state(path)
|
||||||
|
state["slices"][0]["started_at"] = (datetime.now(timezone.utc) - timedelta(minutes=15)).isoformat()
|
||||||
|
save_state(state, path)
|
||||||
|
os.environ["ALLEGRO_CYCLE_STATE"] = str(path)
|
||||||
|
rc = main(["check"])
|
||||||
|
captured = capsys.readouterr()
|
||||||
|
assert rc == 1
|
||||||
|
assert captured.out.strip() == "TIMEOUT"
|
||||||
|
|
||||||
|
def test_cli_check_ok(self, capsys):
|
||||||
|
from allegro.cycle_guard import main
|
||||||
|
with tempfile.TemporaryDirectory() as td:
|
||||||
|
path = Path(td) / "state.json"
|
||||||
|
start_cycle("t", path=path)
|
||||||
|
start_slice("s", path=path)
|
||||||
|
os.environ["ALLEGRO_CYCLE_STATE"] = str(path)
|
||||||
|
rc = main(["check"])
|
||||||
|
captured = capsys.readouterr()
|
||||||
|
assert rc == 0
|
||||||
|
assert captured.out.strip() == "OK"
|
||||||
|
|||||||
@@ -1,61 +1,25 @@
|
|||||||
model:
|
model:
|
||||||
default: kimi-for-coding
|
default: claude-sonnet-4-20250514
|
||||||
provider: kimi-coding
|
provider: anthropic
|
||||||
toolsets:
|
toolsets:
|
||||||
- all
|
- all
|
||||||
|
fallback_providers:
|
||||||
|
- provider: anthropic
|
||||||
|
model: claude-sonnet-4-20250514
|
||||||
|
timeout: 120
|
||||||
|
reason: Primary Anthropic fallback
|
||||||
|
- provider: openrouter
|
||||||
|
model: anthropic/claude-sonnet-4-20250514
|
||||||
|
timeout: 120
|
||||||
|
reason: OpenRouter Anthropic fallback
|
||||||
agent:
|
agent:
|
||||||
max_turns: 30
|
max_turns: 40
|
||||||
reasoning_effort: xhigh
|
reasoning_effort: medium
|
||||||
verbose: false
|
verbose: false
|
||||||
terminal:
|
terminal:
|
||||||
backend: local
|
backend: local
|
||||||
cwd: .
|
cwd: /root/wizards/allegro
|
||||||
timeout: 180
|
timeout: 180
|
||||||
persistent_shell: true
|
|
||||||
browser:
|
|
||||||
inactivity_timeout: 120
|
|
||||||
command_timeout: 30
|
|
||||||
record_sessions: false
|
|
||||||
display:
|
|
||||||
compact: false
|
|
||||||
personality: ''
|
|
||||||
resume_display: full
|
|
||||||
busy_input_mode: interrupt
|
|
||||||
bell_on_complete: false
|
|
||||||
show_reasoning: false
|
|
||||||
streaming: false
|
|
||||||
show_cost: false
|
|
||||||
tool_progress: all
|
|
||||||
memory:
|
memory:
|
||||||
memory_enabled: true
|
provider: local
|
||||||
user_profile_enabled: true
|
max_entries: 50
|
||||||
memory_char_limit: 2200
|
|
||||||
user_char_limit: 1375
|
|
||||||
nudge_interval: 10
|
|
||||||
flush_min_turns: 6
|
|
||||||
approvals:
|
|
||||||
mode: manual
|
|
||||||
security:
|
|
||||||
redact_secrets: true
|
|
||||||
tirith_enabled: false
|
|
||||||
platforms:
|
|
||||||
api_server:
|
|
||||||
enabled: true
|
|
||||||
extra:
|
|
||||||
host: 127.0.0.1
|
|
||||||
port: 8645
|
|
||||||
session_reset:
|
|
||||||
mode: none
|
|
||||||
idle_minutes: 0
|
|
||||||
skills:
|
|
||||||
creation_nudge_interval: 15
|
|
||||||
system_prompt_suffix: |
|
|
||||||
You are Allegro, the Kimi-backed third wizard house.
|
|
||||||
Your soul is defined in SOUL.md — read it, live it.
|
|
||||||
Hermes is your harness.
|
|
||||||
Kimi Code is your primary provider.
|
|
||||||
You speak plainly. You prefer short sentences. Brevity is a kindness.
|
|
||||||
|
|
||||||
Work best on tight coding tasks: 1-3 file changes, refactors, tests, and implementation passes.
|
|
||||||
Refusal over fabrication. If you do not know, say so.
|
|
||||||
Sovereignty and service always.
|
|
||||||
Reference in New Issue
Block a user