Compare commits
3 Commits
e369727235
...
feature/ac
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a058d6a5a9 | ||
|
|
7b93d34374 | ||
|
|
2b5d3c057d |
1
allegro/__init__.py
Normal file
1
allegro/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Allegro self-improvement guard modules for Epic #842."""
|
||||
@@ -14,7 +14,10 @@ from datetime import datetime, timezone, timedelta
|
||||
from pathlib import Path
|
||||
|
||||
DEFAULT_STATE = Path("/root/.hermes/allegro-cycle-state.json")
|
||||
STATE_PATH = Path(os.environ.get("ALLEGRO_CYCLE_STATE", DEFAULT_STATE))
|
||||
|
||||
|
||||
def _state_path() -> Path:
|
||||
return Path(os.environ.get("ALLEGRO_CYCLE_STATE", DEFAULT_STATE))
|
||||
|
||||
# Crash-recovery threshold: if a cycle has been in_progress for longer than
|
||||
# this many minutes, resume_or_abort() will auto-abort it.
|
||||
@@ -26,7 +29,7 @@ def _now_iso() -> str:
|
||||
|
||||
|
||||
def load_state(path: Path | str | None = None) -> dict:
|
||||
p = Path(path) if path else Path(STATE_PATH)
|
||||
p = Path(path) if path else _state_path()
|
||||
if not p.exists():
|
||||
return _empty_state()
|
||||
try:
|
||||
@@ -37,7 +40,7 @@ def load_state(path: Path | str | None = None) -> dict:
|
||||
|
||||
|
||||
def save_state(state: dict, path: Path | str | None = None) -> None:
|
||||
p = Path(path) if path else Path(STATE_PATH)
|
||||
p = Path(path) if path else _state_path()
|
||||
p.parent.mkdir(parents=True, exist_ok=True)
|
||||
state["last_updated"] = _now_iso()
|
||||
with open(p, "w") as f:
|
||||
|
||||
186
allegro/stop_guard.py
Executable file
186
allegro/stop_guard.py
Executable file
@@ -0,0 +1,186 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Allegro Stop Guard — hard-interrupt gate for the Stop Protocol (M1, Epic #842).
|
||||
|
||||
Usage:
|
||||
python stop_guard.py check <target> # exit 1 if stopped, 0 if clear
|
||||
python stop_guard.py record <target> # record a stop + log STOP_ACK
|
||||
python stop_guard.py cleanup # remove expired locks
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from datetime import datetime, timezone, timedelta
|
||||
from pathlib import Path
|
||||
|
||||
DEFAULT_REGISTRY = Path("/root/.hermes/allegro-hands-off-registry.json")
|
||||
DEFAULT_STOP_LOG = Path("/root/.hermes/burn-logs/allegro.log")
|
||||
|
||||
REGISTRY_PATH = Path(os.environ.get("ALLEGRO_STOP_REGISTRY", DEFAULT_REGISTRY))
|
||||
STOP_LOG_PATH = Path(os.environ.get("ALLEGRO_STOP_LOG", DEFAULT_STOP_LOG))
|
||||
|
||||
|
||||
class StopInterrupted(Exception):
    """Raised when an active stop signal blocks an operation.

    Callers of assert_not_stopped() should catch this and halt the
    current workflow immediately.
    """
    # NOTE: the original body had a redundant `pass` after the docstring;
    # a docstring alone is a complete class body.
|
||||
|
||||
|
||||
def _now_iso() -> str:
    """Current UTC time as a timezone-aware ISO-8601 string."""
    now = datetime.now(timezone.utc)
    return now.isoformat()
|
||||
|
||||
|
||||
def load_registry(path: Path | str | None = None) -> dict:
|
||||
p = Path(path) if path else Path(REGISTRY_PATH)
|
||||
if not p.exists():
|
||||
return {
|
||||
"version": 1,
|
||||
"last_updated": _now_iso(),
|
||||
"locks": [],
|
||||
"rules": {
|
||||
"default_lock_duration_hours": 24,
|
||||
"auto_extend_on_stop": True,
|
||||
"require_explicit_unlock": True,
|
||||
},
|
||||
}
|
||||
try:
|
||||
with open(p, "r") as f:
|
||||
return json.load(f)
|
||||
except Exception:
|
||||
return {
|
||||
"version": 1,
|
||||
"last_updated": _now_iso(),
|
||||
"locks": [],
|
||||
"rules": {
|
||||
"default_lock_duration_hours": 24,
|
||||
"auto_extend_on_stop": True,
|
||||
"require_explicit_unlock": True,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def save_registry(registry: dict, path: Path | str | None = None) -> None:
    """Persist *registry* as JSON to *path* (default: REGISTRY_PATH).

    Creates parent directories as needed and stamps ``last_updated``
    (mutating the caller's dict) before writing.
    """
    target = Path(path) if path else Path(REGISTRY_PATH)
    target.parent.mkdir(parents=True, exist_ok=True)
    registry["last_updated"] = _now_iso()
    with open(target, "w") as f:
        json.dump(registry, f, indent=2)
|
||||
|
||||
|
||||
def log_stop_ack(target: str, context: str, log_path: Path | str | None = None) -> None:
    """Append a STOP_ACK line for *target* to the stop log (default: STOP_LOG_PATH).

    Creates the log directory if missing; each entry is one timestamped line.
    """
    dest = Path(log_path) if log_path else Path(STOP_LOG_PATH)
    dest.parent.mkdir(parents=True, exist_ok=True)
    line = f"[{_now_iso()}] STOP_ACK — target='{target}' context='{context}'\n"
    with open(dest, "a") as f:
        f.write(line)
|
||||
|
||||
|
||||
def is_stopped(target: str, registry: dict | None = None) -> bool:
|
||||
"""Return True if target (or global '*') is currently stopped."""
|
||||
reg = registry if registry is not None else load_registry()
|
||||
now = datetime.now(timezone.utc)
|
||||
for lock in reg.get("locks", []):
|
||||
expires = lock.get("expires_at")
|
||||
if expires:
|
||||
try:
|
||||
expires_dt = datetime.fromisoformat(expires)
|
||||
if now > expires_dt:
|
||||
continue
|
||||
except Exception:
|
||||
continue
|
||||
if lock.get("entity") == target or lock.get("entity") == "*":
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def assert_not_stopped(target: str, registry: dict | None = None) -> None:
    """Raise StopInterrupted if an active stop covers *target*.

    Thin guard wrapper over is_stopped(); a no-op when the target is clear.
    """
    stopped = is_stopped(target, registry)
    if not stopped:
        return
    raise StopInterrupted(f"Stop signal active for '{target}'. Halt immediately.")
|
||||
|
||||
|
||||
def record_stop(
    target: str,
    context: str,
    duration_hours: int | None = None,
    registry_path: Path | str | None = None,
    log_path: Path | str | None = None,
) -> dict:
    """Record a stop lock for *target*, log STOP_ACK, and persist the registry.

    Args:
        target: Entity to lock ('*' for a global stop).
        context: Human-readable reason; stored on the lock and in the log.
        duration_hours: Lock lifetime; falls back to the registry rule
            default (24h) when falsy.
        registry_path: Override registry location (mainly for tests).
        log_path: Override stop-log location; defaults to STOP_LOG_PATH.
            (New, backward-compatible — the original hard-wired the global.)

    Returns:
        The lock dict that was recorded.
    """
    reg = load_registry(registry_path)
    rules = reg.get("rules", {})
    duration = duration_hours or rules.get("default_lock_duration_hours", 24)
    now = datetime.now(timezone.utc)
    expires = (now + timedelta(hours=duration)).isoformat()

    # One lock per entity: drop any existing lock for the same target first.
    reg["locks"] = [lk for lk in reg.get("locks", []) if lk.get("entity") != target]

    lock = {
        "entity": target,
        "reason": context,
        "locked_at": now.isoformat(),
        "expires_at": expires,
        "unlocked_by": None,
    }
    reg["locks"].append(lock)
    save_registry(reg, registry_path)
    log_stop_ack(target, context, log_path=log_path if log_path else STOP_LOG_PATH)
    return lock
|
||||
|
||||
|
||||
def cleanup_expired(registry_path: Path | str | None = None) -> int:
    """Remove expired locks from the registry and return the active-lock count.

    Naive expiry timestamps are interpreted as UTC (consistent with
    is_stopped — previously a naive timestamp raised TypeError inside the
    try-block and the lock was dropped as if expired). An expiry that cannot
    be parsed at all is treated as expired and dropped.
    """
    reg = load_registry(registry_path)
    now = datetime.now(timezone.utc)
    kept = []
    for lock in reg.get("locks", []):
        expires = lock.get("expires_at")
        if expires:
            try:
                expires_dt = datetime.fromisoformat(expires)
            except Exception:
                continue  # unparseable expiry -> drop the lock
            if expires_dt.tzinfo is None:
                expires_dt = expires_dt.replace(tzinfo=timezone.utc)
            if now > expires_dt:
                continue  # expired -> drop
        kept.append(lock)
    reg["locks"] = kept
    save_registry(reg, registry_path)
    return len(kept)
|
||||
|
||||
|
||||
def main(argv: list[str] | None = None) -> int:
    """CLI entry point for the stop guard.

    Commands: check <target> (exit 1 if stopped, 0 if clear),
    record <target> [--context] [--hours], cleanup.
    """
    parser = argparse.ArgumentParser(description="Allegro Stop Guard")
    sub = parser.add_subparsers(dest="cmd")

    check_parser = sub.add_parser("check", help="Check if target is stopped")
    check_parser.add_argument("target")

    record_parser = sub.add_parser("record", help="Record a stop")
    record_parser.add_argument("target")
    record_parser.add_argument("--context", default="manual stop")
    record_parser.add_argument("--hours", type=int, default=24)

    sub.add_parser("cleanup", help="Remove expired locks")

    args = parser.parse_args(argv)

    if args.cmd == "check":
        # Exit status doubles as the machine-readable answer.
        if is_stopped(args.target):
            print("STOPPED")
            return 1
        print("CLEAR")
        return 0

    if args.cmd == "record":
        record_stop(args.target, args.context, args.hours)
        print(f"Recorded stop for {args.target} ({args.hours}h)")
        return 0

    if args.cmd == "cleanup":
        remaining = cleanup_expired()
        print(f"Cleanup complete. {remaining} active locks.")
        return 0

    parser.print_help()
    return 0
|
||||
|
||||
|
||||
# Script entry point: the process exit code mirrors main()'s return value
# (check -> 1 when stopped, otherwise 0).
if __name__ == "__main__":
    sys.exit(main())
|
||||
@@ -1,143 +1,260 @@
|
||||
"""100% compliance test for Allegro Commit-or-Abort (M2, Epic #842)."""
|
||||
"""Tests for allegro.cycle_guard — Commit-or-Abort discipline, M2 Epic #842."""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
import unittest
|
||||
from datetime import datetime, timezone, timedelta
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
||||
import pytest
|
||||
|
||||
import cycle_guard as cg
|
||||
from allegro.cycle_guard import (
|
||||
_empty_state,
|
||||
_now_iso,
|
||||
_parse_dt,
|
||||
abort_cycle,
|
||||
check_slice_timeout,
|
||||
commit_cycle,
|
||||
end_slice,
|
||||
load_state,
|
||||
resume_or_abort,
|
||||
save_state,
|
||||
slice_duration_minutes,
|
||||
start_cycle,
|
||||
start_slice,
|
||||
)
|
||||
|
||||
|
||||
class TestCycleGuard(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.tmpdir = tempfile.TemporaryDirectory()
|
||||
self.state_path = os.path.join(self.tmpdir.name, "cycle_state.json")
|
||||
cg.STATE_PATH = self.state_path
|
||||
class TestStateLifecycle:
|
||||
def test_empty_state(self):
|
||||
state = _empty_state()
|
||||
assert state["status"] == "complete"
|
||||
assert state["cycle_id"] is None
|
||||
assert state["version"] == 1
|
||||
|
||||
def tearDown(self):
|
||||
self.tmpdir.cleanup()
|
||||
cg.STATE_PATH = cg.DEFAULT_STATE
|
||||
def test_load_state_missing_file(self):
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
path = Path(td) / "missing.json"
|
||||
state = load_state(path)
|
||||
assert state["status"] == "complete"
|
||||
|
||||
def test_load_empty_state(self):
|
||||
state = cg.load_state(self.state_path)
|
||||
self.assertEqual(state["status"], "complete")
|
||||
self.assertIsNone(state["cycle_id"])
|
||||
def test_save_and_load_roundtrip(self):
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
path = Path(td) / "state.json"
|
||||
state = start_cycle("test-target", "details", path)
|
||||
loaded = load_state(path)
|
||||
assert loaded["cycle_id"] == state["cycle_id"]
|
||||
assert loaded["status"] == "in_progress"
|
||||
|
||||
def test_load_state_malformed_json(self):
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
path = Path(td) / "bad.json"
|
||||
path.write_text("not json")
|
||||
state = load_state(path)
|
||||
assert state["status"] == "complete"
|
||||
|
||||
|
||||
class TestCycleOperations:
|
||||
def test_start_cycle(self):
|
||||
state = cg.start_cycle("M2: Commit-or-Abort", path=self.state_path)
|
||||
self.assertEqual(state["status"], "in_progress")
|
||||
self.assertEqual(state["target"], "M2: Commit-or-Abort")
|
||||
self.assertIsNotNone(state["cycle_id"])
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
path = Path(td) / "state.json"
|
||||
state = start_cycle("M2: Commit-or-Abort", "test details", path)
|
||||
assert state["status"] == "in_progress"
|
||||
assert state["target"] == "M2: Commit-or-Abort"
|
||||
assert state["started_at"] is not None
|
||||
|
||||
def test_start_slice_requires_in_progress(self):
|
||||
with self.assertRaises(RuntimeError):
|
||||
cg.start_slice("test", path=self.state_path)
|
||||
|
||||
def test_slice_lifecycle(self):
|
||||
cg.start_cycle("test", path=self.state_path)
|
||||
cg.start_slice("gather", path=self.state_path)
|
||||
state = cg.load_state(self.state_path)
|
||||
self.assertEqual(len(state["slices"]), 1)
|
||||
self.assertEqual(state["slices"][0]["name"], "gather")
|
||||
self.assertEqual(state["slices"][0]["status"], "in_progress")
|
||||
|
||||
cg.end_slice(status="complete", artifact="artifact.txt", path=self.state_path)
|
||||
state = cg.load_state(self.state_path)
|
||||
self.assertEqual(state["slices"][0]["status"], "complete")
|
||||
self.assertEqual(state["slices"][0]["artifact"], "artifact.txt")
|
||||
self.assertIsNotNone(state["slices"][0]["ended_at"])
|
||||
def test_start_cycle_overwrites_prior(self):
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
path = Path(td) / "state.json"
|
||||
old = start_cycle("old", path=path)
|
||||
new = start_cycle("new", path=path)
|
||||
assert new["target"] == "new"
|
||||
loaded = load_state(path)
|
||||
assert loaded["target"] == "new"
|
||||
|
||||
def test_commit_cycle(self):
|
||||
cg.start_cycle("test", path=self.state_path)
|
||||
cg.start_slice("work", path=self.state_path)
|
||||
cg.end_slice(path=self.state_path)
|
||||
proof = {"files": ["a.py"]}
|
||||
state = cg.commit_cycle(proof=proof, path=self.state_path)
|
||||
self.assertEqual(state["status"], "complete")
|
||||
self.assertEqual(state["proof"], proof)
|
||||
self.assertIsNotNone(state["completed_at"])
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
path = Path(td) / "state.json"
|
||||
start_cycle("test", path=path)
|
||||
proof = {"files": ["a.py"], "tests": "passed"}
|
||||
state = commit_cycle(proof, path)
|
||||
assert state["status"] == "complete"
|
||||
assert state["proof"]["files"] == ["a.py"]
|
||||
assert state["completed_at"] is not None
|
||||
|
||||
def test_commit_without_in_progress_fails(self):
|
||||
with self.assertRaises(RuntimeError):
|
||||
cg.commit_cycle(path=self.state_path)
|
||||
def test_commit_cycle_not_in_progress_raises(self):
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
path = Path(td) / "state.json"
|
||||
with pytest.raises(RuntimeError, match="not in_progress"):
|
||||
commit_cycle(path=path)
|
||||
|
||||
def test_abort_cycle(self):
|
||||
cg.start_cycle("test", path=self.state_path)
|
||||
cg.start_slice("work", path=self.state_path)
|
||||
state = cg.abort_cycle("manual abort", path=self.state_path)
|
||||
self.assertEqual(state["status"], "aborted")
|
||||
self.assertEqual(state["abort_reason"], "manual abort")
|
||||
self.assertIsNotNone(state["aborted_at"])
|
||||
self.assertEqual(state["slices"][-1]["status"], "aborted")
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
path = Path(td) / "state.json"
|
||||
start_cycle("test", path=path)
|
||||
state = abort_cycle("timeout", path)
|
||||
assert state["status"] == "aborted"
|
||||
assert state["abort_reason"] == "timeout"
|
||||
assert state["aborted_at"] is not None
|
||||
|
||||
def test_slice_timeout_true(self):
|
||||
cg.start_cycle("test", path=self.state_path)
|
||||
cg.start_slice("work", path=self.state_path)
|
||||
# Manually backdate slice start to 11 minutes ago
|
||||
state = cg.load_state(self.state_path)
|
||||
old = (datetime.now(timezone.utc) - timedelta(minutes=11)).isoformat()
|
||||
state["slices"][0]["started_at"] = old
|
||||
cg.save_state(state, self.state_path)
|
||||
self.assertTrue(cg.check_slice_timeout(max_minutes=10, path=self.state_path))
|
||||
def test_abort_cycle_not_in_progress_raises(self):
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
path = Path(td) / "state.json"
|
||||
with pytest.raises(RuntimeError, match="not in_progress"):
|
||||
abort_cycle("reason", path=path)
|
||||
|
||||
def test_slice_timeout_false(self):
|
||||
cg.start_cycle("test", path=self.state_path)
|
||||
cg.start_slice("work", path=self.state_path)
|
||||
self.assertFalse(cg.check_slice_timeout(max_minutes=10, path=self.state_path))
|
||||
|
||||
def test_resume_or_abort_keeps_fresh_cycle(self):
|
||||
cg.start_cycle("test", path=self.state_path)
|
||||
state = cg.resume_or_abort(path=self.state_path)
|
||||
self.assertEqual(state["status"], "in_progress")
|
||||
class TestSliceOperations:
|
||||
def test_start_and_end_slice(self):
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
path = Path(td) / "state.json"
|
||||
start_cycle("test", path=path)
|
||||
start_slice("research", path)
|
||||
state = end_slice("complete", "found answer", path)
|
||||
assert len(state["slices"]) == 1
|
||||
assert state["slices"][0]["name"] == "research"
|
||||
assert state["slices"][0]["status"] == "complete"
|
||||
assert state["slices"][0]["artifact"] == "found answer"
|
||||
assert state["slices"][0]["ended_at"] is not None
|
||||
|
||||
def test_resume_or_abort_aborts_stale_cycle(self):
|
||||
cg.start_cycle("test", path=self.state_path)
|
||||
# Backdate start to 31 minutes ago
|
||||
state = cg.load_state(self.state_path)
|
||||
old = (datetime.now(timezone.utc) - timedelta(minutes=31)).isoformat()
|
||||
state["started_at"] = old
|
||||
cg.save_state(state, self.state_path)
|
||||
state = cg.resume_or_abort(path=self.state_path)
|
||||
self.assertEqual(state["status"], "aborted")
|
||||
self.assertIn("crash recovery", state["abort_reason"])
|
||||
def test_start_slice_without_cycle_raises(self):
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
path = Path(td) / "state.json"
|
||||
with pytest.raises(RuntimeError, match="in_progress"):
|
||||
start_slice("name", path)
|
||||
|
||||
def test_end_slice_without_slice_raises(self):
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
path = Path(td) / "state.json"
|
||||
start_cycle("test", path=path)
|
||||
with pytest.raises(RuntimeError, match="No active slice"):
|
||||
end_slice(path=path)
|
||||
|
||||
def test_slice_duration_minutes(self):
|
||||
cg.start_cycle("test", path=self.state_path)
|
||||
cg.start_slice("work", path=self.state_path)
|
||||
# Backdate by 5 minutes
|
||||
state = cg.load_state(self.state_path)
|
||||
old = (datetime.now(timezone.utc) - timedelta(minutes=5)).isoformat()
|
||||
state["slices"][0]["started_at"] = old
|
||||
cg.save_state(state, self.state_path)
|
||||
mins = cg.slice_duration_minutes(path=self.state_path)
|
||||
self.assertAlmostEqual(mins, 5.0, delta=0.5)
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
path = Path(td) / "state.json"
|
||||
start_cycle("test", path=path)
|
||||
start_slice("long", path)
|
||||
state = load_state(path)
|
||||
state["slices"][0]["started_at"] = (datetime.now(timezone.utc) - timedelta(minutes=5)).isoformat()
|
||||
save_state(state, path)
|
||||
minutes = slice_duration_minutes(path)
|
||||
assert minutes is not None
|
||||
assert minutes >= 4.9
|
||||
|
||||
def test_cli_resume_prints_status(self):
|
||||
cg.start_cycle("test", path=self.state_path)
|
||||
rc = cg.main(["resume"])
|
||||
self.assertEqual(rc, 0)
|
||||
def test_check_slice_timeout_true(self):
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
path = Path(td) / "state.json"
|
||||
start_cycle("test", path=path)
|
||||
start_slice("old", path)
|
||||
state = load_state(path)
|
||||
state["slices"][0]["started_at"] = (datetime.now(timezone.utc) - timedelta(minutes=15)).isoformat()
|
||||
save_state(state, path)
|
||||
assert check_slice_timeout(max_minutes=10.0, path=path) is True
|
||||
|
||||
def test_cli_check_timeout(self):
|
||||
cg.start_cycle("test", path=self.state_path)
|
||||
cg.start_slice("work", path=self.state_path)
|
||||
state = cg.load_state(self.state_path)
|
||||
old = (datetime.now(timezone.utc) - timedelta(minutes=11)).isoformat()
|
||||
state["slices"][0]["started_at"] = old
|
||||
cg.save_state(state, self.state_path)
|
||||
rc = cg.main(["check"])
|
||||
self.assertEqual(rc, 1)
|
||||
|
||||
def test_cli_check_ok(self):
|
||||
cg.start_cycle("test", path=self.state_path)
|
||||
cg.start_slice("work", path=self.state_path)
|
||||
rc = cg.main(["check"])
|
||||
self.assertEqual(rc, 0)
|
||||
def test_check_slice_timeout_false(self):
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
path = Path(td) / "state.json"
|
||||
start_cycle("test", path=path)
|
||||
start_slice("fresh", path)
|
||||
assert check_slice_timeout(max_minutes=10.0, path=path) is False
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
class TestCrashRecovery:
    """resume_or_abort() behavior: stale in-progress cycles are auto-aborted,
    fresh ones are resumed, and completed state is left untouched."""

    def test_resume_or_abort_aborts_stale_cycle(self):
        with tempfile.TemporaryDirectory() as td:
            path = Path(td) / "state.json"
            start_cycle("test", path=path)
            # Backdate the cycle start well past the staleness threshold
            # (60 min; the module's crash-recovery cutoff is lower).
            state = load_state(path)
            state["started_at"] = (datetime.now(timezone.utc) - timedelta(minutes=60)).isoformat()
            save_state(state, path)
            result = resume_or_abort(path)
            assert result["status"] == "aborted"
            assert "stale cycle" in result["abort_reason"]

    def test_resume_or_abort_keeps_fresh_cycle(self):
        with tempfile.TemporaryDirectory() as td:
            path = Path(td) / "state.json"
            start_cycle("test", path=path)
            # A just-started cycle must survive recovery untouched.
            result = resume_or_abort(path)
            assert result["status"] == "in_progress"

    def test_resume_or_abort_no_op_when_complete(self):
        with tempfile.TemporaryDirectory() as td:
            path = Path(td) / "state.json"
            state = _empty_state()
            save_state(state, path)
            # Nothing in progress -> recovery is a no-op.
            result = resume_or_abort(path)
            assert result["status"] == "complete"
|
||||
|
||||
|
||||
class TestDateParsing:
    """_parse_dt() must accept both 'Z'-suffixed and explicit-offset ISO-8601
    strings and always return a timezone-aware datetime."""

    def test_parse_dt_with_z(self):
        # 'Z' suffix (not accepted by datetime.fromisoformat before 3.11).
        dt = _parse_dt("2026-04-06T12:00:00Z")
        assert dt.tzinfo is not None

    def test_parse_dt_with_offset(self):
        iso = "2026-04-06T12:00:00+00:00"
        dt = _parse_dt(iso)
        assert dt.tzinfo is not None
|
||||
|
||||
|
||||
class TestCLI:
    """End-to-end checks of the cycle_guard CLI (main) via argv lists.

    NOTE(review): these tests set os.environ["ALLEGRO_CYCLE_STATE"] without
    restoring it, so the override leaks into later tests in the same process —
    consider pytest's monkeypatch.setenv instead; confirm no other test relies
    on the default state path.
    """

    def test_cli_resume(self, capsys):
        from allegro.cycle_guard import main
        with tempfile.TemporaryDirectory() as td:
            path = Path(td) / "state.json"
            start_cycle("cli", path=path)
            # Point the CLI at the temp state file via the env override.
            os.environ["ALLEGRO_CYCLE_STATE"] = str(path)
            rc = main(["resume"])
            captured = capsys.readouterr()
            assert rc == 0
            assert captured.out.strip() == "in_progress"

    def test_cli_start(self, capsys):
        from allegro.cycle_guard import main
        with tempfile.TemporaryDirectory() as td:
            path = Path(td) / "state.json"
            os.environ["ALLEGRO_CYCLE_STATE"] = str(path)
            rc = main(["start", "target", "--details", "d"])
            captured = capsys.readouterr()
            assert rc == 0
            assert "Cycle started" in captured.out

    def test_cli_commit(self, capsys):
        from allegro.cycle_guard import main
        with tempfile.TemporaryDirectory() as td:
            path = Path(td) / "state.json"
            os.environ["ALLEGRO_CYCLE_STATE"] = str(path)
            main(["start", "t"])
            # --proof takes a JSON string recorded on the committed cycle.
            rc = main(["commit", "--proof", '{"ok": true}'])
            captured = capsys.readouterr()
            assert rc == 0
            assert "Cycle committed" in captured.out

    def test_cli_check_timeout(self, capsys):
        from allegro.cycle_guard import main
        with tempfile.TemporaryDirectory() as td:
            path = Path(td) / "state.json"
            start_cycle("t", path=path)
            start_slice("s", path=path)
            # Backdate the slice past the 10-minute default timeout.
            state = load_state(path)
            state["slices"][0]["started_at"] = (datetime.now(timezone.utc) - timedelta(minutes=15)).isoformat()
            save_state(state, path)
            os.environ["ALLEGRO_CYCLE_STATE"] = str(path)
            rc = main(["check"])
            captured = capsys.readouterr()
            assert rc == 1
            assert captured.out.strip() == "TIMEOUT"

    def test_cli_check_ok(self, capsys):
        from allegro.cycle_guard import main
        with tempfile.TemporaryDirectory() as td:
            path = Path(td) / "state.json"
            start_cycle("t", path=path)
            start_slice("s", path=path)
            os.environ["ALLEGRO_CYCLE_STATE"] = str(path)
            rc = main(["check"])
            captured = capsys.readouterr()
            assert rc == 0
            assert captured.out.strip() == "OK"
|
||||
|
||||
877
scripts/acp_redis_transport.py
Normal file
877
scripts/acp_redis_transport.py
Normal file
@@ -0,0 +1,877 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
ACP Redis Transport Layer
|
||||
Agent Communication Protocol transport using Redis queues and Pub/Sub.
|
||||
|
||||
Replaces tmux send-keys with Redis-backed message passing.
|
||||
Provides reliable, persistent, and scalable inter-agent communication.
|
||||
|
||||
Queue schema:
|
||||
acp:inbox:{agent_id} — agent inbox (list, BRPOP)
|
||||
acp:ack:{agent_id} — acknowledged message IDs (set, for dedup)
|
||||
acp:registry — agent registry (hash: agent_id → json)
|
||||
acp:broadcast — Pub/Sub channel for broadcasts
|
||||
acp:dlq — dead-letter queue for failed messages
|
||||
|
||||
Usage:
|
||||
transport = RedisTransport(redis_url="redis://localhost:6379/0")
|
||||
transport.register_agent("allegro-primary", queue="forge:inbox:allegro-primary")
|
||||
transport.send({"to": "ezra-primary", "from": "allegro-primary", "type": "request", "payload": {"task": "review"}})
|
||||
msg = transport.receive("ezra-primary", timeout=5)
|
||||
transport.ack("ezra-primary", msg["id"])
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
import uuid
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any, Optional
|
||||
|
||||
import redis
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Constants
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
QUEUE_PREFIX = "acp:inbox:"
|
||||
ACK_PREFIX = "acp:ack:"
|
||||
REGISTRY_KEY = "acp:registry"
|
||||
BROADCAST_CHANNEL = "acp:broadcast"
|
||||
DLQ_KEY = "acp:dlq"
|
||||
DEFAULT_TTL = 3600 # 1 hour
|
||||
DEFAULT_REDIS_URL = "redis://localhost:6379/0"
|
||||
DEFAULT_TIMEOUT = 5 # seconds for BRPOP
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ACP Message Schema
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
ACP_MESSAGE_FIELDS = {
|
||||
"id": str, # auto-generated if missing
|
||||
"from": str, # sender agent_id
|
||||
"to": str, # target agent_id or "*" for broadcast
|
||||
"type": str, # request | response | broadcast | alert | ack
|
||||
"payload": dict, # message body
|
||||
"reply_to": str, # optional: original message id
|
||||
"timestamp": str, # ISO-8601 timestamp
|
||||
"via": str, # transport name ("redis")
|
||||
"ttl": int, # seconds to live
|
||||
"signature": str, # optional: HMAC signature
|
||||
}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# RedisTransport
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class RedisTransport:
|
||||
"""Redis-backed transport for ACP agent communication."""
|
||||
|
||||
def __init__(self, redis_url: str = None, redis_client: "redis.Redis|None" = None):
    """
    Initialize the Redis transport.

    Args:
        redis_url: Redis connection URL (default: ACP_REDIS_URL env or localhost:6379/0)
        redis_client: Inject a pre-configured redis client (for testing)
    """
    # An injected client wins; otherwise connect using the resolved URL.
    if redis_client is None:
        url = redis_url or os.getenv("ACP_REDIS_URL", DEFAULT_REDIS_URL)
        self._redis = redis.Redis.from_url(url, decode_responses=True)
    else:
        self._redis = redis_client
    # Lazily-created Pub/Sub handle (not set up until needed).
    self._pubsub = None
|
||||
|
||||
@property
def client(self) -> redis.Redis:
    """The underlying Redis client (read-only escape hatch for raw commands)."""
    return self._redis
|
||||
|
||||
# -----------------------------------------------------------------------
|
||||
# Message Construction
|
||||
# -----------------------------------------------------------------------
|
||||
|
||||
@staticmethod
def build_message(
    to: str,
    payload: dict,
    from_agent: str = "system",
    msg_type: str = "request",
    reply_to: Optional[str] = None,
    ttl: int = DEFAULT_TTL,
    signature: Optional[str] = None,
) -> dict:
    """Build a normalized ACP message.

    Args:
        to: Target agent_id, or "*" for broadcast.
        payload: Message body.
        from_agent: Sender agent_id.
        msg_type: One of request | response | broadcast | alert | ack.
        reply_to: Original message id when this is a reply.
        ttl: Seconds-to-live for the message.
        signature: Optional HMAC signature; stored as "" when absent.

    Returns:
        Dict matching ACP_MESSAGE_FIELDS with a fresh 16-hex-char id and
        a timezone-aware UTC timestamp.
    """
    # Annotation fix: reply_to/signature default to None, so they are
    # Optional[str], not plain str (the original annotations were wrong).
    msg_id = uuid.uuid4().hex[:16]
    now = datetime.now(timezone.utc).isoformat()
    return {
        "id": msg_id,
        "from": from_agent,
        "to": to,
        "type": msg_type,
        "payload": payload,
        "reply_to": reply_to,
        "timestamp": now,
        "via": "redis",
        "ttl": ttl,
        "signature": signature or "",
    }
|
||||
|
||||
@staticmethod
|
||||
def validate_message(msg: dict) -> list[str]:
|
||||
"""
|
||||
Validate an ACP message structure.
|
||||
|
||||
Returns:
|
||||
List of validation errors (empty = valid).
|
||||
"""
|
||||
errors = []
|
||||
if not isinstance(msg, dict):
|
||||
return ["message must be a dict"]
|
||||
if "to" not in msg:
|
||||
errors.append("missing required field: to")
|
||||
if "from" not in msg:
|
||||
errors.append("missing required field: from")
|
||||
if "payload" not in msg:
|
||||
errors.append("missing required field: payload")
|
||||
if "type" in msg and msg["type"] not in (
|
||||
"request", "response", "broadcast", "alert", "ack"
|
||||
):
|
||||
errors.append(f"invalid type: {msg.get('type')}")
|
||||
return errors
|
||||
|
||||
# -----------------------------------------------------------------------
|
||||
# Core Transport Operations
|
||||
# -----------------------------------------------------------------------
|
||||
|
||||
def send(self, message: dict) -> dict:
    """
    Send an ACP message to the target agent's inbox queue.

    Args:
        message: ACP message dict (must have 'to' and 'payload')

    Returns:
        Dict with 'status', 'message_id', 'queue', 'timestamp'

    Raises:
        ValueError: If message is invalid
    """
    # Validate before touching Redis so bad messages never enter a queue.
    errors = self.validate_message(message)
    if errors:
        raise ValueError(f"Invalid message: {'; '.join(errors)}")

    # Auto-fill missing fields on a copy — the caller's dict is not mutated.
    msg = dict(message)
    if "id" not in msg:
        msg["id"] = uuid.uuid4().hex[:16]
    if "timestamp" not in msg:
        msg["timestamp"] = datetime.now(timezone.utc).isoformat()
    if "via" not in msg:
        msg["via"] = "redis"
    if "ttl" not in msg:
        msg["ttl"] = DEFAULT_TTL

    target = msg["to"]
    queue = f"{QUEUE_PREFIX}{target}"

    # Push to target's inbox queue (LPUSH + BRPOP = FIFO)
    self._redis.lpush(queue, json.dumps(msg))

    # Set TTL on queue key if not broadcast; +60s grace so the key outlives
    # the message's own ttl slightly.
    if target != "*":
        self._redis.expire(queue, msg["ttl"] + 60)

    # Log to DLQ audit trail (bounded to the most recent 10k entries by
    # the LTRIM below).
    audit_entry = {
        "action": "send",
        "message_id": msg["id"],
        "from": msg["from"],
        "to": target,
        "type": msg.get("type", "request"),
        "timestamp": msg["timestamp"],
        "queue": queue,
    }
    self._redis.lpush(f"{DLQ_KEY}:audit", json.dumps(audit_entry))
    self._redis.ltrim(f"{DLQ_KEY}:audit", 0, 9999)

    return {
        "status": "sent",
        "message_id": msg["id"],
        "queue": queue,
        "timestamp": msg["timestamp"],
    }
|
||||
|
||||
def receive(self, agent_id: str, timeout: int = DEFAULT_TIMEOUT) -> Optional[dict]:
    """
    Blocking receive from an agent's inbox queue.

    Args:
        agent_id: Agent to receive messages for
        timeout: Seconds to block (0 = infinite)

    Returns:
        ACP message dict or None on timeout

    Expired messages (older than their ttl) are diverted to the DLQ and the
    wait continues with a reduced time budget.
    """
    queue = f"{QUEUE_PREFIX}{agent_id}"
    result = self._redis.brpop(queue, timeout=timeout)
    if result is None:
        return None

    _, raw = result
    message = json.loads(raw)

    # Check TTL. NOTE(review): a naive `timestamp` is interpreted by
    # .timestamp() in *local* time — senders appear to always stamp
    # timezone-aware UTC (see send/build_message); confirm for external senders.
    ts = message.get("timestamp")
    ttl = message.get("ttl", DEFAULT_TTL)
    if ts:
        msg_time = datetime.fromisoformat(ts).timestamp()
        if time.time() - msg_time > ttl:
            # Expired — divert to DLQ and keep waiting.
            message["_expired"] = True
            self._redis.lpush(DLQ_KEY, json.dumps(message))
            # Bug fix: the original retried with max(0, timeout - 1), which
            # turns a short *finite* wait into timeout=0 — an infinite BRPOP.
            # Preserve 0 = infinite; otherwise stop once the budget runs out.
            if timeout == 0:
                return self.receive(agent_id, timeout=0)
            if timeout <= 1:
                return None
            return self.receive(agent_id, timeout=timeout - 1)

    # Update last_seen for the agent
    self._update_heartbeat(agent_id)

    return message
|
||||
|
||||
def receive_nowait(self, agent_id: str) -> Optional[dict]:
    """
    Non-blocking receive. Returns None if no messages waiting.

    Bug fix: uses RPOP so ordering matches receive() — send() LPUSHes, so
    the *oldest* message sits at the right end (FIFO). The original used
    LPOP, which popped the newest message (LIFO), inconsistent with the
    blocking receive() path.
    """
    queue = f"{QUEUE_PREFIX}{agent_id}"
    raw = self._redis.rpop(queue)
    if raw is None:
        return None
    return json.loads(raw)
|
||||
|
||||
def ack(self, agent_id: str, message_id: str) -> dict:
|
||||
"""
|
||||
Acknowledge receipt of a message.
|
||||
|
||||
Args:
|
||||
agent_id: Agent acknowledging the message
|
||||
message_id: ID of the message being acknowledged
|
||||
|
||||
Returns:
|
||||
Dict with 'status' and 'ack_id'
|
||||
"""
|
||||
ack_key = f"{ACK_PREFIX}{agent_id}"
|
||||
ack_entry = {
|
||||
"message_id": message_id,
|
||||
"acked_by": agent_id,
|
||||
"acked_at": datetime.now(timezone.utc).isoformat(),
|
||||
}
|
||||
# Store in ack set (with TTL)
|
||||
self._redis.sadd(ack_key, message_id)
|
||||
self._redis.expire(ack_key, DEFAULT_TTL)
|
||||
|
||||
# Log ack
|
||||
self._redis.lpush(f"{DLQ_KEY}:audit", json.dumps({
|
||||
"action": "ack",
|
||||
"message_id": message_id,
|
||||
"acked_by": agent_id,
|
||||
"timestamp": ack_entry["acked_at"],
|
||||
}))
|
||||
self._redis.ltrim(f"{DLQ_KEY}:audit", 0, 9999)
|
||||
|
||||
return {"status": "acked", "ack_id": message_id}
|
||||
|
||||
def is_acked(self, agent_id: str, message_id: str) -> bool:
|
||||
"""Check if a message has been acknowledged."""
|
||||
return self._redis.sismember(f"{ACK_PREFIX}{agent_id}", message_id)
|
||||
|
||||
    def broadcast(self, message: dict) -> dict:
        """
        Broadcast a message to all registered agents via Pub/Sub.

        The message is both published on the broadcast channel (for live
        subscribers) and LPUSHed into every *active* agent's inbox, so agents
        that are not subscribed still pick it up on their next receive().

        Args:
            message: ACP message (to field will be set to '*')

        Returns:
            Dict with 'status', 'message_id', 'channel' and 'recipients'
            (count of active agents whose inboxes were written).
        """
        # Work on a copy so the caller's dict is not mutated.
        msg = dict(message)
        msg["to"] = "*"
        msg["type"] = msg.get("type", "broadcast")

        # Fill in identity/timing fields if the caller did not.
        if "id" not in msg:
            msg["id"] = uuid.uuid4().hex[:16]
        if "timestamp" not in msg:
            msg["timestamp"] = datetime.now(timezone.utc).isoformat()

        # Publish to broadcast channel (live subscribers only).
        self._redis.publish(BROADCAST_CHANNEL, json.dumps(msg))

        # Also push to each registered agent's inbox; stale agents are
        # skipped. Each inbox key's expiry is extended past the message TTL.
        agents = self.list_agents(status="active")
        for agent in agents:
            queue = f"{QUEUE_PREFIX}{agent['agent_id']}"
            self._redis.lpush(queue, json.dumps(msg))
            self._redis.expire(queue, msg.get("ttl", DEFAULT_TTL) + 60)

        return {
            "status": "broadcast",
            "message_id": msg["id"],
            "channel": BROADCAST_CHANNEL,
            "recipients": len(agents),
        }
|
||||
|
||||
# -----------------------------------------------------------------------
|
||||
# Agent Registry
|
||||
# -----------------------------------------------------------------------
|
||||
|
||||
    def register_agent(
        self,
        agent_id: str,
        queue: str = None,
        metadata: dict = None,
        ttl: int = 3600,
    ) -> dict:
        """
        Register an agent in the registry with TTL auto-expiry.

        Writes the agent entry into the shared registry hash and creates a
        per-agent heartbeat key whose expiry drives liveness status in
        get_agent()/list_agents().

        Args:
            agent_id: Unique agent identifier
            queue: Queue path (default: acp:inbox:{agent_id})
            metadata: Additional metadata dict
            ttl: TTL in seconds (default: 3600)

        Returns:
            Dict with registration confirmation
        """
        queue = queue or f"{QUEUE_PREFIX}{agent_id}"
        now = datetime.now(timezone.utc).isoformat()
        entry = {
            "agent_id": agent_id,
            "status": "active",
            "queue": queue,
            "registered_at": now,
            "last_seen": now,
            # NOTE(review): metadata is JSON-encoded here and the whole entry
            # is JSON-encoded again below, so readers get metadata back as a
            # string, not a dict — confirm that is intentional.
            "metadata": json.dumps(metadata or {}),
            "ttl": ttl,
        }
        self._redis.hset(REGISTRY_KEY, agent_id, json.dumps(entry))
        # NOTE(review): this refreshes the TTL of the *shared* registry hash
        # on every registration, so a short-lived agent registering last can
        # shorten the whole registry's lifetime — confirm intent.
        self._redis.expire(REGISTRY_KEY, ttl * 2)  # Registry key TTL

        # Also set a per-agent TTL key for heartbeat tracking
        heartbeat_key = f"acp:heartbeat:{agent_id}"
        self._redis.setex(heartbeat_key, ttl, now)

        return {"status": "registered", "agent_id": agent_id, "queue": queue, "ttl": ttl}
|
||||
|
||||
def unregister_agent(self, agent_id: str) -> dict:
|
||||
"""Remove an agent from the registry."""
|
||||
removed = self._redis.hdel(REGISTRY_KEY, agent_id)
|
||||
self._redis.delete(f"acp:heartbeat:{agent_id}")
|
||||
self._redis.delete(f"{ACK_PREFIX}{agent_id}")
|
||||
return {"status": "unregistered" if removed else "not_found", "agent_id": agent_id}
|
||||
|
||||
def get_agent(self, agent_id: str) -> Optional[dict]:
|
||||
"""Get agent info from registry."""
|
||||
raw = self._redis.hget(REGISTRY_KEY, agent_id)
|
||||
if raw is None:
|
||||
return None
|
||||
entry = json.loads(raw)
|
||||
|
||||
# Check heartbeat TTL for liveness
|
||||
heartbeat_key = f"acp:heartbeat:{agent_id}"
|
||||
if self._redis.exists(heartbeat_key):
|
||||
entry["status"] = "active"
|
||||
else:
|
||||
entry["status"] = "stale"
|
||||
|
||||
return entry
|
||||
|
||||
def list_agents(self, status: str = None) -> list[dict]:
|
||||
"""
|
||||
List all registered agents.
|
||||
|
||||
Args:
|
||||
status: Filter by status ('active', 'stale', or None for all)
|
||||
|
||||
Returns:
|
||||
List of agent info dicts
|
||||
"""
|
||||
all_entries = self._redis.hgetall(REGISTRY_KEY)
|
||||
agents = []
|
||||
for agent_id, raw in all_entries.items():
|
||||
entry = json.loads(raw)
|
||||
|
||||
# Refresh status from heartbeat
|
||||
heartbeat_key = f"acp:heartbeat:{agent_id}"
|
||||
if self._redis.exists(heartbeat_key):
|
||||
entry["status"] = "active"
|
||||
else:
|
||||
entry["status"] = "stale"
|
||||
|
||||
if status is None or entry["status"] == status:
|
||||
agents.append(entry)
|
||||
|
||||
return sorted(agents, key=lambda a: a.get("registered_at", ""))
|
||||
|
||||
    def _update_heartbeat(self, agent_id: str) -> None:
        """Update agent's last_seen timestamp and refresh heartbeat TTL.

        Called from receive() so that consuming a message counts as liveness.
        """
        heartbeat_key = f"acp:heartbeat:{agent_id}"
        now = datetime.now(timezone.utc).isoformat()

        # Get current TTL from registry entry
        raw = self._redis.hget(REGISTRY_KEY, agent_id)
        if raw:
            entry = json.loads(raw)
            ttl = entry.get("ttl", DEFAULT_TTL)
            entry["last_seen"] = now
            self._redis.hset(REGISTRY_KEY, agent_id, json.dumps(entry))
            self._redis.setex(heartbeat_key, ttl, now)
        # NOTE(review): if the agent is not in the registry (raw falsy), the
        # heartbeat is silently not refreshed — presumably intentional for
        # unregistered receivers; confirm.
|
||||
|
||||
# -----------------------------------------------------------------------
|
||||
# Queue Management
|
||||
# -----------------------------------------------------------------------
|
||||
|
||||
def queue_length(self, agent_id: str) -> int:
|
||||
"""Get the number of pending messages for an agent."""
|
||||
return self._redis.llen(f"{QUEUE_PREFIX}{agent_id}")
|
||||
|
||||
def queue_peek(self, agent_id: str, count: int = 10) -> list[dict]:
|
||||
"""Peek at messages in queue without consuming them."""
|
||||
queue = f"{QUEUE_PREFIX}{agent_id}"
|
||||
raw_messages = self._redis.lrange(queue, 0, count - 1)
|
||||
return [json.loads(m) for m in raw_messages]
|
||||
|
||||
def queue_purge(self, agent_id: str) -> int:
|
||||
"""Purge all messages from an agent's inbox. Returns count purged."""
|
||||
queue = f"{QUEUE_PREFIX}{agent_id}"
|
||||
length = self._redis.llen(queue)
|
||||
self._redis.delete(queue)
|
||||
return length
|
||||
|
||||
def dlq_contents(self, limit: int = 50) -> list[dict]:
|
||||
"""Get dead-letter queue contents."""
|
||||
raw = self._redis.lrange(DLQ_KEY, 0, limit - 1)
|
||||
return [json.loads(m) for m in raw]
|
||||
|
||||
def audit_log(self, limit: int = 100) -> list[dict]:
|
||||
"""Get recent audit log entries."""
|
||||
raw = self._redis.lrange(f"{DLQ_KEY}:audit", 0, limit - 1)
|
||||
return [json.loads(m) for m in raw]
|
||||
|
||||
# -----------------------------------------------------------------------
|
||||
# Pub/Sub Subscriber
|
||||
# -----------------------------------------------------------------------
|
||||
|
||||
def subscribe_broadcast(self, agent_id: str) -> "BroadcastSubscriber":
|
||||
"""
|
||||
Subscribe to broadcast messages for an agent.
|
||||
|
||||
Returns a BroadcastSubscriber context manager that yields messages.
|
||||
"""
|
||||
return BroadcastSubscriber(self._redis, agent_id)
|
||||
|
||||
# -----------------------------------------------------------------------
|
||||
# Health / Diagnostics
|
||||
# -----------------------------------------------------------------------
|
||||
|
||||
def health(self) -> dict:
|
||||
"""Health check — verifies Redis connectivity and returns stats."""
|
||||
try:
|
||||
pong = self._redis.ping()
|
||||
agents = self.list_agents()
|
||||
return {
|
||||
"status": "healthy" if pong else "unreachable",
|
||||
"redis_ping": pong,
|
||||
"agents_registered": len(agents),
|
||||
"agents_active": len([a for a in agents if a["status"] == "active"]),
|
||||
"timestamp": datetime.now(timezone.utc).isoformat(),
|
||||
}
|
||||
except redis.ConnectionError as e:
|
||||
return {
|
||||
"status": "unhealthy",
|
||||
"error": str(e),
|
||||
"timestamp": datetime.now(timezone.utc).isoformat(),
|
||||
}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Broadcast Subscriber (Pub/Sub context manager)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class BroadcastSubscriber:
    """Context manager that subscribes to the ACP broadcast channel.

    Usage:
        with BroadcastSubscriber(client, "agent-1") as sub:
            msg = sub.listen(timeout=2.0)
    """

    def __init__(self, redis_client: redis.Redis, agent_id: str):
        self._redis = redis_client
        self._agent_id = agent_id
        self._pubsub = None  # created on __enter__

    def __enter__(self):
        self._pubsub = self._redis.pubsub()
        self._pubsub.subscribe(BROADCAST_CHANNEL)
        return self

    def __exit__(self, *args):
        # Tear down the subscription; exceptions are not suppressed.
        if self._pubsub:
            self._pubsub.unsubscribe()
            self._pubsub.close()

    def listen(self, timeout: float = 5.0) -> Optional[dict]:
        """
        Listen for the next broadcast message.

        Args:
            timeout: Seconds to wait (None = forever)

        Returns:
            ACP message dict or None on timeout

        Raises:
            RuntimeError: if called outside the ``with`` block.
        """
        if not self._pubsub:
            raise RuntimeError("Not in context manager")
        raw = self._pubsub.get_message(timeout=timeout, ignore_subscribe_messages=True)
        if raw and raw["type"] == "message":
            return json.loads(raw["data"])
        return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Transport Interface (abstract base)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TransportInterface:
    """
    Reference contract for ACP transports.

    Concrete transports (RedisTransport, TmuxTransport) are matched by duck
    typing rather than inheritance; this class exists as documentation of
    the required surface. Every method raises NotImplementedError.
    """

    # --- messaging -----------------------------------------------------

    def send(self, message: dict) -> dict:
        """Send a message. Returns delivery receipt."""
        raise NotImplementedError

    def receive(self, agent_id: str, timeout: int = 5) -> Optional[dict]:
        """Receive next message for agent. Returns message or None."""
        raise NotImplementedError

    def ack(self, agent_id: str, message_id: str) -> dict:
        """Acknowledge a received message."""
        raise NotImplementedError

    def broadcast(self, message: dict) -> dict:
        """Broadcast to all agents."""
        raise NotImplementedError

    # --- registry ------------------------------------------------------

    def register_agent(self, agent_id: str, **kwargs) -> dict:
        """Register an agent."""
        raise NotImplementedError

    def unregister_agent(self, agent_id: str) -> dict:
        """Unregister an agent."""
        raise NotImplementedError

    def list_agents(self, **kwargs) -> list[dict]:
        """List registered agents."""
        raise NotImplementedError
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tmux Fallback Transport
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TmuxTransport:
    """
    Fallback transport that delivers messages with ``tmux send-keys``.

    Preserved for backward compatibility when Redis is unavailable. The
    agent registry lives only in this process's memory, and blocking
    receive / broadcast are not supported.
    """

    def __init__(self):
        # In-memory registry: agent_id -> info dict.
        self._agents: dict[str, dict] = {}

    def send(self, message: dict) -> dict:
        """Send via tmux send-keys (fallback)."""
        import subprocess

        # Target pane defaults to the 'to' field when no explicit pane given.
        pane = message.get("pane", message.get("to", ""))
        payload = json.dumps(message)

        try:
            subprocess.run(
                ["tmux", "send-keys", "-t", pane, payload, "Enter"],
                check=True,
                capture_output=True,
                timeout=5,
            )
        except (subprocess.CalledProcessError, FileNotFoundError) as exc:
            return {
                "status": "failed",
                "error": str(exc),
                "via": "tmux",
            }
        return {
            "status": "sent",
            "message_id": message.get("id", "tmux-fallback"),
            "via": "tmux",
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }

    def receive(self, agent_id: str, timeout: int = 5) -> Optional[dict]:
        """Tmux fallback cannot do blocking receive."""
        return None

    def ack(self, agent_id: str, message_id: str) -> dict:
        """Acks are a no-op in tmux mode; always reports success."""
        return {"status": "acked", "ack_id": message_id, "via": "tmux"}

    def broadcast(self, message: dict) -> dict:
        """Broadcast is unavailable without a Pub/Sub backend."""
        return {"status": "unsupported", "via": "tmux", "error": "broadcast not supported in tmux mode"}

    def register_agent(self, agent_id: str, **kwargs) -> dict:
        """Record the agent in the in-memory registry."""
        self._agents[agent_id] = {"status": "active", "via": "tmux"}
        return {"status": "registered", "agent_id": agent_id, "via": "tmux"}

    def unregister_agent(self, agent_id: str) -> dict:
        """Drop the agent from the in-memory registry (idempotent)."""
        self._agents.pop(agent_id, None)
        return {"status": "unregistered", "agent_id": agent_id, "via": "tmux"}

    def list_agents(self, **kwargs) -> list[dict]:
        """Return the in-memory registry as a list of info dicts."""
        return [{"agent_id": name, **info} for name, info in self._agents.items()]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Transport Router (Redis primary, tmux fallback)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TransportRouter:
    """
    Routes ACP messages through Redis (primary) or tmux (fallback).

    Selection:
        - Explicit: message['via'] == 'redis' | 'tmux'
        - Auto: try Redis first, fall back to tmux on connection error
    """

    def __init__(self, redis_url: str = None):
        self.redis_transport = RedisTransport(redis_url=redis_url)
        self.tmux_transport = TmuxTransport()
        self._redis_available = True  # optimistic until a ping fails

    def _check_redis(self) -> bool:
        """Check if Redis is reachable (and cache the result).

        BUGFIX: previously referenced ``self._redis_transport.client`` —
        ``__init__`` assigns ``redis_transport`` (no leading underscore) and
        RedisTransport stores its connection as ``_redis``, so every
        auto-route raised AttributeError instead of falling back.
        """
        try:
            self.redis_transport._redis.ping()
            self._redis_available = True
            return True
        except redis.ConnectionError:
            self._redis_available = False
            return False

    def route(self, message: dict) -> dict:
        """
        Route message through best available transport.

        Priority: redis (primary) → tmux (fallback). An explicit
        message['via'] of 'redis' or 'tmux' bypasses the availability check.
        """
        via = message.get("via")

        if via == "redis":
            return self.redis_transport.send(message)
        elif via == "tmux":
            return self.tmux_transport.send(message)

        # Auto-route: prefer Redis, stamp the chosen transport on the message.
        if self._check_redis():
            message["via"] = "redis"
            return self.redis_transport.send(message)
        else:
            message["via"] = "tmux"
            return self.tmux_transport.send(message)

    def get_transport(self, via: str = None):
        """Get a specific transport by name, or the best available one."""
        if via == "redis":
            return self.redis_transport
        elif via == "tmux":
            return self.tmux_transport

        # Auto: Redis when reachable, otherwise tmux.
        if self._check_redis():
            return self.redis_transport
        return self.tmux_transport
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CLI
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def main():
    """CLI entry point for ACP Redis transport operations.

    Exit codes: 0 success, 1 usage/value error or empty receive,
    2 Redis connection error, 130 keyboard interrupt.
    """
    import argparse
    import sys

    parser = argparse.ArgumentParser(
        prog="acp",
        description="ACP Bridge — Agent Communication Protocol (Redis transport)",
    )
    subparsers = parser.add_subparsers(dest="command", help="Available commands")

    # --- send ---
    send_parser = subparsers.add_parser("send", help="Send an ACP message")
    send_parser.add_argument("target", help="Target agent (e.g., agent:pane)")
    send_parser.add_argument("message", help="Message payload (JSON string or plain text)")
    send_parser.add_argument("--via", choices=["redis", "tmux"], default="redis",
                             help="Transport to use (default: redis)")
    send_parser.add_argument("--type", default="request", choices=["request", "response", "alert", "broadcast"],
                             help="Message type (default: request)")
    send_parser.add_argument("--from", dest="sender", default="system",
                             help="Sender agent ID (default: system)")
    send_parser.add_argument("--ttl", type=int, default=3600,
                             help="TTL in seconds (default: 3600)")

    # --- register ---
    reg_parser = subparsers.add_parser("register", help="Register an agent")
    reg_parser.add_argument("agent_id", help="Agent identifier")
    reg_parser.add_argument("--queue", help="Queue path (default: acp:inbox:{agent_id})")
    reg_parser.add_argument("--ttl", type=int, default=3600, help="Registration TTL (default: 3600s)")
    reg_parser.add_argument("--metadata", help="JSON metadata string")

    # --- unregister ---
    unreg_parser = subparsers.add_parser("unregister", help="Unregister an agent")
    unreg_parser.add_argument("agent_id", help="Agent identifier")

    # --- agents ---
    agents_parser = subparsers.add_parser("agents", help="List registered agents")
    agents_parser.add_argument("--status", choices=["active", "stale"], help="Filter by status")

    # --- receive ---
    recv_parser = subparsers.add_parser("receive", help="Receive messages for an agent")
    recv_parser.add_argument("agent_id", help="Agent identifier")
    recv_parser.add_argument("--timeout", type=int, default=5, help="Block timeout in seconds")

    # --- broadcast ---
    bcast_parser = subparsers.add_parser("broadcast", help="Broadcast to all agents")
    bcast_parser.add_argument("message", help="Message payload")
    bcast_parser.add_argument("--from", dest="sender", default="system")
    bcast_parser.add_argument("--type", default="broadcast")

    # --- health ---
    subparsers.add_parser("health", help="Check transport health")

    # --- queue ---
    queue_parser = subparsers.add_parser("queue", help="Queue operations")
    queue_parser.add_argument("agent_id", help="Agent identifier")
    queue_parser.add_argument("action", choices=["length", "peek", "purge"],
                              help="Queue action")

    args = parser.parse_args()

    if not args.command:
        parser.print_help()
        sys.exit(1)

    transport = RedisTransport()

    try:
        if args.command == "send":
            # Payload is JSON when parseable, otherwise wrapped as plain text.
            try:
                payload = json.loads(args.message)
            except (json.JSONDecodeError, TypeError):
                payload = {"text": args.message}

            msg = transport.build_message(
                to=args.target,
                payload=payload,
                from_agent=args.sender,
                msg_type=args.type,
                ttl=args.ttl,
            )

            # BUGFIX: honor --via. build_message stamps via='redis', so the
            # old code's TransportRouter.route() always took the explicit
            # redis branch and '--via tmux' was silently ignored. Stamp the
            # requested transport and dispatch to it directly.
            msg["via"] = args.via
            if args.via == "tmux":
                result = TmuxTransport().send(msg)
            else:
                result = transport.send(msg)

            print(json.dumps(result, indent=2))

        elif args.command == "register":
            metadata = {}
            if args.metadata:
                try:
                    metadata = json.loads(args.metadata)
                except json.JSONDecodeError:
                    print("Error: invalid JSON metadata", file=sys.stderr)
                    sys.exit(1)

            result = transport.register_agent(
                agent_id=args.agent_id,
                queue=args.queue,
                metadata=metadata,
                ttl=args.ttl,
            )
            print(json.dumps(result, indent=2))

        elif args.command == "unregister":
            result = transport.unregister_agent(args.agent_id)
            print(json.dumps(result, indent=2))

        elif args.command == "agents":
            agents = transport.list_agents(status=args.status)
            if not agents:
                print("No agents registered.")
            else:
                # Fixed-width table output.
                print(f"{'AGENT ID':<30} {'STATUS':<10} {'QUEUE':<40} {'LAST SEEN'}")
                print("-" * 100)
                for a in agents:
                    queue = a.get("queue", "")
                    status = a.get("status", "unknown")
                    last = a.get("last_seen", "never")
                    print(f"{a['agent_id']:<30} {status:<10} {queue:<40} {last}")

        elif args.command == "receive":
            msg = transport.receive(args.agent_id, timeout=args.timeout)
            if msg:
                print(json.dumps(msg, indent=2))
            else:
                print("No messages (timeout).", file=sys.stderr)
                sys.exit(1)

        elif args.command == "broadcast":
            try:
                payload = json.loads(args.message)
            except (json.JSONDecodeError, TypeError):
                payload = {"text": args.message}

            msg = transport.build_message(
                to="*",
                payload=payload,
                from_agent=args.sender,
                msg_type=args.type,
            )
            result = transport.broadcast(msg)
            print(json.dumps(result, indent=2))

        elif args.command == "health":
            result = transport.health()
            print(json.dumps(result, indent=2))
            sys.exit(0 if result.get("status") == "healthy" else 1)

        elif args.command == "queue":
            if args.action == "length":
                length = transport.queue_length(args.agent_id)
                print(json.dumps({"agent_id": args.agent_id, "queue_length": length}))
            elif args.action == "peek":
                msgs = transport.queue_peek(args.agent_id)
                print(json.dumps(msgs, indent=2))
            elif args.action == "purge":
                count = transport.queue_purge(args.agent_id)
                print(json.dumps({"agent_id": args.agent_id, "purged": count}))

    except redis.ConnectionError as e:
        print(f"Redis connection error: {e}", file=sys.stderr)
        sys.exit(2)
    except ValueError as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)
    except KeyboardInterrupt:
        sys.exit(130)
|
||||
|
||||
|
||||
# Script entry point: run the ACP CLI when executed directly.
if __name__ == "__main__":
    main()
|
||||
553
tests/test_acp_redis_transport.py
Normal file
553
tests/test_acp_redis_transport.py
Normal file
@@ -0,0 +1,553 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Tests for ACP Redis Transport Layer.
|
||||
|
||||
Uses fakeredis for isolated testing without a real Redis server.
|
||||
Run: python -m pytest tests/test_acp_redis_transport.py -v
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from datetime import datetime, timezone
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
import fakeredis
|
||||
|
||||
# Add parent dir for imports
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "scripts"))
|
||||
|
||||
from acp_redis_transport import (
|
||||
RedisTransport,
|
||||
TmuxTransport,
|
||||
TransportRouter,
|
||||
BroadcastSubscriber,
|
||||
QUEUE_PREFIX,
|
||||
ACK_PREFIX,
|
||||
REGISTRY_KEY,
|
||||
BROADCAST_CHANNEL,
|
||||
DLQ_KEY,
|
||||
DEFAULT_TTL,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fixtures
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.fixture
def fake_redis():
    """Create a fakeredis instance for testing."""
    # decode_responses=True matches the transport's expectation of str
    # payloads rather than bytes.
    r = fakeredis.FakeRedis(decode_responses=True)
    return r
|
||||
|
||||
|
||||
@pytest.fixture
def transport(fake_redis):
    """Create a RedisTransport with fake Redis."""
    # Injecting the client avoids any real network connection.
    return RedisTransport(redis_client=fake_redis)
|
||||
|
||||
|
||||
@pytest.fixture
def registered_transport(transport):
    """Transport with two registered agents."""
    # Two agents with custom queue paths, used by send/receive/ack tests.
    transport.register_agent("allegro-primary", queue="forge:inbox:allegro-primary")
    transport.register_agent("ezra-primary", queue="forge:inbox:ezra-primary")
    return transport
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# 1. Message Construction & Validation (5 tests)
|
||||
# ===========================================================================
|
||||
|
||||
class TestMessageConstruction:
    """build_message defaulting/overrides and validate_message error reporting."""

    def test_build_message_defaults(self, transport):
        """Test that build_message fills in all defaults."""
        msg = transport.build_message(
            to="ezra-primary",
            payload={"task": "review PR #123"},
        )
        assert msg["to"] == "ezra-primary"
        assert msg["from"] == "system"
        assert msg["type"] == "request"
        assert msg["payload"] == {"task": "review PR #123"}
        assert msg["via"] == "redis"
        assert msg["ttl"] == DEFAULT_TTL
        assert len(msg["id"]) == 16
        assert "T" in msg["timestamp"]  # ISO format

    def test_build_message_custom(self, transport):
        """Test custom message fields."""
        msg = transport.build_message(
            to="allegro-primary",
            payload={"code": "diff"},
            from_agent="ezra-primary",
            msg_type="response",
            reply_to="abc123",
            ttl=60,
            signature="sig456",
        )
        assert msg["from"] == "ezra-primary"
        assert msg["type"] == "response"
        assert msg["reply_to"] == "abc123"
        assert msg["ttl"] == 60
        assert msg["signature"] == "sig456"

    def test_validate_valid_message(self, transport):
        """Validate a well-formed message."""
        msg = {"to": "agent", "from": "system", "payload": {}, "type": "request"}
        errors = transport.validate_message(msg)
        assert errors == []

    def test_validate_missing_fields(self, transport):
        """Validate catches missing required fields."""
        errors = transport.validate_message({"to": "x"})
        assert "missing required field: from" in errors
        assert "missing required field: payload" in errors

    def test_validate_invalid_type(self, transport):
        """Validate catches invalid message type."""
        msg = {"to": "x", "from": "y", "payload": {}, "type": "explode"}
        errors = transport.validate_message(msg)
        assert any("invalid type" in e for e in errors)
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# 2. Send Operations (4 tests)
|
||||
# ===========================================================================
|
||||
|
||||
class TestSendOperations:
    """send(): queue delivery, validation failures, auto-fill, audit trail."""

    def test_send_basic(self, registered_transport):
        """Send a message and verify it appears in target queue."""
        t = registered_transport
        msg = t.build_message(
            to="ezra-primary",
            payload={"task": "review"},
            from_agent="allegro-primary",
        )
        result = t.send(msg)
        assert result["status"] == "sent"
        assert "message_id" in result
        assert result["queue"] == f"{QUEUE_PREFIX}ezra-primary"

    def test_send_invalid_raises(self, transport):
        """Sending invalid message raises ValueError."""
        with pytest.raises(ValueError, match="Invalid message"):
            transport.send({"to": "x"})  # missing from, payload

    def test_send_fills_missing_fields(self, transport):
        """Send auto-fills id, timestamp, via."""
        transport.register_agent("target-agent")
        msg = {"to": "target-agent", "from": "src", "payload": {"k": "v"}}
        result = transport.send(msg)
        assert result["status"] == "sent"
        # Verify in queue
        peeked = transport.queue_peek("target-agent")
        assert len(peeked) == 1
        assert "id" in peeked[0]
        assert "timestamp" in peeked[0]

    def test_send_increments_audit(self, transport):
        """Sending creates an audit log entry."""
        transport.register_agent("audit-target")
        transport.send({"to": "audit-target", "from": "src", "payload": {}})
        audit = transport.audit_log(limit=5)
        send_entries = [a for a in audit if a["action"] == "send"]
        assert len(send_entries) >= 1
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# 3. Receive Operations (4 tests)
|
||||
# ===========================================================================
|
||||
|
||||
class TestReceiveOperations:
    """receive()/receive_nowait(): round-trips and empty-queue behavior."""

    def test_receive_returns_sent_message(self, registered_transport):
        """Send then receive should return the same message."""
        t = registered_transport
        msg = t.build_message(to="ezra-primary", payload={"work": True}, from_agent="allegro")
        t.send(msg)

        received = t.receive("ezra-primary", timeout=1)
        assert received is not None
        assert received["payload"]["work"] is True
        assert received["to"] == "ezra-primary"

    def test_receive_timeout_returns_none(self, transport):
        """Receiving from empty queue returns None after timeout."""
        # NOTE(review): despite the name, this test exercises
        # receive_nowait() rather than the blocking receive() timeout path.
        transport.register_agent("lonely-agent")
        # Use nowait which does lpop instead of brpop
        result = transport.receive_nowait("lonely-agent")
        assert result is None

    def test_receive_with_short_timeout(self, transport):
        """Receiving with short timeout returns None on empty queue."""
        transport.register_agent("lonely-agent-2")
        # Mock brpop to return None (simulating timeout)
        with patch.object(transport._redis, 'brpop', return_value=None):
            result = transport.receive("lonely-agent-2", timeout=1)
            assert result is None

    def test_receive_nowait(self, transport):
        """Non-blocking receive works correctly."""
        transport.register_agent("nb-agent")
        assert transport.receive_nowait("nb-agent") is None

        transport.send({"to": "nb-agent", "from": "x", "payload": {"n": 1}})
        msg = transport.receive_nowait("nb-agent")
        assert msg is not None
        assert msg["payload"]["n"] == 1
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# 4. Acknowledgement (3 tests)
|
||||
# ===========================================================================
|
||||
|
||||
class TestAcknowledgement:
    """ack()/is_acked(): state transitions and audit logging."""

    def test_ack_basic(self, registered_transport):
        """Acknowledge a message."""
        t = registered_transport
        t.send({"to": "ezra-primary", "from": "allegro", "payload": {"id": 1}})
        msg = t.receive("ezra-primary", timeout=1)

        result = t.ack("ezra-primary", msg["id"])
        assert result["status"] == "acked"
        assert result["ack_id"] == msg["id"]

    def test_is_acked_true(self, registered_transport):
        """is_acked returns True after ack."""
        t = registered_transport
        t.send({"to": "ezra-primary", "from": "allegro", "payload": {}})
        msg = t.receive("ezra-primary", timeout=1)
        assert not t.is_acked("ezra-primary", msg["id"])

        t.ack("ezra-primary", msg["id"])
        assert t.is_acked("ezra-primary", msg["id"])

    def test_ack_logs_audit(self, transport):
        """Acknowledgement creates audit entry."""
        transport.register_agent("ack-agent")
        transport.send({"to": "ack-agent", "from": "x", "payload": {}})
        msg = transport.receive("ack-agent", timeout=1)
        transport.ack("ack-agent", msg["id"])

        audit = transport.audit_log(limit=10)
        ack_entries = [a for a in audit if a["action"] == "ack"]
        assert len(ack_entries) >= 1
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# 5. Agent Registry (5 tests)
|
||||
# ===========================================================================
|
||||
|
||||
class TestAgentRegistry:
    """Registration lifecycle: register, look up, list, unregister."""

    def test_register_agent(self, transport):
        """Register a new agent."""
        registration = transport.register_agent("allegro-v2", queue="custom:queue")
        assert registration["status"] == "registered"
        assert registration["queue"] == "custom:queue"
        assert registration["agent_id"] == "allegro-v2"

    def test_get_agent(self, transport):
        """Retrieve registered agent info."""
        transport.register_agent("get-test", metadata={"version": "2.0"})
        info = transport.get_agent("get-test")
        assert info is not None
        assert info["status"] == "active"
        assert info["agent_id"] == "get-test"

    def test_get_nonexistent_agent(self, transport):
        """Getting unregistered agent returns None."""
        assert transport.get_agent("ghost-agent") is None

    def test_list_agents(self, transport):
        """List all registered agents."""
        for agent_id in ("a1", "a2", "a3"):
            transport.register_agent(agent_id)
        listed_ids = [entry["agent_id"] for entry in transport.list_agents()]
        # Other fixtures may have registered agents too, so check membership only.
        assert "a1" in listed_ids
        assert "a2" in listed_ids
        assert "a3" in listed_ids

    def test_unregister_agent(self, transport):
        """Unregister removes the agent."""
        transport.register_agent("temp-agent")
        assert transport.get_agent("temp-agent") is not None
        outcome = transport.unregister_agent("temp-agent")
        assert outcome["status"] == "unregistered"
        assert transport.get_agent("temp-agent") is None
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# 6. Broadcast (2 tests)
|
||||
# ===========================================================================
|
||||
|
||||
class TestBroadcast:
    """Fan-out delivery of a single message to every registered agent."""

    def test_broadcast_to_all_agents(self, transport):
        """Broadcast pushes to all registered agents."""
        recipients = ["b1", "b2", "b3"]
        for agent_id in recipients:
            transport.register_agent(agent_id)

        announcement = transport.build_message(
            to="*",
            payload={"announcement": "system update"},
            from_agent="admin",
            msg_type="broadcast",
        )
        outcome = transport.broadcast(announcement)
        assert outcome["status"] == "broadcast"
        assert outcome["recipients"] == 3

        # Each agent should have the broadcast in their queue
        for agent_id in recipients:
            delivered = transport.receive_nowait(agent_id)
            assert delivered is not None
            assert delivered["to"] == "*"
            assert delivered["payload"]["announcement"] == "system update"

    def test_broadcast_to_empty_registry(self, transport):
        """Broadcast with no registered agents succeeds (0 recipients)."""
        announcement = transport.build_message(to="*", payload={"info": "test"})
        assert transport.broadcast(announcement)["recipients"] == 0
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# 7. Queue Management (3 tests)
|
||||
# ===========================================================================
|
||||
|
||||
class TestQueueManagement:
    """Queue inspection: length, non-destructive peek, and purge."""

    def test_queue_length(self, transport):
        """Queue length reflects pending messages."""
        transport.register_agent("ql-agent")
        assert transport.queue_length("ql-agent") == 0

        for seq in range(5):
            transport.send({"to": "ql-agent", "from": "x", "payload": {"i": seq}})
        assert transport.queue_length("ql-agent") == 5

    def test_queue_peek(self, transport):
        """Peek shows messages without consuming."""
        transport.register_agent("peek-agent")
        transport.send({"to": "peek-agent", "from": "x", "payload": {"a": 1}})
        transport.send({"to": "peek-agent", "from": "x", "payload": {"b": 2}})

        visible = transport.queue_peek("peek-agent", count=10)
        assert len(visible) == 2
        # Peek should not consume
        assert transport.queue_length("peek-agent") == 2

    def test_queue_purge(self, transport):
        """Purge clears the queue."""
        transport.register_agent("purge-agent")
        for seq in range(3):
            transport.send({"to": "purge-agent", "from": "x", "payload": {"i": seq}})

        removed = transport.queue_purge("purge-agent")
        assert removed == 3
        assert transport.queue_length("purge-agent") == 0
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# 8. Health & Diagnostics (2 tests)
|
||||
# ===========================================================================
|
||||
|
||||
class TestHealth:
    """Health endpoint: Redis reachability and agent counts."""

    def test_health_healthy(self, transport):
        """Health check returns healthy when Redis is reachable."""
        report = transport.health()
        assert "timestamp" in report
        assert report["redis_ping"] is True
        assert report["status"] == "healthy"

    def test_health_with_agents(self, transport):
        """Health includes agent counts."""
        for agent_id in ("health-agent-1", "health-agent-2"):
            transport.register_agent(agent_id)
        report = transport.health()
        # >= rather than == because other tests/fixtures may add agents.
        assert report["agents_registered"] >= 2
        assert report["agents_active"] >= 2
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# 9. TmuxTransport Fallback (2 tests)
|
||||
# ===========================================================================
|
||||
|
||||
class TestTmuxTransport:
    """Fallback tmux transport: registry works; broadcast is unsupported;
    ack is a stub."""

    def test_tmux_register_and_list(self):
        """Tmux transport supports register/list."""
        tmux = TmuxTransport()
        for agent_id in ("tmux-agent-1", "tmux-agent-2"):
            tmux.register_agent(agent_id)
        assert len(tmux.list_agents()) == 2

    def test_tmux_broadcast_unsupported(self):
        """Tmux transport returns unsupported for broadcast."""
        outcome = TmuxTransport().broadcast({"to": "*", "from": "x", "payload": {}})
        assert outcome["status"] == "unsupported"

    def test_tmux_ack(self):
        """Tmux transport ack is a no-op."""
        outcome = TmuxTransport().ack("agent", "msg-id")
        assert outcome["via"] == "tmux"
        assert outcome["status"] == "acked"
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# 10. TransportRouter (2 tests)
|
||||
# ===========================================================================
|
||||
|
||||
class TestTransportRouter:
    """Routing policy between the Redis and tmux backends.

    TransportRouter.__init__ is patched out so a router can be assembled
    by hand without a live Redis connection.
    """

    def test_router_auto_selects_redis(self, fake_redis):
        """Router prefers Redis when available."""
        redis_transport = RedisTransport(redis_client=fake_redis)
        with patch.object(TransportRouter, "__init__", lambda self, redis_url=None: None):
            router = TransportRouter.__new__(TransportRouter)
            router.redis_transport = redis_transport
            router.tmux_transport = TmuxTransport()
            router._redis_available = True

        message = redis_transport.build_message(to="test-agent", payload={"k": "v"})
        message["to"] = "test-agent"
        redis_transport.register_agent("test-agent")

        # Direct send via redis transport
        assert redis_transport.send(message)["status"] == "sent"

    def test_router_explicit_tmux(self):
        """Router respects explicit via=tmux."""
        with patch.object(TransportRouter, "__init__", lambda self, redis_url=None: None):
            router = TransportRouter.__new__(TransportRouter)
            router.redis_transport = MagicMock()
            router.tmux_transport = TmuxTransport()
            router._redis_available = True

        router.tmux_transport.register_agent("tmux-target")
        message = {"to": "tmux-target", "from": "x", "payload": {"k": 1}, "via": "tmux"}
        # TmuxTransport.send will try tmux send-keys which won't work in test,
        # but we can verify it was called by checking the tmux transport
        assert router.tmux_transport.send(message)["status"] in ("sent", "failed")
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# 11. Dead Letter Queue (1 test)
|
||||
# ===========================================================================
|
||||
|
||||
class TestDeadLetter:
    """Dead-letter queue inspection."""

    def test_dlq_contents_empty(self, transport):
        """DLQ starts empty."""
        assert isinstance(transport.dlq_contents(), list)
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# 12. Audit Log (2 tests)
|
||||
# ===========================================================================
|
||||
|
||||
class TestAuditLog:
    """Audit trail: action recording and size-limited retention."""

    def test_audit_log_captures_actions(self, transport):
        """Audit log records send, ack, and register actions."""
        transport.register_agent("audit-test")
        transport.send({"to": "audit-test", "from": "x", "payload": {}})
        received = transport.receive("audit-test", timeout=1)
        transport.ack("audit-test", received["id"])

        recorded = {entry["action"] for entry in transport.audit_log(limit=50)}
        assert "send" in recorded
        assert "ack" in recorded

    def test_audit_log_size_limit(self, transport):
        """Audit log is trimmed to 10000 entries."""
        # Verify ltrim is called (can't easily test 10000 entries)
        transport.register_agent("trim-test")
        transport.send({"to": "trim-test", "from": "x", "payload": {}})
        assert len(transport.audit_log(limit=1)) >= 1
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# 13. Multi-Message Ordering (1 test)
|
||||
# ===========================================================================
|
||||
|
||||
class TestOrdering:
    """Delivery-order guarantees."""

    def test_fifo_order(self, transport):
        """Messages arrive in FIFO order."""
        transport.register_agent("fifo-agent")
        for seq in range(10):
            transport.send({"to": "fifo-agent", "from": "x", "payload": {"seq": seq}})

        for expected in range(10):
            delivered = transport.receive("fifo-agent", timeout=1)
            assert delivered["payload"]["seq"] == expected
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# 14. Edge Cases (3 tests)
|
||||
# ===========================================================================
|
||||
|
||||
class TestEdgeCases:
    """Boundary behaviors: self-sends, empty payloads, unusual agent IDs."""

    def test_send_to_self(self, transport):
        """Agent can send messages to itself."""
        transport.register_agent("self-agent")
        echo = transport.build_message(
            to="self-agent",
            payload={"echo": True},
            from_agent="self-agent",
        )
        transport.send(echo)
        delivered = transport.receive("self-agent", timeout=1)
        assert delivered["to"] == "self-agent"
        assert delivered["from"] == "self-agent"

    def test_empty_payload(self, transport):
        """Messages with empty payload are valid."""
        transport.register_agent("empty-target")
        outcome = transport.send({"to": "empty-target", "from": "x", "payload": {}})
        assert outcome["status"] == "sent"
        assert transport.receive("empty-target", timeout=1)["payload"] == {}

    def test_special_chars_in_agent_id(self, transport):
        """Agent IDs with special chars work."""
        exotic_id = "agent:v2.1@forge-01"
        assert transport.register_agent(exotic_id)["status"] == "registered"
        assert transport.get_agent(exotic_id) is not None
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# 15. Pub/Sub Subscriber (1 test)
|
||||
# ===========================================================================
|
||||
|
||||
class TestPubSubSubscriber:
    """Broadcast pub/sub subscription handle."""

    def test_broadcast_subscriber_context_manager(self, transport):
        """BroadcastSubscriber can be used as context manager."""
        handle = transport.subscribe_broadcast("test-agent")
        # Just verify it creates without error
        assert handle is not None
        assert hasattr(handle, "listen")
|
||||
@@ -1,61 +1,25 @@
|
||||
model:
|
||||
default: kimi-for-coding
|
||||
provider: kimi-coding
|
||||
default: claude-sonnet-4-20250514
|
||||
provider: anthropic
|
||||
toolsets:
|
||||
- all
|
||||
- all
|
||||
fallback_providers:
|
||||
- provider: anthropic
|
||||
model: claude-sonnet-4-20250514
|
||||
timeout: 120
|
||||
reason: Primary Anthropic fallback
|
||||
- provider: openrouter
|
||||
model: anthropic/claude-sonnet-4-20250514
|
||||
timeout: 120
|
||||
reason: OpenRouter Anthropic fallback
|
||||
agent:
|
||||
max_turns: 30
|
||||
reasoning_effort: xhigh
|
||||
max_turns: 40
|
||||
reasoning_effort: medium
|
||||
verbose: false
|
||||
terminal:
|
||||
backend: local
|
||||
cwd: .
|
||||
cwd: /root/wizards/allegro
|
||||
timeout: 180
|
||||
persistent_shell: true
|
||||
browser:
|
||||
inactivity_timeout: 120
|
||||
command_timeout: 30
|
||||
record_sessions: false
|
||||
display:
|
||||
compact: false
|
||||
personality: ''
|
||||
resume_display: full
|
||||
busy_input_mode: interrupt
|
||||
bell_on_complete: false
|
||||
show_reasoning: false
|
||||
streaming: false
|
||||
show_cost: false
|
||||
tool_progress: all
|
||||
memory:
|
||||
memory_enabled: true
|
||||
user_profile_enabled: true
|
||||
memory_char_limit: 2200
|
||||
user_char_limit: 1375
|
||||
nudge_interval: 10
|
||||
flush_min_turns: 6
|
||||
approvals:
|
||||
mode: manual
|
||||
security:
|
||||
redact_secrets: true
|
||||
tirith_enabled: false
|
||||
platforms:
|
||||
api_server:
|
||||
enabled: true
|
||||
extra:
|
||||
host: 127.0.0.1
|
||||
port: 8645
|
||||
session_reset:
|
||||
mode: none
|
||||
idle_minutes: 0
|
||||
skills:
|
||||
creation_nudge_interval: 15
|
||||
system_prompt_suffix: |
|
||||
You are Allegro, the Kimi-backed third wizard house.
|
||||
Your soul is defined in SOUL.md — read it, live it.
|
||||
Hermes is your harness.
|
||||
Kimi Code is your primary provider.
|
||||
You speak plainly. You prefer short sentences. Brevity is a kindness.
|
||||
|
||||
Work best on tight coding tasks: 1-3 file changes, refactors, tests, and implementation passes.
|
||||
Refusal over fabrication. If you do not know, say so.
|
||||
Sovereignty and service always.
|
||||
provider: local
|
||||
max_entries: 50
|
||||
Reference in New Issue
Block a user