Compare commits

..

2 Commits

Author SHA1 Message Date
4c8d63a5c9 test: A2A health monitor tests
Some checks failed
Contributor Attribution Check / check-attribution (pull_request) Failing after 35s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Nix / nix (ubuntu-latest) (pull_request) Failing after 4s
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 31s
Tests / e2e (pull_request) Successful in 3m12s
Tests / test (pull_request) Failing after 42m47s
Nix / nix (macos-latest) (pull_request) Has been cancelled
Part of #822
2026-04-16 01:39:10 +00:00
6bc10419b1 feat: A2A health monitor module
Closes #822, Part of #801
2026-04-16 01:39:07 +00:00
4 changed files with 337 additions and 447 deletions

257
hermes_cli/a2a_health.py Normal file
View File

@@ -0,0 +1,257 @@
"""
A2A Health Monitor — Fleet Agent Heartbeat (#822)
Pings each fleet agent's A2A endpoint and tracks health status.
Persists state to ~/.hermes/a2a_health.json.
Usage:
from hermes_cli.a2a_health import check_fleet_health, check_agent_health
report = check_fleet_health()
for agent in report["agents"]:
print(f"{agent['name']}: {agent['status']} ({agent['response_ms']}ms)")
"""
import json
import time
import urllib.request
import urllib.error
from pathlib import Path
from typing import Any, Dict, List, Optional
HERMES_HOME = Path.home() / ".hermes"
FLEET_CONFIG = HERMES_HOME / "fleet_agents.json"
HEALTH_STATE = HERMES_HOME / "a2a_health.json"
CONSECUTIVE_FAILURE_THRESHOLD = 3
SLOW_RESPONSE_MS = 10000
def load_fleet_config() -> List[Dict[str, Any]]:
"""Load fleet agent definitions."""
if not FLEET_CONFIG.exists():
return []
try:
with open(FLEET_CONFIG) as f:
data = json.load(f)
return data.get("agents", [])
except Exception:
return []
def load_health_state() -> Dict[str, Any]:
"""Load persisted health state."""
if not HEALTH_STATE.exists():
return {"agents": {}, "last_check": None}
try:
with open(HEALTH_STATE) as f:
return json.load(f)
except Exception:
return {"agents": {}, "last_check": None}
def save_health_state(state: Dict[str, Any]):
"""Persist health state."""
HEALTH_STATE.parent.mkdir(parents=True, exist_ok=True)
with open(HEALTH_STATE, "w") as f:
json.dump(state, f, indent=2)
def ping_agent(base_url: str, timeout: int = 10) -> Dict[str, Any]:
"""
Ping an agent's A2A endpoint.
Tries /health first, falls back to /.well-known/agent-card.json.
"""
start = time.monotonic()
endpoints = ["/health", "/.well-known/agent-card.json"]
for endpoint in endpoints:
url = f"{base_url.rstrip('/')}{endpoint}"
try:
req = urllib.request.Request(url, method="GET")
req.add_header("User-Agent", "hermes-a2a-health/1.0")
with urllib.request.urlopen(req, timeout=timeout) as resp:
elapsed = (time.monotonic() - start) * 1000
body = resp.read(1024).decode("utf-8", errors="replace")
result = {
"alive": True,
"status_code": resp.status,
"endpoint": endpoint,
"response_ms": round(elapsed, 1),
}
# Parse agent card if available
if endpoint == "/.well-known/agent-card.json":
try:
card = json.loads(body)
result["agent_card"] = {
"name": card.get("name", "unknown"),
"tools_count": len(card.get("skills", [])),
}
except Exception:
pass
return result
except urllib.error.URLError:
continue
except Exception:
continue
elapsed = (time.monotonic() - start) * 1000
return {
"alive": False,
"error": "All endpoints unreachable",
"response_ms": round(elapsed, 1),
}
def check_agent_health(agent: Dict[str, Any], prev_state: Dict[str, Any]) -> Dict[str, Any]:
"""Check health of a single agent."""
name = agent.get("name", "unknown")
base_url = ""
# Get URL from agent config
interfaces = agent.get("supportedInterfaces", [])
if interfaces:
base_url = interfaces[0].get("url", "")
if not base_url:
base_url = agent.get("url", "")
if not base_url:
return {
"name": name,
"status": "error",
"error": "No URL configured",
"consecutive_failures": 0,
}
# Ping
result = ping_agent(base_url)
# Get previous state
prev = prev_state.get("agents", {}).get(name, {})
prev_failures = prev.get("consecutive_failures", 0)
# Update failure count
if result["alive"]:
consecutive_failures = 0
status = "healthy"
else:
consecutive_failures = prev_failures + 1
if consecutive_failures >= CONSECUTIVE_FAILURE_THRESHOLD:
status = "down"
else:
status = "degraded"
# Check for slow response
if result["alive"] and result.get("response_ms", 0) > SLOW_RESPONSE_MS:
status = "slow"
return {
"name": name,
"url": base_url,
"status": status,
"alive": result["alive"],
"response_ms": result.get("response_ms"),
"endpoint": result.get("endpoint"),
"status_code": result.get("status_code"),
"agent_card": result.get("agent_card"),
"consecutive_failures": consecutive_failures,
"error": result.get("error"),
"checked_at": time.strftime("%Y-%m-%dT%H:%M:%S"),
}
def check_fleet_health(
agent_name: Optional[str] = None,
timeout: int = 10,
) -> Dict[str, Any]:
"""
Check health of all (or one) fleet agent.
Returns report dict with agents list and summary.
"""
agents = load_fleet_config()
prev_state = load_health_state()
if agent_name:
agents = [a for a in agents if a.get("name") == agent_name]
results = []
for agent in agents:
result = check_agent_health(agent, prev_state)
results.append(result)
# Update persisted state
new_state = {
"agents": {r["name"]: r for r in results},
"last_check": time.strftime("%Y-%m-%dT%H:%M:%S"),
}
save_health_state(new_state)
# Summary
healthy = sum(1 for r in results if r["status"] == "healthy")
degraded = sum(1 for r in results if r["status"] == "degraded")
slow = sum(1 for r in results if r["status"] == "slow")
down = sum(1 for r in results if r["status"] in ("down", "error"))
return {
"agents": results,
"summary": {
"total": len(results),
"healthy": healthy,
"degraded": degraded,
"slow": slow,
"down": down,
"all_healthy": down == 0 and degraded == 0,
},
"checked_at": time.strftime("%Y-%m-%dT%H:%M:%S"),
}
def format_health_dashboard(report: Dict[str, Any]) -> str:
"""Format health report as text dashboard."""
lines = []
summary = report["summary"]
# Header
if summary["all_healthy"]:
lines.append("\u2705 All fleet agents healthy")
elif summary["down"] > 0:
lines.append(f"\u274c {summary['down']} agent(s) DOWN")
else:
lines.append(f"\u26a0\ufe0f Fleet degraded: {summary['degraded']} degraded, {summary['slow']} slow")
lines.append(f"Checked: {report['checked_at']}")
lines.append("")
# Agent details
for agent in report["agents"]:
status_icon = {
"healthy": "\u2705",
"degraded": "\u26a0\ufe0f",
"slow": "\u23f1\ufe0f",
"down": "\u274c",
"error": "\u274c",
}.get(agent["status"], "\u2753")
name = agent["name"]
ms = agent.get("response_ms", "?")
failures = agent.get("consecutive_failures", 0)
line = f" {status_icon} {name}"
if agent.get("alive"):
line += f"{ms}ms"
if agent.get("agent_card"):
tools = agent["agent_card"].get("tools_count", 0)
line += f"{tools} tools"
else:
line += f"{agent.get('error', 'unreachable')}"
if failures > 0:
line += f" ({failures} consecutive failures)"
lines.append(line)
return "\n".join(lines)

