Compare commits

...

2 Commits

Author SHA1 Message Date
2fba6dade4 feat: A2A health CLI command (#805)
Some checks failed
Contributor Attribution Check / check-attribution (pull_request) Failing after 38s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Nix / nix (ubuntu-latest) (pull_request) Failing after 2s
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 22s
Tests / e2e (pull_request) Successful in 3m10s
Tests / test (pull_request) Failing after 53m41s
Nix / nix (macos-latest) (pull_request) Has been cancelled
hermes a2a health [--json] [--agent NAME]
2026-04-15 22:52:05 +00:00
d7670b98cf feat: A2A health check module (#805)
Core health monitor: ping endpoints, track failures, alert on down agents.
Reads fleet from ~/.hermes/fleet_agents.json, state from ~/.hermes/a2a_health.json.
2026-04-15 22:51:17 +00:00
2 changed files with 356 additions and 0 deletions

292
hermes_cli/a2a_health.py Normal file
View File

@@ -0,0 +1,292 @@
"""A2A Health Check — periodic heartbeat monitoring for fleet agents.
Pings each registered agent's A2A endpoint, records response time and status,
tracks consecutive failures, and fires alerts when thresholds are breached.
Usage:
from hermes_cli.a2a_health import HealthMonitor
monitor = HealthMonitor()
results = monitor.check_all()
monitor.print_dashboard(results)
"""
import json
import os
import time
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
import urllib.request
import urllib.error
STATE_FILE = os.path.expanduser("~/.hermes/a2a_health.json")
FLEET_FILE = os.path.expanduser("~/.hermes/fleet_agents.json")
# Thresholds
CONSECUTIVE_FAILURE_THRESHOLD = 3
RESPONSE_TIME_THRESHOLD_SEC = 10.0
CHECK_INTERVAL_SEC = 300 # 5 minutes
@dataclass
class AgentStatus:
name: str
url: str
last_check: Optional[str] = None
last_healthy: Optional[str] = None
response_time_ms: Optional[float] = None
status: str = "unknown" # healthy | degraded | down | unknown
consecutive_failures: int = 0
available_tools: int = 0
agent_status: str = ""
error: Optional[str] = None
@dataclass
class HealthState:
agents: dict = field(default_factory=dict) # name -> AgentStatus dict
last_full_check: Optional[str] = None
alerts_sent: dict = field(default_factory=dict) # name -> timestamp of last alert
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def _load_state() -> HealthState:
if os.path.exists(STATE_FILE):
try:
with open(STATE_FILE) as f:
data = json.load(f)
state = HealthState()
state.last_full_check = data.get("last_full_check")
state.alerts_sent = data.get("alerts_sent", {})
for name, agent_data in data.get("agents", {}).items():
status = AgentStatus(name=agent_data["name"], url=agent_data["url"])
status.last_check = agent_data.get("last_check")
status.last_healthy = agent_data.get("last_healthy")
status.response_time_ms = agent_data.get("response_time_ms")
status.status = agent_data.get("status", "unknown")
status.consecutive_failures = agent_data.get("consecutive_failures", 0)
status.available_tools = agent_data.get("available_tools", 0)
status.agent_status = agent_data.get("agent_status", "")
status.error = agent_data.get("error")
state.agents[name] = status
return state
except (json.JSONDecodeError, KeyError):
pass
return HealthState()
def _save_state(state: HealthState) -> None:
os.makedirs(os.path.dirname(STATE_FILE), exist_ok=True)
data = {
"last_full_check": state.last_full_check,
"alerts_sent": state.alerts_sent,
"agents": {},
}
for name, status in state.agents.items():
data["agents"][name] = asdict(status)
with open(STATE_FILE, "w") as f:
json.dump(data, f, indent=2)
def _load_fleet() -> list[dict]:
"""Load fleet agent definitions from ~/.hermes/fleet_agents.json.
Format:
[
{"name": "ezra", "url": "http://143.198.27.163:8080"},
{"name": "bezalel", "url": "http://167.99.126.228:8080"},
{"name": "allegro", "url": "http://167.99.126.229:8080"}
]
"""
if os.path.exists(FLEET_FILE):
try:
with open(FLEET_FILE) as f:
return json.load(f)
except (json.JSONDecodeError, KeyError):
pass
return []
def _ping_agent(url: str, timeout: float = 15.0) -> dict:
"""Ping an A2A endpoint and return health data.
Tries /health first, falls back to /a2a with a minimal request.
"""
start = time.monotonic()
# Try dedicated health endpoint first
health_url = f"{url.rstrip('/')}/health"
try:
req = urllib.request.Request(health_url, method="GET")
with urllib.request.urlopen(req, timeout=timeout) as resp:
elapsed = time.monotonic() - start
data = json.loads(resp.read())
return {
"ok": True,
"response_time_ms": round(elapsed * 1000, 1),
"status": data.get("status", "healthy"),
"available_tools": data.get("available_tools", 0),
}
except (urllib.error.URLError, urllib.error.HTTPError, TimeoutError, OSError):
pass
# Fallback: try A2A agent card endpoint
card_url = f"{url.rstrip('/')}/.well-known/agent-card.json"
try:
req = urllib.request.Request(card_url, method="GET")
with urllib.request.urlopen(req, timeout=timeout) as resp:
elapsed = time.monotonic() - start
data = json.loads(resp.read())
tools = data.get("capabilities", {}).get("tools", [])
return {
"ok": True,
"response_time_ms": round(elapsed * 1000, 1),
"status": "healthy",
"available_tools": len(tools) if isinstance(tools, list) else 0,
}
except (urllib.error.URLError, urllib.error.HTTPError, TimeoutError, OSError) as e:
elapsed = time.monotonic() - start
return {
"ok": False,
"response_time_ms": round(elapsed * 1000, 1),
"error": str(e),
}
class HealthMonitor:
"""A2A fleet health monitor."""
def __init__(self, state_file: str = STATE_FILE, fleet_file: str = FLEET_FILE):
self.state_file = state_file
self.fleet_file = fleet_file
self.state = _load_state()
def check_agent(self, name: str, url: str) -> AgentStatus:
"""Check a single agent and update state."""
now = _now_iso()
result = _ping_agent(url)
if name not in self.state.agents:
self.state.agents[name] = AgentStatus(name=name, url=url)
agent = self.state.agents[name]
agent.last_check = now
agent.response_time_ms = result.get("response_time_ms")
if result["ok"]:
agent.status = "healthy"
agent.consecutive_failures = 0
agent.available_tools = result.get("available_tools", 0)
agent.agent_status = result.get("status", "healthy")
agent.last_healthy = now
agent.error = None
else:
agent.consecutive_failures += 1
agent.error = result.get("error", "unknown error")
if agent.consecutive_failures >= CONSECUTIVE_FAILURE_THRESHOLD:
agent.status = "down"
else:
agent.status = "degraded"
# Check response time even on success
if result["ok"] and result.get("response_time_ms", 0) > RESPONSE_TIME_THRESHOLD_SEC * 1000:
if agent.status == "healthy":
agent.status = "degraded"
return agent
def check_all(self) -> list[AgentStatus]:
"""Check all registered fleet agents."""
fleet = _load_fleet()
results = []
for agent_def in fleet:
name = agent_def.get("name", "unknown")
url = agent_def.get("url", "")
if not url:
continue
status = self.check_agent(name, url)
results.append(status)
self.state.last_full_check = _now_iso()
_save_state(self.state)
return results
def get_alerts(self, results: list[AgentStatus]) -> list[dict]:
"""Generate alerts for agents that just went down."""
alerts = []
now = time.time()
for agent in results:
if agent.status == "down":
last_alert = self.state.alerts_sent.get(agent.name, 0)
# Alert at most once per hour per agent
if now - last_alert > 3600:
alerts.append({
"agent": agent.name,
"consecutive_failures": agent.consecutive_failures,
"last_healthy": agent.last_healthy,
"error": agent.error,
})
self.state.alerts_sent[agent.name] = now
if alerts:
_save_state(self.state)
return alerts
def print_dashboard(self, results: list[AgentStatus]) -> str:
"""Format a text dashboard of fleet health."""
lines = []
lines.append("=== A2A Fleet Health ===")
lines.append(f"Checked: {_now_iso()[:19]}Z")
lines.append("")
if not results:
lines.append("No agents registered.")
lines.append(f"Add agents to {self.fleet_file}")
return "\n".join(lines)
# Header
lines.append(f"{'AGENT':<12} {'STATUS':<10} {'MS':>8} {'TOOLS':>6} {'FAIL':>5} {'LAST HEALTHY'}")
lines.append("-" * 72)
for agent in sorted(results, key=lambda a: a.name):
status_icon = {
"healthy": "OK",
"degraded": "WARN",
"down": "DOWN",
"unknown": "???",
}.get(agent.status, "???")
ms = f"{agent.response_time_ms:.0f}" if agent.response_time_ms else "-"
tools = str(agent.available_tools) if agent.available_tools else "-"
fail = str(agent.consecutive_failures) if agent.consecutive_failures else "-"
last_h = agent.last_healthy[:19] + "Z" if agent.last_healthy else "never"
lines.append(f"{agent.name:<12} {status_icon:<10} {ms:>8} {tools:>6} {fail:>5} {last_h}")
# Summary
healthy = sum(1 for a in results if a.status == "healthy")
degraded = sum(1 for a in results if a.status == "degraded")
down = sum(1 for a in results if a.status == "down")
lines.append("-" * 72)
lines.append(f"Total: {len(results)} Healthy: {healthy} Degraded: {degraded} Down: {down}")
# Alerts
alerts = self.get_alerts(results)
if alerts:
lines.append("")
lines.append("ALERTS:")
for alert in alerts:
lines.append(f" !! {alert['agent']} is DOWN ({alert['consecutive_failures']} consecutive failures)")
if alert.get("error"):
lines.append(f" Error: {alert['error'][:100]}")
return "\n".join(lines)

View File

@@ -0,0 +1,64 @@
"""CLI command: hermes a2a health
Fleet agent health check via A2A heartbeat.
Usage:
hermes a2a health # Check all agents, show dashboard
hermes a2a health --json # JSON output for scripting
hermes a2a health --agent NAME # Check single agent
"""
import argparse
import json
import sys
from hermes_cli.a2a_health import HealthMonitor, _load_fleet, _ping_agent
def register_subparser(subparsers):
"""Register 'hermes a2a health' subcommand."""
a2a_parser = subparsers.add_parser("a2a", help="A2A agent communication")
a2a_sub = a2a_parser.add_subparsers(dest="a2a_command")
health_parser = a2a_sub.add_parser("health", help="Check fleet agent health")
health_parser.add_argument("--json", action="store_true", help="JSON output")
health_parser.add_argument("--agent", type=str, help="Check single agent by name")
health_parser.set_defaults(func=run_health_check)
return a2a_parser
def run_health_check(args: argparse.Namespace) -> int:
"""Execute the health check command."""
monitor = HealthMonitor()
if args.agent:
# Single agent check
fleet = _load_fleet()
agent_def = next((a for a in fleet if a.get("name") == args.agent), None)
if not agent_def:
print(f"Error: agent '{args.agent}' not found in fleet", file=sys.stderr)
print(f"Add it to ~/.hermes/fleet_agents.json", file=sys.stderr)
return 1
result = monitor.check_agent(args.agent, agent_def["url"])
if args.json:
from dataclasses import asdict
print(json.dumps(asdict(result), indent=2))
else:
print(monitor.print_dashboard([result]))
return 0 if result.status in ("healthy", "degraded") else 1
# Full fleet check
results = monitor.check_all()
if args.json:
from dataclasses import asdict
print(json.dumps([asdict(r) for r in results], indent=2))
else:
print(monitor.print_dashboard(results))
# Exit code: 0 if all healthy, 1 if any down
has_down = any(r.status == "down" for r in results)
return 1 if has_down else 0