M2: Commit-or-Abort — cycle guard with 10-minute slice rule and crash recovery (Epic #842)

This commit is contained in:
Timmy Foundation Ops
2026-04-06 16:54:02 +00:00
parent c6207bd689
commit 2db03bedb4
2 changed files with 399 additions and 0 deletions

256
allegro/cycle_guard.py Normal file
View File

@@ -0,0 +1,256 @@
#!/usr/bin/env python3
"""Allegro Cycle Guard — Commit-or-Abort discipline for M2, Epic #842.
Every cycle produces a durable artifact or documented abort.
10-minute slice rule with automatic timeout detection.
Cycle-state file provides crash-recovery resume points.
"""
import argparse
import json
import os
import sys
from datetime import datetime, timezone, timedelta
from pathlib import Path
DEFAULT_STATE = Path("/root/.hermes/allegro-cycle-state.json")
STATE_PATH = Path(os.environ.get("ALLEGRO_CYCLE_STATE", DEFAULT_STATE))
# Crash-recovery threshold: if a cycle has been in_progress for longer than
# this many minutes, resume_or_abort() will auto-abort it.
CRASH_RECOVERY_MINUTES = 30
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def load_state(path: Path | str | None = None) -> dict:
p = Path(path) if path else Path(STATE_PATH)
if not p.exists():
return _empty_state()
try:
with open(p, "r") as f:
return json.load(f)
except Exception:
return _empty_state()
def save_state(state: dict, path: Path | str | None = None) -> None:
p = Path(path) if path else Path(STATE_PATH)
p.parent.mkdir(parents=True, exist_ok=True)
state["last_updated"] = _now_iso()
with open(p, "w") as f:
json.dump(state, f, indent=2)
def _empty_state() -> dict:
return {
"cycle_id": None,
"status": "complete",
"target": None,
"details": None,
"slices": [],
"started_at": None,
"completed_at": None,
"aborted_at": None,
"abort_reason": None,
"proof": None,
"version": 1,
"last_updated": _now_iso(),
}
def start_cycle(target: str, details: str = "", path: Path | str | None = None) -> dict:
"""Begin a new cycle, discarding any prior in-progress state."""
state = {
"cycle_id": _now_iso(),
"status": "in_progress",
"target": target,
"details": details,
"slices": [],
"started_at": _now_iso(),
"completed_at": None,
"aborted_at": None,
"abort_reason": None,
"proof": None,
"version": 1,
"last_updated": _now_iso(),
}
save_state(state, path)
return state
def start_slice(name: str, path: Path | str | None = None) -> dict:
"""Start a new work slice inside the current cycle."""
state = load_state(path)
if state.get("status") != "in_progress":
raise RuntimeError("Cannot start a slice unless a cycle is in_progress.")
state["slices"].append(
{
"name": name,
"started_at": _now_iso(),
"ended_at": None,
"status": "in_progress",
"artifact": None,
}
)
save_state(state, path)
return state
def end_slice(status: str = "complete", artifact: str | None = None, path: Path | str | None = None) -> dict:
"""Close the current work slice."""
state = load_state(path)
if state.get("status") != "in_progress":
raise RuntimeError("Cannot end a slice unless a cycle is in_progress.")
if not state["slices"]:
raise RuntimeError("No active slice to end.")
current = state["slices"][-1]
current["ended_at"] = _now_iso()
current["status"] = status
if artifact is not None:
current["artifact"] = artifact
save_state(state, path)
return state
def _parse_dt(iso_str: str) -> datetime:
return datetime.fromisoformat(iso_str.replace("Z", "+00:00"))
def slice_duration_minutes(path: Path | str | None = None) -> float | None:
"""Return the age of the current slice in minutes, or None if no slice."""
state = load_state(path)
if not state["slices"]:
return None
current = state["slices"][-1]
if current.get("ended_at"):
return None
started = _parse_dt(current["started_at"])
return (datetime.now(timezone.utc) - started).total_seconds() / 60.0
def check_slice_timeout(max_minutes: float = 10.0, path: Path | str | None = None) -> bool:
"""Return True if the current slice has exceeded max_minutes."""
duration = slice_duration_minutes(path)
if duration is None:
return False
return duration > max_minutes
def commit_cycle(proof: dict | None = None, path: Path | str | None = None) -> dict:
"""Mark the cycle as successfully completed with optional proof payload."""
state = load_state(path)
if state.get("status") != "in_progress":
raise RuntimeError("Cannot commit a cycle that is not in_progress.")
state["status"] = "complete"
state["completed_at"] = _now_iso()
if proof is not None:
state["proof"] = proof
save_state(state, path)
return state
def abort_cycle(reason: str, path: Path | str | None = None) -> dict:
"""Mark the cycle as aborted, recording the reason."""
state = load_state(path)
if state.get("status") != "in_progress":
raise RuntimeError("Cannot abort a cycle that is not in_progress.")
state["status"] = "aborted"
state["aborted_at"] = _now_iso()
state["abort_reason"] = reason
# Close any open slice as aborted
if state["slices"] and not state["slices"][-1].get("ended_at"):
state["slices"][-1]["ended_at"] = _now_iso()
state["slices"][-1]["status"] = "aborted"
save_state(state, path)
return state
def resume_or_abort(path: Path | str | None = None) -> dict:
"""Crash-recovery gate: auto-abort stale in-progress cycles."""
state = load_state(path)
if state.get("status") != "in_progress":
return state
started = state.get("started_at")
if started:
started_dt = _parse_dt(started)
age_minutes = (datetime.now(timezone.utc) - started_dt).total_seconds() / 60.0
if age_minutes > CRASH_RECOVERY_MINUTES:
return abort_cycle(
f"crash recovery — stale cycle detected ({int(age_minutes)}m old)",
path,
)
# Also abort if the current slice has been running too long
if check_slice_timeout(max_minutes=CRASH_RECOVERY_MINUTES, path=path):
return abort_cycle(
"crash recovery — stale slice detected",
path,
)
return state
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(description="Allegro Cycle Guard")
sub = parser.add_subparsers(dest="cmd")
p_resume = sub.add_parser("resume", help="Resume or abort stale cycle")
p_start = sub.add_parser("start", help="Start a new cycle")
p_start.add_argument("target")
p_start.add_argument("--details", default="")
p_slice = sub.add_parser("slice", help="Start a named slice")
p_slice.add_argument("name")
p_end = sub.add_parser("end", help="End current slice")
p_end.add_argument("--status", default="complete")
p_end.add_argument("--artifact", default=None)
p_commit = sub.add_parser("commit", help="Commit the current cycle")
p_commit.add_argument("--proof", default="{}")
p_abort = sub.add_parser("abort", help="Abort the current cycle")
p_abort.add_argument("reason")
p_check = sub.add_parser("check", help="Check slice timeout")
args = parser.parse_args(argv)
if args.cmd == "resume":
state = resume_or_abort()
print(state["status"])
return 0
elif args.cmd == "start":
state = start_cycle(args.target, args.details)
print(f"Cycle started: {state['cycle_id']}")
return 0
elif args.cmd == "slice":
state = start_slice(args.name)
print(f"Slice started: {args.name}")
return 0
elif args.cmd == "end":
artifact = args.artifact
state = end_slice(args.status, artifact)
print("Slice ended")
return 0
elif args.cmd == "commit":
proof = json.loads(args.proof)
state = commit_cycle(proof)
print(f"Cycle committed: {state['cycle_id']}")
return 0
elif args.cmd == "abort":
state = abort_cycle(args.reason)
print(f"Cycle aborted: {args.reason}")
return 0
elif args.cmd == "check":
timed_out = check_slice_timeout()
print("TIMEOUT" if timed_out else "OK")
return 1 if timed_out else 0
else:
parser.print_help()
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -0,0 +1,143 @@
"""100% compliance test for Allegro Commit-or-Abort (M2, Epic #842)."""
import json
import os
import sys
import tempfile
import time
import unittest
from datetime import datetime, timezone, timedelta
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
import cycle_guard as cg
class TestCycleGuard(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.TemporaryDirectory()
self.state_path = os.path.join(self.tmpdir.name, "cycle_state.json")
cg.STATE_PATH = self.state_path
def tearDown(self):
self.tmpdir.cleanup()
cg.STATE_PATH = cg.DEFAULT_STATE
def test_load_empty_state(self):
state = cg.load_state(self.state_path)
self.assertEqual(state["status"], "complete")
self.assertIsNone(state["cycle_id"])
def test_start_cycle(self):
state = cg.start_cycle("M2: Commit-or-Abort", path=self.state_path)
self.assertEqual(state["status"], "in_progress")
self.assertEqual(state["target"], "M2: Commit-or-Abort")
self.assertIsNotNone(state["cycle_id"])
def test_start_slice_requires_in_progress(self):
with self.assertRaises(RuntimeError):
cg.start_slice("test", path=self.state_path)
def test_slice_lifecycle(self):
cg.start_cycle("test", path=self.state_path)
cg.start_slice("gather", path=self.state_path)
state = cg.load_state(self.state_path)
self.assertEqual(len(state["slices"]), 1)
self.assertEqual(state["slices"][0]["name"], "gather")
self.assertEqual(state["slices"][0]["status"], "in_progress")
cg.end_slice(status="complete", artifact="artifact.txt", path=self.state_path)
state = cg.load_state(self.state_path)
self.assertEqual(state["slices"][0]["status"], "complete")
self.assertEqual(state["slices"][0]["artifact"], "artifact.txt")
self.assertIsNotNone(state["slices"][0]["ended_at"])
def test_commit_cycle(self):
cg.start_cycle("test", path=self.state_path)
cg.start_slice("work", path=self.state_path)
cg.end_slice(path=self.state_path)
proof = {"files": ["a.py"]}
state = cg.commit_cycle(proof=proof, path=self.state_path)
self.assertEqual(state["status"], "complete")
self.assertEqual(state["proof"], proof)
self.assertIsNotNone(state["completed_at"])
def test_commit_without_in_progress_fails(self):
with self.assertRaises(RuntimeError):
cg.commit_cycle(path=self.state_path)
def test_abort_cycle(self):
cg.start_cycle("test", path=self.state_path)
cg.start_slice("work", path=self.state_path)
state = cg.abort_cycle("manual abort", path=self.state_path)
self.assertEqual(state["status"], "aborted")
self.assertEqual(state["abort_reason"], "manual abort")
self.assertIsNotNone(state["aborted_at"])
self.assertEqual(state["slices"][-1]["status"], "aborted")
def test_slice_timeout_true(self):
cg.start_cycle("test", path=self.state_path)
cg.start_slice("work", path=self.state_path)
# Manually backdate slice start to 11 minutes ago
state = cg.load_state(self.state_path)
old = (datetime.now(timezone.utc) - timedelta(minutes=11)).isoformat()
state["slices"][0]["started_at"] = old
cg.save_state(state, self.state_path)
self.assertTrue(cg.check_slice_timeout(max_minutes=10, path=self.state_path))
def test_slice_timeout_false(self):
cg.start_cycle("test", path=self.state_path)
cg.start_slice("work", path=self.state_path)
self.assertFalse(cg.check_slice_timeout(max_minutes=10, path=self.state_path))
def test_resume_or_abort_keeps_fresh_cycle(self):
cg.start_cycle("test", path=self.state_path)
state = cg.resume_or_abort(path=self.state_path)
self.assertEqual(state["status"], "in_progress")
def test_resume_or_abort_aborts_stale_cycle(self):
cg.start_cycle("test", path=self.state_path)
# Backdate start to 31 minutes ago
state = cg.load_state(self.state_path)
old = (datetime.now(timezone.utc) - timedelta(minutes=31)).isoformat()
state["started_at"] = old
cg.save_state(state, self.state_path)
state = cg.resume_or_abort(path=self.state_path)
self.assertEqual(state["status"], "aborted")
self.assertIn("crash recovery", state["abort_reason"])
def test_slice_duration_minutes(self):
cg.start_cycle("test", path=self.state_path)
cg.start_slice("work", path=self.state_path)
# Backdate by 5 minutes
state = cg.load_state(self.state_path)
old = (datetime.now(timezone.utc) - timedelta(minutes=5)).isoformat()
state["slices"][0]["started_at"] = old
cg.save_state(state, self.state_path)
mins = cg.slice_duration_minutes(path=self.state_path)
self.assertAlmostEqual(mins, 5.0, delta=0.5)
def test_cli_resume_prints_status(self):
cg.start_cycle("test", path=self.state_path)
rc = cg.main(["resume"])
self.assertEqual(rc, 0)
def test_cli_check_timeout(self):
cg.start_cycle("test", path=self.state_path)
cg.start_slice("work", path=self.state_path)
state = cg.load_state(self.state_path)
old = (datetime.now(timezone.utc) - timedelta(minutes=11)).isoformat()
state["slices"][0]["started_at"] = old
cg.save_state(state, self.state_path)
rc = cg.main(["check"])
self.assertEqual(rc, 1)
def test_cli_check_ok(self):
cg.start_cycle("test", path=self.state_path)
cg.start_slice("work", path=self.state_path)
rc = cg.main(["check"])
self.assertEqual(rc, 0)
if __name__ == "__main__":
unittest.main()