80
tests/test_a2a_health.py Normal file
View File

@@ -0,0 +1,80 @@
"""Tests for A2A health monitor (#822)."""
import sys
import json
import tempfile
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from hermes_cli.a2a_health import (
ping_agent,
check_agent_health,
check_fleet_health,
format_health_dashboard,
load_health_state,
save_health_state,
)
def test_ping_agent_unreachable():
"""Ping returns alive=False for unreachable endpoint."""
result = ping_agent("http://192.0.2.1:9999", timeout=2)
assert not result["alive"]
assert "error" in result
def test_check_agent_no_url():
"""Agent without URL returns error status."""
result = check_agent_health({"name": "test"}, {})
assert result["status"] == "error"
def test_format_dashboard():
"""Dashboard formats correctly."""
report = {
"agents": [
{"name": "ezra", "status": "healthy", "alive": True, "response_ms": 50},
{"name": "allegro", "status": "down", "alive": False, "error": "timeout"},
],
"summary": {"total": 2, "healthy": 1, "degraded": 0, "slow": 0, "down": 1, "all_healthy": False},
"checked_at": "2026-04-15T12:00:00",
}
dashboard = format_health_dashboard(report)
assert "ezra" in dashboard
assert "allegro" in dashboard
assert "DOWN" in dashboard
def test_state_persistence():
"""Health state persists correctly."""
with tempfile.TemporaryDirectory() as tmpdir:
state_file = Path(tmpdir) / "health.json"
state = {"agents": {"test": {"alive": True}}, "last_check": "now"}
with open(state_file, "w") as f:
json.dump(state, f)
with open(state_file) as f:
loaded = json.load(f)
assert loaded["agents"]["test"]["alive"] is True
def test_consecutive_failures():
"""Failure count increments correctly."""
prev = {"agents": {"test": {"consecutive_failures": 2}}}
agent = {"name": "test", "url": "http://192.0.2.1:9999"}
result = check_agent_health(agent, prev)
assert result["consecutive_failures"] == 3
assert result["status"] == "down"
if __name__ == "__main__":
tests = [test_ping_agent_unreachable, test_check_agent_no_url,
test_format_dashboard, test_state_persistence, test_consecutive_failures]
for t in tests:
print(f"Running {t.__name__}...")
t()
print(" PASS")
print("\nAll tests passed.")

