Files
timmy-config/allegro/cycle_guard.py

257 lines
8.2 KiB
Python

#!/usr/bin/env python3
"""Allegro Cycle Guard — Commit-or-Abort discipline for M2, Epic #842.
Every cycle produces a durable artifact or documented abort.
10-minute slice rule with automatic timeout detection.
Cycle-state file provides crash-recovery resume points.
"""
import argparse
import json
import os
import sys
from datetime import datetime, timezone, timedelta
from pathlib import Path
DEFAULT_STATE = Path("/root/.hermes/allegro-cycle-state.json")
STATE_PATH = Path(os.environ.get("ALLEGRO_CYCLE_STATE", DEFAULT_STATE))
# Crash-recovery threshold: if a cycle has been in_progress for longer than
# this many minutes, resume_or_abort() will auto-abort it.
CRASH_RECOVERY_MINUTES = 30
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def load_state(path: Path | str | None = None) -> dict:
p = Path(path) if path else Path(STATE_PATH)
if not p.exists():
return _empty_state()
try:
with open(p, "r") as f:
return json.load(f)
except Exception:
return _empty_state()
def save_state(state: dict, path: Path | str | None = None) -> None:
p = Path(path) if path else Path(STATE_PATH)
p.parent.mkdir(parents=True, exist_ok=True)
state["last_updated"] = _now_iso()
with open(p, "w") as f:
json.dump(state, f, indent=2)
def _empty_state() -> dict:
return {
"cycle_id": None,
"status": "complete",
"target": None,
"details": None,
"slices": [],
"started_at": None,
"completed_at": None,
"aborted_at": None,
"abort_reason": None,
"proof": None,
"version": 1,
"last_updated": _now_iso(),
}
def start_cycle(target: str, details: str = "", path: Path | str | None = None) -> dict:
"""Begin a new cycle, discarding any prior in-progress state."""
state = {
"cycle_id": _now_iso(),
"status": "in_progress",
"target": target,
"details": details,
"slices": [],
"started_at": _now_iso(),
"completed_at": None,
"aborted_at": None,
"abort_reason": None,
"proof": None,
"version": 1,
"last_updated": _now_iso(),
}
save_state(state, path)
return state
def start_slice(name: str, path: Path | str | None = None) -> dict:
"""Start a new work slice inside the current cycle."""
state = load_state(path)
if state.get("status") != "in_progress":
raise RuntimeError("Cannot start a slice unless a cycle is in_progress.")
state["slices"].append(
{
"name": name,
"started_at": _now_iso(),
"ended_at": None,
"status": "in_progress",
"artifact": None,
}
)
save_state(state, path)
return state
def end_slice(status: str = "complete", artifact: str | None = None, path: Path | str | None = None) -> dict:
"""Close the current work slice."""
state = load_state(path)
if state.get("status") != "in_progress":
raise RuntimeError("Cannot end a slice unless a cycle is in_progress.")
if not state["slices"]:
raise RuntimeError("No active slice to end.")
current = state["slices"][-1]
current["ended_at"] = _now_iso()
current["status"] = status
if artifact is not None:
current["artifact"] = artifact
save_state(state, path)
return state
def _parse_dt(iso_str: str) -> datetime:
return datetime.fromisoformat(iso_str.replace("Z", "+00:00"))
def slice_duration_minutes(path: Path | str | None = None) -> float | None:
"""Return the age of the current slice in minutes, or None if no slice."""
state = load_state(path)
if not state["slices"]:
return None
current = state["slices"][-1]
if current.get("ended_at"):
return None
started = _parse_dt(current["started_at"])
return (datetime.now(timezone.utc) - started).total_seconds() / 60.0
def check_slice_timeout(max_minutes: float = 10.0, path: Path | str | None = None) -> bool:
"""Return True if the current slice has exceeded max_minutes."""
duration = slice_duration_minutes(path)
if duration is None:
return False
return duration > max_minutes
def commit_cycle(proof: dict | None = None, path: Path | str | None = None) -> dict:
"""Mark the cycle as successfully completed with optional proof payload."""
state = load_state(path)
if state.get("status") != "in_progress":
raise RuntimeError("Cannot commit a cycle that is not in_progress.")
state["status"] = "complete"
state["completed_at"] = _now_iso()
if proof is not None:
state["proof"] = proof
save_state(state, path)
return state
def abort_cycle(reason: str, path: Path | str | None = None) -> dict:
"""Mark the cycle as aborted, recording the reason."""
state = load_state(path)
if state.get("status") != "in_progress":
raise RuntimeError("Cannot abort a cycle that is not in_progress.")
state["status"] = "aborted"
state["aborted_at"] = _now_iso()
state["abort_reason"] = reason
# Close any open slice as aborted
if state["slices"] and not state["slices"][-1].get("ended_at"):
state["slices"][-1]["ended_at"] = _now_iso()
state["slices"][-1]["status"] = "aborted"
save_state(state, path)
return state
def resume_or_abort(path: Path | str | None = None) -> dict:
"""Crash-recovery gate: auto-abort stale in-progress cycles."""
state = load_state(path)
if state.get("status") != "in_progress":
return state
started = state.get("started_at")
if started:
started_dt = _parse_dt(started)
age_minutes = (datetime.now(timezone.utc) - started_dt).total_seconds() / 60.0
if age_minutes > CRASH_RECOVERY_MINUTES:
return abort_cycle(
f"crash recovery — stale cycle detected ({int(age_minutes)}m old)",
path,
)
# Also abort if the current slice has been running too long
if check_slice_timeout(max_minutes=CRASH_RECOVERY_MINUTES, path=path):
return abort_cycle(
"crash recovery — stale slice detected",
path,
)
return state
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(description="Allegro Cycle Guard")
sub = parser.add_subparsers(dest="cmd")
p_resume = sub.add_parser("resume", help="Resume or abort stale cycle")
p_start = sub.add_parser("start", help="Start a new cycle")
p_start.add_argument("target")
p_start.add_argument("--details", default="")
p_slice = sub.add_parser("slice", help="Start a named slice")
p_slice.add_argument("name")
p_end = sub.add_parser("end", help="End current slice")
p_end.add_argument("--status", default="complete")
p_end.add_argument("--artifact", default=None)
p_commit = sub.add_parser("commit", help="Commit the current cycle")
p_commit.add_argument("--proof", default="{}")
p_abort = sub.add_parser("abort", help="Abort the current cycle")
p_abort.add_argument("reason")
p_check = sub.add_parser("check", help="Check slice timeout")
args = parser.parse_args(argv)
if args.cmd == "resume":
state = resume_or_abort()
print(state["status"])
return 0
elif args.cmd == "start":
state = start_cycle(args.target, args.details)
print(f"Cycle started: {state['cycle_id']}")
return 0
elif args.cmd == "slice":
state = start_slice(args.name)
print(f"Slice started: {args.name}")
return 0
elif args.cmd == "end":
artifact = args.artifact
state = end_slice(args.status, artifact)
print("Slice ended")
return 0
elif args.cmd == "commit":
proof = json.loads(args.proof)
state = commit_cycle(proof)
print(f"Cycle committed: {state['cycle_id']}")
return 0
elif args.cmd == "abort":
state = abort_cycle(args.reason)
print(f"Cycle aborted: {args.reason}")
return 0
elif args.cmd == "check":
timed_out = check_slice_timeout()
print("TIMEOUT" if timed_out else "OK")
return 1 if timed_out else 0
else:
parser.print_help()
return 0
if __name__ == "__main__":
sys.exit(main())