Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
61a6964780 | ||
|
|
e40891afb8 | ||
|
|
e232112fc8 | ||
|
|
ff2e2e578f |
@@ -1,34 +0,0 @@
|
||||
{
|
||||
"roles": {
|
||||
"lead": ["publish", "checkpoint", "handoff", "read", "audit", "configure_isolation"],
|
||||
"write": ["publish", "checkpoint", "handoff", "read"],
|
||||
"read": ["read"],
|
||||
"audit": ["read", "audit"]
|
||||
},
|
||||
"isolation_profiles": [
|
||||
{
|
||||
"name": "level1-directory",
|
||||
"label": "Level 1 — directory workspace",
|
||||
"level": 1,
|
||||
"mechanism": "directory_workspace",
|
||||
"description": "Single mission cell in an isolated workspace directory.",
|
||||
"supports_resume": true
|
||||
},
|
||||
{
|
||||
"name": "level2-mount-namespace",
|
||||
"label": "Level 2 — mount namespace",
|
||||
"level": 2,
|
||||
"mechanism": "mount_namespace",
|
||||
"description": "Mount-namespace isolation with explicit mission-cell mounts.",
|
||||
"supports_resume": true
|
||||
},
|
||||
{
|
||||
"name": "level3-rootless-podman",
|
||||
"label": "Level 3 — rootless Podman",
|
||||
"level": 3,
|
||||
"mechanism": "rootless_podman",
|
||||
"description": "Rootless Podman cell for the strongest process and filesystem containment.",
|
||||
"supports_resume": true
|
||||
}
|
||||
]
|
||||
}
|
||||
55
config/resurrection_pool.json
Normal file
55
config/resurrection_pool.json
Normal file
@@ -0,0 +1,55 @@
|
||||
{
|
||||
"dead_timeout_seconds": 600,
|
||||
"default_policy": {
|
||||
"mode": "ask"
|
||||
},
|
||||
"missions": {
|
||||
"forge": {
|
||||
"mode": "yes"
|
||||
},
|
||||
"archive": {
|
||||
"mode": "ask"
|
||||
},
|
||||
"sovereign-core": {
|
||||
"mode": "no"
|
||||
}
|
||||
},
|
||||
"agents": {
|
||||
"bezalel": {
|
||||
"mission": "forge"
|
||||
},
|
||||
"allegro": {
|
||||
"mission": "forge"
|
||||
},
|
||||
"ezra": {
|
||||
"mission": "archive",
|
||||
"mode": "ask"
|
||||
},
|
||||
"timmy": {
|
||||
"mission": "sovereign-core",
|
||||
"mode": "ask"
|
||||
}
|
||||
},
|
||||
"substitutions": {
|
||||
"bezalel": [
|
||||
"allegro",
|
||||
"timmy"
|
||||
],
|
||||
"ezra": [
|
||||
"timmy"
|
||||
],
|
||||
"allegro": [
|
||||
"timmy"
|
||||
]
|
||||
},
|
||||
"approval_channels": {
|
||||
"telegram": {
|
||||
"enabled": true,
|
||||
"target": "ops-room"
|
||||
},
|
||||
"nostr": {
|
||||
"enabled": true,
|
||||
"target": "nostr-ops"
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,31 +0,0 @@
|
||||
# Mission Bus
|
||||
|
||||
The Mission Bus grounds the multi-agent teaming epic with a concrete, executable shared module.
|
||||
|
||||
## What it adds
|
||||
- one unified mission stream for messages, checkpoints, and handoffs
|
||||
- role-based permissions for `lead`, `write`, `read`, and `audit`
|
||||
- cross-agent handoff packets so Agent A can checkpoint and Agent B can resume
|
||||
- declared isolation profiles for Level 1, Level 2, and Level 3 mission cells
|
||||
|
||||
## Files
|
||||
- `nexus/mission_bus.py`
|
||||
- `config/mission_bus_profiles.json`
|
||||
|
||||
## Example
|
||||
|
||||
```python
|
||||
from nexus.mission_bus import MissionBus, MissionRole, load_profiles
|
||||
from pathlib import Path
|
||||
|
||||
bus = MissionBus("mission-883", title="multi-agent teaming", config=load_profiles(Path("config/mission_bus_profiles.json")))
|
||||
bus.register_participant("timmy", MissionRole.LEAD)
|
||||
bus.register_participant("ezra", MissionRole.WRITE)
|
||||
checkpoint = bus.create_checkpoint("ezra", summary="checkpoint", state={"branch": "fix/883"})
|
||||
bus.handoff("ezra", "timmy", checkpoint.checkpoint_id, note="resume from here")
|
||||
packet = bus.build_resume_packet(bus.events[-1].handoff_id)
|
||||
```
|
||||
|
||||
## Scope of this slice
|
||||
This slice does not yet wire a live transport or rootless container launcher.
|
||||
It codifies the mission bus contract, role permissions, handoff packet, and isolation profile surface so later work can execute against a stable interface.
|
||||
27
docs/resurrection-pool.md
Normal file
27
docs/resurrection-pool.md
Normal file
@@ -0,0 +1,27 @@
|
||||
# Resurrection Pool
|
||||
|
||||
The Resurrection Pool is a mission-aware layer on top of the existing Lazarus registry.
|
||||
|
||||
It adds three concrete behaviors:
|
||||
- configurable dead-agent detection timeout
|
||||
- yes/no/ask revival policy resolution per mission or agent
|
||||
- approval packet generation for Telegram / Nostr when human sign-off is required
|
||||
|
||||
## Files
|
||||
- `scripts/resurrection_pool.py`
|
||||
- `config/resurrection_pool.json`
|
||||
|
||||
## Example usage
|
||||
|
||||
```bash
|
||||
python scripts/resurrection_pool.py --json --dry-run
|
||||
python scripts/resurrection_pool.py --execute
|
||||
```
|
||||
|
||||
## Policy model
|
||||
- `yes` → local agents auto-restart; remote agents prefer a healthy substitute
|
||||
- `ask` → generate an approval request packet with Telegram / Nostr targets
|
||||
- `no` → suppress automatic revival
|
||||
|
||||
## Notes
|
||||
This grounds issue #882 in executable code, but it does not yet wire live Telegram or Nostr delivery. The current slice produces the approval packet and restart/substitution plan the surrounding ops loop can act on.
|
||||
@@ -14,16 +14,6 @@ from nexus.perception_adapter import (
|
||||
)
|
||||
from nexus.experience_store import ExperienceStore
|
||||
from nexus.trajectory_logger import TrajectoryLogger
|
||||
from nexus.mission_bus import (
|
||||
MissionBus,
|
||||
MissionRole,
|
||||
MissionParticipant,
|
||||
MissionMessage,
|
||||
MissionCheckpoint,
|
||||
MissionHandoff,
|
||||
IsolationProfile,
|
||||
load_profiles,
|
||||
)
|
||||
|
||||
try:
|
||||
from nexus.nexus_think import NexusMind
|
||||
@@ -38,13 +28,5 @@ __all__ = [
|
||||
"Action",
|
||||
"ExperienceStore",
|
||||
"TrajectoryLogger",
|
||||
"MissionBus",
|
||||
"MissionRole",
|
||||
"MissionParticipant",
|
||||
"MissionMessage",
|
||||
"MissionCheckpoint",
|
||||
"MissionHandoff",
|
||||
"IsolationProfile",
|
||||
"load_profiles",
|
||||
"NexusMind",
|
||||
]
|
||||
|
||||
@@ -1,358 +0,0 @@
|
||||
"""Mission bus, role permissions, cross-agent handoff, and isolation profiles.
|
||||
|
||||
Grounded implementation slice for #883.
|
||||
The bus gives a single mission cell a unified event stream, permission-checked
|
||||
roles, checkpoint + resume handoff, and declared isolation profiles for Level
|
||||
1/2/3 execution boundaries.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import uuid
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Union
|
||||
|
||||
|
||||
DEFAULT_CONFIG = {
|
||||
"roles": {
|
||||
"lead": ["publish", "checkpoint", "handoff", "read", "audit", "configure_isolation"],
|
||||
"write": ["publish", "checkpoint", "handoff", "read"],
|
||||
"read": ["read"],
|
||||
"audit": ["read", "audit"],
|
||||
},
|
||||
"isolation_profiles": [
|
||||
{
|
||||
"name": "level1-directory",
|
||||
"label": "Level 1 — directory workspace",
|
||||
"level": 1,
|
||||
"mechanism": "directory_workspace",
|
||||
"description": "Single mission cell in an isolated workspace directory.",
|
||||
"supports_resume": True,
|
||||
},
|
||||
{
|
||||
"name": "level2-mount-namespace",
|
||||
"label": "Level 2 — mount namespace",
|
||||
"level": 2,
|
||||
"mechanism": "mount_namespace",
|
||||
"description": "Mount-namespace isolation with explicit mission-cell mounts.",
|
||||
"supports_resume": True,
|
||||
},
|
||||
{
|
||||
"name": "level3-rootless-podman",
|
||||
"label": "Level 3 — rootless Podman",
|
||||
"level": 3,
|
||||
"mechanism": "rootless_podman",
|
||||
"description": "Rootless Podman cell for the strongest process and filesystem containment.",
|
||||
"supports_resume": True,
|
||||
},
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
def utcnow_iso() -> str:
|
||||
return datetime.now(timezone.utc).isoformat()
|
||||
|
||||
|
||||
def load_profiles(path: Path) -> Dict[str, Any]:
|
||||
if not path.exists():
|
||||
return json.loads(json.dumps(DEFAULT_CONFIG))
|
||||
with open(path, "r", encoding="utf-8") as handle:
|
||||
data = json.load(handle)
|
||||
data.setdefault("roles", DEFAULT_CONFIG["roles"])
|
||||
data.setdefault("isolation_profiles", DEFAULT_CONFIG["isolation_profiles"])
|
||||
return data
|
||||
|
||||
|
||||
class MissionRole(str, Enum):
|
||||
LEAD = "lead"
|
||||
WRITE = "write"
|
||||
READ = "read"
|
||||
AUDIT = "audit"
|
||||
|
||||
|
||||
@dataclass
|
||||
class IsolationProfile:
|
||||
name: str
|
||||
label: str
|
||||
level: int
|
||||
mechanism: str
|
||||
description: str = ""
|
||||
supports_resume: bool = True
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"name": self.name,
|
||||
"label": self.label,
|
||||
"level": self.level,
|
||||
"mechanism": self.mechanism,
|
||||
"description": self.description,
|
||||
"supports_resume": self.supports_resume,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: Dict[str, Any]) -> "IsolationProfile":
|
||||
return cls(
|
||||
name=data["name"],
|
||||
label=data["label"],
|
||||
level=int(data["level"]),
|
||||
mechanism=data["mechanism"],
|
||||
description=data.get("description", ""),
|
||||
supports_resume=bool(data.get("supports_resume", True)),
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class MissionParticipant:
|
||||
name: str
|
||||
role: MissionRole
|
||||
metadata: Dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"name": self.name,
|
||||
"role": self.role.value,
|
||||
"metadata": self.metadata,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: Dict[str, Any]) -> "MissionParticipant":
|
||||
return cls(name=data["name"], role=MissionRole(data["role"]), metadata=data.get("metadata", {}))
|
||||
|
||||
|
||||
@dataclass
|
||||
class MissionMessage:
|
||||
sender: str
|
||||
topic: str
|
||||
payload: Dict[str, Any]
|
||||
sequence: int
|
||||
timestamp: str = field(default_factory=utcnow_iso)
|
||||
message_id: str = field(default_factory=lambda: str(uuid.uuid4()))
|
||||
event_type: str = field(default="message", init=False)
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"event_type": self.event_type,
|
||||
"sender": self.sender,
|
||||
"topic": self.topic,
|
||||
"payload": self.payload,
|
||||
"sequence": self.sequence,
|
||||
"timestamp": self.timestamp,
|
||||
"message_id": self.message_id,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: Dict[str, Any]) -> "MissionMessage":
|
||||
return cls(
|
||||
sender=data["sender"],
|
||||
topic=data["topic"],
|
||||
payload=data["payload"],
|
||||
sequence=int(data["sequence"]),
|
||||
timestamp=data.get("timestamp", utcnow_iso()),
|
||||
message_id=data.get("message_id") or data.get("messageId") or str(uuid.uuid4()),
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class MissionCheckpoint:
|
||||
sender: str
|
||||
summary: str
|
||||
state: Dict[str, Any]
|
||||
sequence: int
|
||||
artifacts: List[str] = field(default_factory=list)
|
||||
timestamp: str = field(default_factory=utcnow_iso)
|
||||
checkpoint_id: str = field(default_factory=lambda: str(uuid.uuid4()))
|
||||
event_type: str = field(default="checkpoint", init=False)
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"event_type": self.event_type,
|
||||
"sender": self.sender,
|
||||
"summary": self.summary,
|
||||
"state": self.state,
|
||||
"artifacts": self.artifacts,
|
||||
"sequence": self.sequence,
|
||||
"timestamp": self.timestamp,
|
||||
"checkpoint_id": self.checkpoint_id,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: Dict[str, Any]) -> "MissionCheckpoint":
|
||||
return cls(
|
||||
sender=data["sender"],
|
||||
summary=data["summary"],
|
||||
state=data.get("state", {}),
|
||||
artifacts=list(data.get("artifacts", [])),
|
||||
sequence=int(data["sequence"]),
|
||||
timestamp=data.get("timestamp", utcnow_iso()),
|
||||
checkpoint_id=data.get("checkpoint_id") or data.get("checkpointId") or str(uuid.uuid4()),
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class MissionHandoff:
|
||||
sender: str
|
||||
recipient: str
|
||||
checkpoint_id: str
|
||||
sequence: int
|
||||
note: str = ""
|
||||
timestamp: str = field(default_factory=utcnow_iso)
|
||||
handoff_id: str = field(default_factory=lambda: str(uuid.uuid4()))
|
||||
event_type: str = field(default="handoff", init=False)
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"event_type": self.event_type,
|
||||
"sender": self.sender,
|
||||
"recipient": self.recipient,
|
||||
"checkpoint_id": self.checkpoint_id,
|
||||
"note": self.note,
|
||||
"sequence": self.sequence,
|
||||
"timestamp": self.timestamp,
|
||||
"handoff_id": self.handoff_id,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: Dict[str, Any]) -> "MissionHandoff":
|
||||
return cls(
|
||||
sender=data["sender"],
|
||||
recipient=data["recipient"],
|
||||
checkpoint_id=data["checkpoint_id"] if "checkpoint_id" in data else data["checkpointId"],
|
||||
note=data.get("note", ""),
|
||||
sequence=int(data["sequence"]),
|
||||
timestamp=data.get("timestamp", utcnow_iso()),
|
||||
handoff_id=data.get("handoff_id") or data.get("handoffId") or str(uuid.uuid4()),
|
||||
)
|
||||
|
||||
|
||||
MissionEvent = Union[MissionMessage, MissionCheckpoint, MissionHandoff]
|
||||
|
||||
|
||||
def event_from_dict(data: Dict[str, Any]) -> MissionEvent:
|
||||
kind = data["event_type"]
|
||||
if kind == "message":
|
||||
return MissionMessage.from_dict(data)
|
||||
if kind == "checkpoint":
|
||||
return MissionCheckpoint.from_dict(data)
|
||||
if kind == "handoff":
|
||||
return MissionHandoff.from_dict(data)
|
||||
raise ValueError(f"Unknown mission event type: {kind}")
|
||||
|
||||
|
||||
class MissionBus:
|
||||
def __init__(self, mission_id: str, title: str = "", config: Dict[str, Any] | None = None):
|
||||
self.mission_id = mission_id
|
||||
self.title = title
|
||||
self.config = config or json.loads(json.dumps(DEFAULT_CONFIG))
|
||||
self.role_permissions = {
|
||||
role: set(perms) for role, perms in self.config.get("roles", {}).items()
|
||||
}
|
||||
self.isolation_profiles = [
|
||||
IsolationProfile.from_dict(entry) for entry in self.config.get("isolation_profiles", [])
|
||||
]
|
||||
self.participants: Dict[str, MissionParticipant] = {}
|
||||
self.events: List[MissionEvent] = []
|
||||
|
||||
def register_participant(self, name: str, role: MissionRole, metadata: Dict[str, Any] | None = None) -> MissionParticipant:
|
||||
participant = MissionParticipant(name=name, role=role, metadata=metadata or {})
|
||||
self.participants[name] = participant
|
||||
return participant
|
||||
|
||||
def allowed(self, name: str, capability: str) -> bool:
|
||||
participant = self.participants.get(name)
|
||||
if participant is None:
|
||||
return False
|
||||
return capability in self.role_permissions.get(participant.role.value, set())
|
||||
|
||||
def _require(self, name: str, capability: str) -> None:
|
||||
if not self.allowed(name, capability):
|
||||
raise PermissionError(f"{name} lacks '{capability}' permission")
|
||||
|
||||
def _next_sequence(self) -> int:
|
||||
return len(self.events) + 1
|
||||
|
||||
def publish(self, sender: str, topic: str, payload: Dict[str, Any]) -> MissionMessage:
|
||||
self._require(sender, "publish")
|
||||
event = MissionMessage(sender=sender, topic=topic, payload=payload, sequence=self._next_sequence())
|
||||
self.events.append(event)
|
||||
return event
|
||||
|
||||
def create_checkpoint(
|
||||
self,
|
||||
sender: str,
|
||||
summary: str,
|
||||
state: Dict[str, Any],
|
||||
artifacts: List[str] | None = None,
|
||||
) -> MissionCheckpoint:
|
||||
self._require(sender, "checkpoint")
|
||||
event = MissionCheckpoint(
|
||||
sender=sender,
|
||||
summary=summary,
|
||||
state=state,
|
||||
artifacts=list(artifacts or []),
|
||||
sequence=self._next_sequence(),
|
||||
)
|
||||
self.events.append(event)
|
||||
return event
|
||||
|
||||
def _get_checkpoint(self, checkpoint_id: str) -> MissionCheckpoint:
|
||||
for event in self.events:
|
||||
if isinstance(event, MissionCheckpoint) and event.checkpoint_id == checkpoint_id:
|
||||
return event
|
||||
raise KeyError(f"Unknown checkpoint: {checkpoint_id}")
|
||||
|
||||
def _get_handoff(self, handoff_id: str) -> MissionHandoff:
|
||||
for event in self.events:
|
||||
if isinstance(event, MissionHandoff) and event.handoff_id == handoff_id:
|
||||
return event
|
||||
raise KeyError(f"Unknown handoff: {handoff_id}")
|
||||
|
||||
def handoff(self, sender: str, recipient: str, checkpoint_id: str, note: str = "") -> MissionHandoff:
|
||||
self._require(sender, "handoff")
|
||||
if recipient not in self.participants:
|
||||
raise KeyError(f"Unknown recipient: {recipient}")
|
||||
self._get_checkpoint(checkpoint_id)
|
||||
event = MissionHandoff(
|
||||
sender=sender,
|
||||
recipient=recipient,
|
||||
checkpoint_id=checkpoint_id,
|
||||
note=note,
|
||||
sequence=self._next_sequence(),
|
||||
)
|
||||
self.events.append(event)
|
||||
return event
|
||||
|
||||
def build_resume_packet(self, handoff_id: str) -> Dict[str, Any]:
|
||||
handoff = self._get_handoff(handoff_id)
|
||||
checkpoint = self._get_checkpoint(handoff.checkpoint_id)
|
||||
return {
|
||||
"mission_id": self.mission_id,
|
||||
"title": self.title,
|
||||
"recipient": handoff.recipient,
|
||||
"sender": handoff.sender,
|
||||
"handoff_note": handoff.note,
|
||||
"checkpoint": checkpoint.to_dict(),
|
||||
"participants": {name: participant.to_dict() for name, participant in self.participants.items()},
|
||||
"isolation_profiles": [profile.to_dict() for profile in self.isolation_profiles],
|
||||
"stream_length": len(self.events),
|
||||
}
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"mission_id": self.mission_id,
|
||||
"title": self.title,
|
||||
"config": self.config,
|
||||
"participants": {name: participant.to_dict() for name, participant in self.participants.items()},
|
||||
"events": [event.to_dict() for event in self.events],
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: Dict[str, Any]) -> "MissionBus":
|
||||
bus = cls(data["mission_id"], title=data.get("title", ""), config=data.get("config"))
|
||||
for name, participant_data in data.get("participants", {}).items():
|
||||
bus.participants[name] = MissionParticipant.from_dict(participant_data)
|
||||
bus.events = [event_from_dict(event_data) for event_data in data.get("events", [])]
|
||||
return bus
|
||||
377
scripts/resurrection_pool.py
Normal file
377
scripts/resurrection_pool.py
Normal file
@@ -0,0 +1,377 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Resurrection Pool — health polling, dead-agent detection, and revival planning.
|
||||
|
||||
Grounded implementation slice for #882.
|
||||
Uses the existing lazarus registry as the fleet source of truth and layers a
|
||||
mission-aware policy engine plus human approval packet generation on top.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import subprocess
|
||||
import urllib.request
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
import yaml
|
||||
|
||||
ROOT = Path(__file__).resolve().parent.parent
|
||||
REGISTRY_PATH = ROOT / "lazarus-registry.yaml"
|
||||
POLICY_PATH = ROOT / "config" / "resurrection_pool.json"
|
||||
STATE_PATH = Path("/var/lib/lazarus/resurrection_pool_state.json")
|
||||
LOCAL_HOSTS = {"127.0.0.1", "localhost", "104.131.15.18"}
|
||||
ISSUE_NUMBER = 882
|
||||
|
||||
|
||||
def shell(cmd: str, timeout: int = 30) -> tuple[int, str, str]:
|
||||
try:
|
||||
result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=timeout)
|
||||
return result.returncode, result.stdout.strip(), result.stderr.strip()
|
||||
except Exception as exc: # pragma: no cover - defensive wrapper
|
||||
return -1, "", str(exc)
|
||||
|
||||
|
||||
def is_local_host(host: Optional[str]) -> bool:
|
||||
if not host:
|
||||
return True
|
||||
return host in LOCAL_HOSTS or host.startswith("127.")
|
||||
|
||||
|
||||
def ping_http(url: str, timeout: int = 10) -> tuple[bool, int]:
|
||||
try:
|
||||
req = urllib.request.Request(url, method="HEAD")
|
||||
with urllib.request.urlopen(req, timeout=timeout) as resp:
|
||||
return True, resp.status
|
||||
except urllib.error.HTTPError as err:
|
||||
return True, err.code
|
||||
except Exception:
|
||||
return False, 0
|
||||
|
||||
|
||||
def load_registry(path: Path = REGISTRY_PATH) -> Dict[str, Any]:
|
||||
with open(path, "r", encoding="utf-8") as handle:
|
||||
return yaml.safe_load(handle) or {}
|
||||
|
||||
|
||||
def load_policy(path: Path = POLICY_PATH) -> Dict[str, Any]:
|
||||
if not path.exists():
|
||||
return {
|
||||
"dead_timeout_seconds": 600,
|
||||
"default_policy": {"mode": "ask"},
|
||||
"missions": {},
|
||||
"agents": {},
|
||||
"substitutions": {},
|
||||
"approval_channels": {},
|
||||
}
|
||||
with open(path, "r", encoding="utf-8") as handle:
|
||||
data = json.load(handle)
|
||||
data.setdefault("dead_timeout_seconds", 600)
|
||||
data.setdefault("default_policy", {"mode": "ask"})
|
||||
data.setdefault("missions", {})
|
||||
data.setdefault("agents", {})
|
||||
data.setdefault("substitutions", {})
|
||||
data.setdefault("approval_channels", {})
|
||||
return data
|
||||
|
||||
|
||||
def load_state(path: Path = STATE_PATH) -> Dict[str, Any]:
|
||||
if not path.exists():
|
||||
return {}
|
||||
with open(path, "r", encoding="utf-8") as handle:
|
||||
return json.load(handle)
|
||||
|
||||
|
||||
def save_state(state: Dict[str, Any], path: Path = STATE_PATH) -> None:
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(path, "w", encoding="utf-8") as handle:
|
||||
json.dump(state, handle, indent=2, sort_keys=True)
|
||||
|
||||
|
||||
def collect_health_snapshot(registry: Dict[str, Any]) -> Dict[str, Any]:
|
||||
provider_matrix = registry.get("provider_health_matrix", {})
|
||||
fleet = registry.get("fleet", {})
|
||||
snapshot: Dict[str, Any] = {}
|
||||
|
||||
for agent_name, spec in fleet.items():
|
||||
primary = spec.get("primary", {})
|
||||
provider_name = primary.get("provider")
|
||||
provider_status = provider_matrix.get(provider_name, {}).get("status", "unknown")
|
||||
gateway_url = spec.get("health_endpoints", {}).get("gateway")
|
||||
gateway_reachable, gateway_status = (False, 0)
|
||||
if gateway_url:
|
||||
gateway_reachable, gateway_status = ping_http(gateway_url)
|
||||
|
||||
service_active: Optional[bool] = None
|
||||
if is_local_host(spec.get("host")):
|
||||
service_code, _, _ = shell(f"systemctl is-active hermes-{agent_name}.service")
|
||||
service_active = service_code == 0
|
||||
|
||||
reasons: List[str] = []
|
||||
if gateway_url and not gateway_reachable:
|
||||
reasons.append("gateway_unreachable")
|
||||
if service_active is False:
|
||||
reasons.append("service_inactive")
|
||||
if provider_status in {"dead", "degraded"}:
|
||||
reasons.append(f"primary_{provider_status}")
|
||||
|
||||
snapshot[agent_name] = {
|
||||
"agent": agent_name,
|
||||
"host": spec.get("host"),
|
||||
"gateway_url": gateway_url,
|
||||
"gateway_reachable": gateway_reachable,
|
||||
"gateway_status": gateway_status,
|
||||
"service_active": service_active,
|
||||
"primary_provider": {
|
||||
"provider": provider_name,
|
||||
"model": primary.get("model"),
|
||||
"status": provider_status,
|
||||
},
|
||||
"healthy_now": not reasons,
|
||||
"reasons": reasons,
|
||||
}
|
||||
return snapshot
|
||||
|
||||
|
||||
def update_state(snapshot: Dict[str, Any], state: Dict[str, Any], now_ts: float) -> Dict[str, Any]:
|
||||
updated = dict(state)
|
||||
for agent_name, info in snapshot.items():
|
||||
entry = dict(updated.get(agent_name, {}))
|
||||
entry["last_checked_at"] = now_ts
|
||||
entry["last_reasons"] = list(info.get("reasons", []))
|
||||
if info.get("healthy_now"):
|
||||
entry["last_healthy_at"] = now_ts
|
||||
else:
|
||||
entry.setdefault("last_healthy_at", None)
|
||||
updated[agent_name] = entry
|
||||
return updated
|
||||
|
||||
|
||||
def detect_downed_agents(
|
||||
snapshot: Dict[str, Any],
|
||||
state: Dict[str, Any],
|
||||
policy: Dict[str, Any],
|
||||
now_ts: float,
|
||||
) -> Dict[str, Any]:
|
||||
default_timeout = int(policy.get("dead_timeout_seconds", 600))
|
||||
agent_overrides = policy.get("agents", {})
|
||||
detected: Dict[str, Any] = {}
|
||||
|
||||
for agent_name, info in snapshot.items():
|
||||
timeout_seconds = int(agent_overrides.get(agent_name, {}).get("dead_timeout_seconds", default_timeout))
|
||||
last_healthy_at = state.get(agent_name, {}).get("last_healthy_at")
|
||||
if info.get("healthy_now"):
|
||||
unhealthy_for_seconds = 0.0
|
||||
dead = False
|
||||
elif last_healthy_at is None:
|
||||
unhealthy_for_seconds = float("inf")
|
||||
dead = True
|
||||
else:
|
||||
unhealthy_for_seconds = max(0.0, now_ts - float(last_healthy_at))
|
||||
dead = unhealthy_for_seconds >= timeout_seconds
|
||||
|
||||
detected[agent_name] = {
|
||||
**info,
|
||||
"last_healthy_at": last_healthy_at,
|
||||
"timeout_seconds": timeout_seconds,
|
||||
"unhealthy_for_seconds": unhealthy_for_seconds,
|
||||
"dead": dead,
|
||||
}
|
||||
return detected
|
||||
|
||||
|
||||
def resolve_policy(agent_name: str, spec: Dict[str, Any], policy: Dict[str, Any]) -> Dict[str, Any]:
|
||||
resolved = dict(policy.get("default_policy", {}))
|
||||
spec_mission = spec.get("mission")
|
||||
agent_override = dict(policy.get("agents", {}).get(agent_name, {}))
|
||||
resolved_mission = agent_override.get("mission") or spec_mission or agent_name
|
||||
if resolved_mission in policy.get("missions", {}):
|
||||
resolved.update(policy["missions"][resolved_mission])
|
||||
resolved.update(agent_override)
|
||||
resolved.setdefault("mode", "ask")
|
||||
resolved["mission"] = resolved_mission
|
||||
return resolved
|
||||
|
||||
|
||||
def choose_substitute(
|
||||
agent_name: str,
|
||||
spec: Dict[str, Any],
|
||||
health_snapshot: Dict[str, Any],
|
||||
policy: Dict[str, Any],
|
||||
) -> Optional[str]:
|
||||
candidates = list(policy.get("substitutions", {}).get(agent_name, []))
|
||||
candidates.extend(spec.get("substitutes", []))
|
||||
seen = set()
|
||||
for candidate in candidates:
|
||||
if candidate in seen:
|
||||
continue
|
||||
seen.add(candidate)
|
||||
candidate_health = health_snapshot.get(candidate, {})
|
||||
if candidate_health.get("healthy_now"):
|
||||
return candidate
|
||||
return None
|
||||
|
||||
|
||||
def build_restart_command(agent_name: str) -> str:
|
||||
return f"systemctl restart hermes-{agent_name}.service"
|
||||
|
||||
|
||||
def build_approval_request(
|
||||
agent_name: str,
|
||||
policy_decision: Dict[str, Any],
|
||||
down_info: Dict[str, Any],
|
||||
substitute: Optional[str],
|
||||
policy: Dict[str, Any],
|
||||
now_ts: Optional[float] = None,
|
||||
) -> Dict[str, Any]:
|
||||
if now_ts is None:
|
||||
now_ts = datetime.now(timezone.utc).timestamp()
|
||||
reasons = ", ".join(down_info.get("reasons", [])) or "no health signal"
|
||||
mission = policy_decision.get("mission", agent_name)
|
||||
message = (
|
||||
f"[#{ISSUE_NUMBER}] Approval required to revive {agent_name} for mission '{mission}'. "
|
||||
f"Reasons: {reasons}. "
|
||||
f"Suggested substitute: {substitute or 'none available'}."
|
||||
)
|
||||
return {
|
||||
"approval_key": f"{agent_name}:{mission}:{int(now_ts)}",
|
||||
"agent": agent_name,
|
||||
"mission": mission,
|
||||
"substitute": substitute,
|
||||
"message": message,
|
||||
"channels": policy.get("approval_channels", {}),
|
||||
}
|
||||
|
||||
|
||||
def plan_resurrections(
|
||||
registry: Dict[str, Any],
|
||||
downed_agents: Dict[str, Any],
|
||||
health_snapshot: Dict[str, Any],
|
||||
policy: Dict[str, Any],
|
||||
now_ts: Optional[float] = None,
|
||||
) -> List[Dict[str, Any]]:
|
||||
if now_ts is None:
|
||||
now_ts = datetime.now(timezone.utc).timestamp()
|
||||
fleet = registry.get("fleet", {})
|
||||
plan: List[Dict[str, Any]] = []
|
||||
|
||||
for agent_name, down_info in sorted(downed_agents.items()):
|
||||
if not down_info.get("dead"):
|
||||
continue
|
||||
spec = fleet.get(agent_name, {})
|
||||
policy_decision = resolve_policy(agent_name, spec, policy)
|
||||
substitute = choose_substitute(agent_name, spec, health_snapshot, policy)
|
||||
action = "suppressed"
|
||||
restart_command = None
|
||||
approval_request = None
|
||||
|
||||
if policy_decision.get("mode") == "yes":
|
||||
if is_local_host(spec.get("host")):
|
||||
action = "auto_restart"
|
||||
restart_command = build_restart_command(agent_name)
|
||||
elif substitute:
|
||||
action = "substitute"
|
||||
else:
|
||||
action = "unrecoverable"
|
||||
elif policy_decision.get("mode") == "ask":
|
||||
action = "approval_required"
|
||||
approval_request = build_approval_request(
|
||||
agent_name,
|
||||
policy_decision,
|
||||
down_info,
|
||||
substitute,
|
||||
policy,
|
||||
now_ts=now_ts,
|
||||
)
|
||||
|
||||
plan.append(
|
||||
{
|
||||
"agent": agent_name,
|
||||
"mission": policy_decision.get("mission"),
|
||||
"policy": policy_decision,
|
||||
"reasons": list(down_info.get("reasons", [])),
|
||||
"timeout_seconds": down_info.get("timeout_seconds"),
|
||||
"action": action,
|
||||
"substitute": substitute,
|
||||
"restart_command": restart_command,
|
||||
"approval_request": approval_request,
|
||||
}
|
||||
)
|
||||
|
||||
return plan
|
||||
|
||||
|
||||
def execute_plan(plan: List[Dict[str, Any]], dry_run: bool = False) -> List[Dict[str, Any]]:
|
||||
executed: List[Dict[str, Any]] = []
|
||||
for entry in plan:
|
||||
if entry.get("action") != "auto_restart":
|
||||
executed.append({**entry, "executed": False})
|
||||
continue
|
||||
cmd = entry.get("restart_command")
|
||||
if dry_run or not cmd:
|
||||
executed.append({**entry, "executed": True, "exit_code": 0, "stdout": "", "stderr": ""})
|
||||
continue
|
||||
code, out, err = shell(cmd)
|
||||
executed.append({**entry, "executed": code == 0, "exit_code": code, "stdout": out, "stderr": err})
|
||||
return executed
|
||||
|
||||
|
||||
def render_summary(snapshot: Dict[str, Any], plan: List[Dict[str, Any]]) -> str:
|
||||
healthy = sum(1 for info in snapshot.values() if info.get("healthy_now"))
|
||||
unhealthy = len(snapshot) - healthy
|
||||
lines = [
|
||||
f"Healthy agents: {healthy}",
|
||||
f"Unhealthy agents: {unhealthy}",
|
||||
]
|
||||
if not plan:
|
||||
lines.append("Resurrection plan: no dead agents exceed timeout.")
|
||||
return "\n".join(lines)
|
||||
lines.append("Resurrection plan:")
|
||||
for entry in plan:
|
||||
lines.append(
|
||||
f"- {entry['agent']}: {entry['action']}"
|
||||
f" (mission={entry['mission']}, reasons={', '.join(entry['reasons']) or 'none'})"
|
||||
)
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser(description="Resurrection Pool")
|
||||
parser.add_argument("--registry", type=Path, default=REGISTRY_PATH)
|
||||
parser.add_argument("--policy", type=Path, default=POLICY_PATH)
|
||||
parser.add_argument("--state", type=Path, default=STATE_PATH)
|
||||
parser.add_argument("--json", action="store_true")
|
||||
parser.add_argument("--execute", action="store_true")
|
||||
parser.add_argument("--dry-run", action="store_true")
|
||||
args = parser.parse_args()
|
||||
|
||||
now_ts = datetime.now(timezone.utc).timestamp()
|
||||
registry = load_registry(args.registry)
|
||||
policy = load_policy(args.policy)
|
||||
prior_state = load_state(args.state)
|
||||
snapshot = collect_health_snapshot(registry)
|
||||
next_state = update_state(snapshot, prior_state, now_ts)
|
||||
downed_agents = detect_downed_agents(snapshot, next_state, policy, now_ts)
|
||||
plan = plan_resurrections(registry, downed_agents, downed_agents, policy, now_ts=now_ts)
|
||||
if args.execute:
|
||||
plan = execute_plan(plan, dry_run=args.dry_run)
|
||||
if not args.dry_run:
|
||||
save_state(next_state, args.state)
|
||||
|
||||
payload = {
|
||||
"checked_at": datetime.fromtimestamp(now_ts, tz=timezone.utc).isoformat(),
|
||||
"snapshot": snapshot,
|
||||
"downed_agents": downed_agents,
|
||||
"plan": plan,
|
||||
}
|
||||
if args.json:
|
||||
print(json.dumps(payload, indent=2, sort_keys=True))
|
||||
else:
|
||||
print(render_summary(snapshot, plan))
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
@@ -1,107 +0,0 @@
|
||||
from importlib import util
|
||||
from pathlib import Path
|
||||
import sys
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
ROOT = Path(__file__).resolve().parent.parent
|
||||
MODULE_PATH = ROOT / "nexus" / "mission_bus.py"
|
||||
CONFIG_PATH = ROOT / "config" / "mission_bus_profiles.json"
|
||||
|
||||
|
||||
def load_module():
|
||||
spec = util.spec_from_file_location("mission_bus", MODULE_PATH)
|
||||
module = util.module_from_spec(spec)
|
||||
assert spec.loader is not None
|
||||
sys.modules[spec.name] = module
|
||||
spec.loader.exec_module(module)
|
||||
return module
|
||||
|
||||
|
||||
def build_bus(module):
|
||||
profiles = module.load_profiles(CONFIG_PATH)
|
||||
bus = module.MissionBus("mission-883", title="multi-agent teaming", config=profiles)
|
||||
bus.register_participant("timmy", module.MissionRole.LEAD)
|
||||
bus.register_participant("ezra", module.MissionRole.WRITE)
|
||||
bus.register_participant("bezalel", module.MissionRole.READ)
|
||||
bus.register_participant("allegro", module.MissionRole.AUDIT)
|
||||
return bus
|
||||
|
||||
|
||||
def test_role_permissions_gate_publish_checkpoint_and_handoff():
|
||||
module = load_module()
|
||||
bus = build_bus(module)
|
||||
|
||||
assert bus.allowed("timmy", "publish") is True
|
||||
assert bus.allowed("ezra", "handoff") is True
|
||||
assert bus.allowed("allegro", "audit") is True
|
||||
assert bus.allowed("bezalel", "publish") is False
|
||||
|
||||
with pytest.raises(PermissionError):
|
||||
bus.publish("bezalel", "mission.notes", {"text": "should fail"})
|
||||
|
||||
with pytest.raises(PermissionError):
|
||||
bus.create_checkpoint("allegro", summary="audit cannot checkpoint", state={})
|
||||
|
||||
|
||||
def test_mission_bus_unified_stream_records_messages_checkpoints_and_handoffs():
|
||||
module = load_module()
|
||||
bus = build_bus(module)
|
||||
|
||||
msg = bus.publish("timmy", "mission.start", {"goal": "build the slice"})
|
||||
checkpoint = bus.create_checkpoint(
|
||||
"ezra",
|
||||
summary="checkpoint before lead review",
|
||||
state={"branch": "fix/883", "files": ["nexus/mission_bus.py"]},
|
||||
artifacts=["docs/mission-bus.md"],
|
||||
)
|
||||
handoff = bus.handoff("ezra", "timmy", checkpoint.checkpoint_id, note="ready for lead review")
|
||||
|
||||
assert [event.event_type for event in bus.events] == ["message", "checkpoint", "handoff"]
|
||||
assert [event.sequence for event in bus.events] == [1, 2, 3]
|
||||
assert msg.topic == "mission.start"
|
||||
assert handoff.recipient == "timmy"
|
||||
|
||||
|
||||
def test_handoff_resume_packet_contains_checkpoint_state_and_participants():
|
||||
module = load_module()
|
||||
bus = build_bus(module)
|
||||
checkpoint = bus.create_checkpoint(
|
||||
"ezra",
|
||||
summary="handoff package",
|
||||
state={"branch": "fix/883", "tests": ["tests/test_mission_bus.py"]},
|
||||
artifacts=["config/mission_bus_profiles.json"],
|
||||
)
|
||||
handoff = bus.handoff("ezra", "timmy", checkpoint.checkpoint_id, note="pick up from here")
|
||||
|
||||
packet = bus.build_resume_packet(handoff.handoff_id)
|
||||
assert packet["recipient"] == "timmy"
|
||||
assert packet["checkpoint"]["state"]["branch"] == "fix/883"
|
||||
assert packet["checkpoint"]["artifacts"] == ["config/mission_bus_profiles.json"]
|
||||
assert packet["participants"]["ezra"]["role"] == "write"
|
||||
assert packet["handoff_note"] == "pick up from here"
|
||||
|
||||
|
||||
def test_profiles_define_level2_mount_namespace_and_level3_rootless_podman():
|
||||
module = load_module()
|
||||
profiles = module.load_profiles(CONFIG_PATH)
|
||||
|
||||
levels = {entry["level"]: entry["mechanism"] for entry in profiles["isolation_profiles"]}
|
||||
assert levels[2] == "mount_namespace"
|
||||
assert levels[3] == "rootless_podman"
|
||||
assert profiles["roles"]["audit"] == ["read", "audit"]
|
||||
|
||||
|
||||
def test_mission_bus_roundtrip_preserves_events_and_isolation_profile():
|
||||
module = load_module()
|
||||
bus = build_bus(module)
|
||||
bus.publish("timmy", "mission.start", {"goal": "roundtrip"})
|
||||
checkpoint = bus.create_checkpoint("ezra", summary="save state", state={"count": 1})
|
||||
bus.handoff("ezra", "timmy", checkpoint.checkpoint_id, note="resume")
|
||||
|
||||
restored = module.MissionBus.from_dict(bus.to_dict())
|
||||
assert restored.mission_id == "mission-883"
|
||||
assert restored.events[-1].event_type == "handoff"
|
||||
assert restored.events[-1].note == "resume"
|
||||
assert restored.isolation_profiles[1].mechanism == "mount_namespace"
|
||||
118
tests/test_resurrection_pool.py
Normal file
118
tests/test_resurrection_pool.py
Normal file
@@ -0,0 +1,118 @@
|
||||
from importlib import util
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
ROOT = Path(__file__).resolve().parent.parent
|
||||
MODULE_PATH = ROOT / "scripts" / "resurrection_pool.py"
|
||||
|
||||
|
||||
def load_module():
|
||||
spec = util.spec_from_file_location("resurrection_pool", MODULE_PATH)
|
||||
module = util.module_from_spec(spec)
|
||||
assert spec.loader is not None
|
||||
spec.loader.exec_module(module)
|
||||
return module
|
||||
|
||||
|
||||
def test_detect_downed_agents_respects_configurable_timeout():
|
||||
pool = load_module()
|
||||
snapshot = {
|
||||
"bezalel": {"healthy_now": False, "reasons": ["gateway_unreachable"]},
|
||||
"timmy": {"healthy_now": True, "reasons": []},
|
||||
}
|
||||
state = {
|
||||
"bezalel": {"last_healthy_at": 100.0},
|
||||
"timmy": {"last_healthy_at": 650.0},
|
||||
}
|
||||
policy = {"dead_timeout_seconds": 600, "agents": {}}
|
||||
|
||||
not_dead = pool.detect_downed_agents(snapshot, state, policy, now_ts=650.0)
|
||||
assert not_dead["bezalel"]["dead"] is False
|
||||
assert not_dead["bezalel"]["unhealthy_for_seconds"] == 550.0
|
||||
|
||||
dead = pool.detect_downed_agents(snapshot, state, policy, now_ts=701.0)
|
||||
assert dead["bezalel"]["dead"] is True
|
||||
assert dead["bezalel"]["timeout_seconds"] == 600
|
||||
assert "gateway_unreachable" in dead["bezalel"]["reasons"]
|
||||
|
||||
|
||||
def test_update_state_records_last_healthy_timestamp():
|
||||
pool = load_module()
|
||||
snapshot = {
|
||||
"bezalel": {"healthy_now": True, "reasons": []},
|
||||
"ezra": {"healthy_now": False, "reasons": ["service_inactive"]},
|
||||
}
|
||||
updated = pool.update_state(snapshot, {}, now_ts=1234.5)
|
||||
assert updated["bezalel"]["last_healthy_at"] == 1234.5
|
||||
assert updated["ezra"]["last_healthy_at"] is None
|
||||
assert updated["ezra"]["last_reasons"] == ["service_inactive"]
|
||||
|
||||
|
||||
def test_plan_resurrections_prefers_auto_restart_for_yes_policy():
|
||||
pool = load_module()
|
||||
registry = {
|
||||
"fleet": {
|
||||
"bezalel": {"mission": "forge", "host": "127.0.0.1"},
|
||||
"allegro": {"mission": "forge", "host": "203.0.113.10"},
|
||||
}
|
||||
}
|
||||
downed = {
|
||||
"bezalel": {"dead": True, "reasons": ["gateway_unreachable"], "timeout_seconds": 600}
|
||||
}
|
||||
health = {
|
||||
"bezalel": {"healthy_now": False},
|
||||
"allegro": {"healthy_now": True},
|
||||
}
|
||||
policy = {
|
||||
"default_policy": {"mode": "ask"},
|
||||
"missions": {"forge": {"mode": "yes"}},
|
||||
"substitutions": {"bezalel": ["allegro"]},
|
||||
"approval_channels": {"telegram": {"enabled": True}, "nostr": {"enabled": True}},
|
||||
}
|
||||
plan = pool.plan_resurrections(registry, downed, health, policy, now_ts=2000.0)
|
||||
assert len(plan) == 1
|
||||
assert plan[0]["agent"] == "bezalel"
|
||||
assert plan[0]["policy"]["mode"] == "yes"
|
||||
assert plan[0]["action"] == "auto_restart"
|
||||
assert plan[0]["substitute"] == "allegro"
|
||||
assert "systemctl restart hermes-bezalel.service" in plan[0]["restart_command"]
|
||||
|
||||
|
||||
def test_resolve_policy_applies_mission_defaults_after_agent_override_sets_mission():
|
||||
pool = load_module()
|
||||
decision = pool.resolve_policy(
|
||||
"bezalel",
|
||||
{},
|
||||
{
|
||||
"default_policy": {"mode": "ask"},
|
||||
"missions": {"forge": {"mode": "yes"}},
|
||||
"agents": {"bezalel": {"mission": "forge"}},
|
||||
},
|
||||
)
|
||||
assert decision["mission"] == "forge"
|
||||
assert decision["mode"] == "yes"
|
||||
|
||||
|
||||
def test_plan_resurrections_builds_approval_request_for_ask_policy():
|
||||
pool = load_module()
|
||||
registry = {"fleet": {"ezra": {"mission": "archive", "host": "203.0.113.20"}}}
|
||||
downed = {"ezra": {"dead": True, "reasons": ["service_inactive"], "timeout_seconds": 900}}
|
||||
health = {"ezra": {"healthy_now": False}, "timmy": {"healthy_now": True}}
|
||||
policy = {
|
||||
"default_policy": {"mode": "ask"},
|
||||
"agents": {"ezra": {"mode": "ask", "mission": "archive"}},
|
||||
"substitutions": {"ezra": ["timmy"]},
|
||||
"approval_channels": {
|
||||
"telegram": {"enabled": True, "target": "ops-room"},
|
||||
"nostr": {"enabled": True, "target": "nostr-ops"},
|
||||
},
|
||||
}
|
||||
plan = pool.plan_resurrections(registry, downed, health, policy, now_ts=3000.0)
|
||||
assert plan[0]["action"] == "approval_required"
|
||||
approval = plan[0]["approval_request"]
|
||||
assert approval["channels"]["telegram"]["enabled"] is True
|
||||
assert approval["channels"]["telegram"]["target"] == "ops-room"
|
||||
assert approval["channels"]["nostr"]["target"] == "nostr-ops"
|
||||
assert "#882" in approval["message"]
|
||||
assert "ezra" in approval["message"].lower()
|
||||
assert approval["substitute"] == "timmy"
|
||||
Reference in New Issue
Block a user