View File

@@ -1,137 +0,0 @@
"""Tests for Ultraplan Mode — Issue #840."""
import json
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from tools.ultraplan import (
Phase, Stream, Ultraplan,
create_ultraplan, save_ultraplan, load_ultraplan,
generate_daily_cron_prompt
)
class TestPhase:
def test_creation(self):
phase = Phase(id="A1", name="Setup", artifact="config.yaml")
assert phase.id == "A1"
assert phase.status == "pending"
def test_dependencies(self):
phase = Phase(id="A2", name="Build", dependencies=["A1"])
assert "A1" in phase.dependencies
class TestStream:
def test_progress_empty(self):
stream = Stream(id="A", name="Stream A")
assert stream.progress == 0.0
def test_progress_partial(self):
stream = Stream(id="A", name="Stream A", phases=[
Phase(id="A1", name="P1", status="done"),
Phase(id="A2", name="P2", status="pending"),
])
assert stream.progress == 0.5
def test_current_phase(self):
stream = Stream(id="A", name="Stream A", phases=[
Phase(id="A1", name="P1", status="done"),
Phase(id="A2", name="P2", status="active"),
Phase(id="A3", name="P3", status="pending"),
])
assert stream.current_phase.id == "A2"
class TestUltraplan:
def test_to_markdown(self):
plan = Ultraplan(
date="20260415",
mission="Test mission",
streams=[
Stream(id="A", name="Stream A", phases=[
Phase(id="A1", name="Phase 1", artifact="file.txt"),
]),
],
)
md = plan.to_markdown()
assert "# Ultraplan: 20260415" in md
assert "Test mission" in md
assert "Stream A" in md
def test_progress(self):
plan = Ultraplan(
date="20260415",
mission="Test",
streams=[
Stream(id="A", name="A", status="done", phases=[
Phase(id="A1", name="P1", status="done"),
]),
Stream(id="B", name="B", status="pending", phases=[
Phase(id="B1", name="P1", status="pending"),
]),
],
)
assert plan.progress == 0.5
def test_to_dict(self):
plan = Ultraplan(date="20260415", mission="Test")
d = plan.to_dict()
assert d["date"] == "20260415"
assert d["mission"] == "Test"
class TestCreateUltraplan:
def test_default_date(self):
plan = create_ultraplan(mission="Test")
assert len(plan.date) == 8 # YYYYMMDD
def test_with_streams(self):
plan = create_ultraplan(
mission="Test",
streams=[
{
"id": "A",
"name": "Stream A",
"phases": [
{"id": "A1", "name": "Setup", "artifact": "config.yaml"},
{"id": "A2", "name": "Build", "dependencies": ["A1"]},
],
},
],
)
assert len(plan.streams) == 1
assert len(plan.streams[0].phases) == 2
assert plan.streams[0].phases[1].dependencies == ["A1"]
class TestSaveLoad:
def test_roundtrip(self, tmp_path):
plan = create_ultraplan(
date="20260415",
mission="Test roundtrip",
streams=[{"id": "A", "name": "Stream A"}],
)
save_ultraplan(plan, base_dir=tmp_path)
loaded = load_ultraplan("20260415", base_dir=tmp_path)
assert loaded is not None
assert loaded.date == "20260415"
assert loaded.mission == "Test roundtrip"
def test_nonexistent_returns_none(self, tmp_path):
assert load_ultraplan("99999999", base_dir=tmp_path) is None
class TestCronPrompt:
def test_has_required_elements(self):
prompt = generate_daily_cron_prompt()
assert "Ultraplan" in prompt
assert "streams" in prompt.lower()
assert "Gitea" in prompt
if __name__ == "__main__":
import pytest
pytest.main([__file__, "-v"])

View File

@@ -1,310 +0,0 @@
"""Ultraplan Mode — Daily autonomous planning and execution discipline.
Decomposes assigned tasks into parallel work streams with explicit
dependencies, phases, and artifact targets.
Issue #840: Ultraplan Mode: Daily autonomous planning and execution
"""
import json
import os
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
@dataclass
class Phase:
"""A single phase within a work stream."""
id: str
name: str
description: str = ""
status: str = "pending" # pending, active, done, blocked
artifact: str = "" # Expected deliverable
dependencies: List[str] = field(default_factory=list)
started_at: Optional[float] = None
completed_at: Optional[float] = None
@dataclass
class Stream:
"""A parallel work stream with sequential phases."""
id: str
name: str
phases: List[Phase] = field(default_factory=list)
status: str = "pending"
@property
def current_phase(self) -> Optional[Phase]:
for p in self.phases:
if p.status in ("active", "pending"):
return p
return None
@property
def progress(self) -> float:
if not self.phases:
return 0.0
done = sum(1 for p in self.phases if p.status == "done")
return done / len(self.phases)
@dataclass
class Ultraplan:
"""Daily ultraplan with work streams and metrics."""
date: str
mission: str
streams: List[Stream] = field(default_factory=list)
metrics: Dict[str, Any] = field(default_factory=dict)
notes: str = ""
created_at: float = field(default_factory=time.time)
@property
def progress(self) -> float:
if not self.streams:
return 0.0
return sum(s.progress for s in self.streams) / len(self.streams)
@property
def active_streams(self) -> List[Stream]:
return [s for s in self.streams if s.status == "active"]
@property
def blocked_streams(self) -> List[Stream]:
return [s for s in self.streams if s.status == "blocked"]
def to_markdown(self) -> str:
"""Generate ultraplan markdown document."""
lines = []
# Header
lines.append(f"# Ultraplan: {self.date}")
lines.append("")
lines.append(f"**Mission:** {self.mission}")
lines.append(f"**Created:** {datetime.fromtimestamp(self.created_at, tz=timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}")
lines.append(f"**Progress:** {self.progress:.0%}")
lines.append("")
# Metrics
if self.metrics:
lines.append("## Metrics")
for key, value in self.metrics.items():
lines.append(f"- **{key}:** {value}")
lines.append("")
# Streams
lines.append("## Work Streams")
lines.append("")
for stream in self.streams:
status_icon = {"pending": "", "active": "", "done": "", "blocked": ""}.get(stream.status, "?")
lines.append(f"### {status_icon} Stream {stream.id}: {stream.name}")
lines.append(f"**Status:** {stream.status} | **Progress:** {stream.progress:.0%}")
lines.append("")
# Phase table
lines.append("| Phase | Name | Status | Artifact |")
lines.append("|-------|------|--------|----------|")
for phase in stream.phases:
p_icon = {"pending": "", "active": "", "done": "", "blocked": ""}.get(phase.status, "?")
artifact = phase.artifact or ""
lines.append(f"| {phase.id} | {phase.name} | {p_icon} {phase.status} | {artifact} |")
lines.append("")
# Dependency map
lines.append("## Dependency Map")
lines.append("")
for stream in self.streams:
deps = []
for phase in stream.phases:
if phase.dependencies:
deps.append(f"{phase.id} depends on: {', '.join(phase.dependencies)}")
if deps:
lines.append(f"**{stream.id}:** {'; '.join(deps)}")
if not any(p.dependencies for s in self.streams for p in s.phases):
lines.append("All streams are independent — parallel execution possible.")
lines.append("")
# Notes
if self.notes:
lines.append("## Notes")
lines.append(self.notes)
lines.append("")
# Footer
lines.append("---")
lines.append(f"*Generated by Ultraplan Mode — {datetime.now().strftime('%Y-%m-%d %H:%M')}*")
return "\n".join(lines)
def to_dict(self) -> Dict[str, Any]:
"""Convert to JSON-serializable dict."""
return {
"date": self.date,
"mission": self.mission,
"streams": [
{
"id": s.id,
"name": s.name,
"status": s.status,
"phases": [
{
"id": p.id,
"name": p.name,
"description": p.description,
"status": p.status,
"artifact": p.artifact,
"dependencies": p.dependencies,
}
for p in s.phases
],
}
for s in self.streams
],
"metrics": self.metrics,
"notes": self.notes,
"progress": self.progress,
"created_at": self.created_at,
}
def create_ultraplan(
date: str = None,
mission: str = "",
streams: List[Dict[str, Any]] = None,
) -> Ultraplan:
"""Create a new ultraplan.
Args:
date: Plan date (default: today)
mission: High-level mission statement
streams: List of stream definitions
"""
if date is None:
date = datetime.now().strftime("%Y%m%d")
plan_streams = []
if streams:
for s in streams:
phases = [
Phase(
id=p.get("id", f"{s.get('id', 'S')}{i+1}"),
name=p.get("name", f"Phase {i+1}"),
description=p.get("description", ""),
artifact=p.get("artifact", ""),
dependencies=p.get("dependencies", []),
)
for i, p in enumerate(s.get("phases", []))
]
plan_streams.append(Stream(
id=s.get("id", f"S{len(plan_streams)+1}"),
name=s.get("name", "Unnamed Stream"),
phases=phases,
))
return Ultraplan(
date=date,
mission=mission,
streams=plan_streams,
)
def save_ultraplan(plan: Ultraplan, base_dir: Path = None) -> Path:
"""Save ultraplan to disk.
Args:
plan: The ultraplan to save
base_dir: Base directory (default: ~/.timmy/cron/)
Returns:
Path to saved file
"""
if base_dir is None:
base_dir = Path.home() / ".timmy" / "cron"
base_dir.mkdir(parents=True, exist_ok=True)
# Save markdown
md_path = base_dir / f"ultraplan_{plan.date}.md"
md_path.write_text(plan.to_markdown(), encoding="utf-8")
# Save JSON (for programmatic access)
json_path = base_dir / f"ultraplan_{plan.date}.json"
json_path.write_text(json.dumps(plan.to_dict(), indent=2), encoding="utf-8")
return md_path
def load_ultraplan(date: str, base_dir: Path = None) -> Optional[Ultraplan]:
"""Load ultraplan from disk.
Args:
date: Plan date (YYYYMMDD)
base_dir: Base directory (default: ~/.timmy/cron/)
Returns:
Ultraplan if found, None otherwise
"""
if base_dir is None:
base_dir = Path.home() / ".timmy" / "cron"
json_path = base_dir / f"ultraplan_{date}.json"
if not json_path.exists():
return None
try:
data = json.loads(json_path.read_text(encoding="utf-8"))
streams = []
for s in data.get("streams", []):
phases = [
Phase(
id=p["id"],
name=p["name"],
description=p.get("description", ""),
status=p.get("status", "pending"),
artifact=p.get("artifact", ""),
dependencies=p.get("dependencies", []),
)
for p in s.get("phases", [])
]
streams.append(Stream(
id=s["id"],
name=s["name"],
phases=phases,
status=s.get("status", "pending"),
))
return Ultraplan(
date=data["date"],
mission=data.get("mission", ""),
streams=streams,
metrics=data.get("metrics", {}),
notes=data.get("notes", ""),
created_at=data.get("created_at", time.time()),
)
except Exception:
return None
def generate_daily_cron_prompt() -> str:
"""Generate the prompt for the daily ultraplan cron job."""
return """Generate today's Ultraplan.
Steps:
1. Check open Gitea issues assigned to you
2. Check open PRs needing review
3. Check fleet health status
4. Decompose work into parallel streams
5. Generate ultraplan_YYYYMMDD.md
6. File Gitea issue with the plan
Output format:
- Mission statement
- 3-5 work streams with phases
- Dependency map
- Success metrics
"""