Compare commits

..

2 Commits

Author SHA1 Message Date
4c8d63a5c9 test: A2A health monitor tests
Some checks failed
Contributor Attribution Check / check-attribution (pull_request) Failing after 35s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Nix / nix (ubuntu-latest) (pull_request) Failing after 4s
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 31s
Tests / e2e (pull_request) Successful in 3m12s
Tests / test (pull_request) Failing after 42m47s
Nix / nix (macos-latest) (pull_request) Has been cancelled
Part of #822
2026-04-16 01:39:10 +00:00
6bc10419b1 feat: A2A health monitor module
Closes #822, Part of #801
2026-04-16 01:39:07 +00:00
4 changed files with 337 additions and 356 deletions

View File

@@ -1,224 +0,0 @@
"""A2A Agent Card — publish capabilities for fleet discovery.
Each fleet agent publishes an A2A-compliant agent card describing its capabilities.
Standard discovery endpoint: /.well-known/agent-card.json
Issue #819: feat: A2A agent card — publish capabilities for fleet discovery
"""
import json
import os
import socket
from dataclasses import dataclass, field, asdict
from pathlib import Path
from typing import Any, Dict, List, Optional
def _plain_text_modes() -> List[str]:
    """Return a fresh list containing only the plain-text MIME type."""
    return ["text/plain"]


@dataclass
class AgentSkill:
    """One capability entry listed on an agent card.

    ``id`` and ``name`` are required; every other field has a default.
    """

    # Required identification.
    id: str
    name: str
    # Optional descriptive fields.
    description: str = ""
    tags: List[str] = field(default_factory=list)
    examples: List[str] = field(default_factory=list)
    # MIME types for input/output; each instance gets its own list object.
    input_modes: List[str] = field(default_factory=_plain_text_modes)
    output_modes: List[str] = field(default_factory=_plain_text_modes)
@dataclass
class AgentCapabilities:
    """Feature flags advertised on an agent card.

    Defaults: streaming and state-transition history enabled,
    push notifications disabled.
    """

    streaming: bool = True
    push_notifications: bool = False
    state_transition_history: bool = True
@dataclass
class AgentCard:
    """Agent card record: identity, capabilities, skills and free-form metadata.

    ``name``, ``description`` and ``url`` are required; the remaining fields
    have defaults.
    """

    name: str
    description: str
    url: str
    version: str = "1.0.0"
    capabilities: AgentCapabilities = field(default_factory=AgentCapabilities)
    skills: List[AgentSkill] = field(default_factory=list)
    default_input_modes: List[str] = field(
        default_factory=lambda: ["text/plain", "application/json"]
    )
    default_output_modes: List[str] = field(
        default_factory=lambda: ["text/plain", "application/json"]
    )
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return the card as a plain dict suitable for ``json.dumps``.

        The two default-mode fields are re-keyed to camelCase; all other
        keys keep their dataclass field names.
        """
        payload = asdict(self)
        renames = (
            ("default_input_modes", "defaultInputModes"),
            ("default_output_modes", "defaultOutputModes"),
        )
        for snake_key, camel_key in renames:
            payload[camel_key] = payload.pop(snake_key)
        return payload

    def to_json(self) -> str:
        """Return the card serialized as a 2-space-indented JSON string."""
        card_dict = self.to_dict()
        return json.dumps(card_dict, indent=2)
def _load_skills_from_directory(skills_dir: Path) -> List[AgentSkill]:
    """Build skill entries from ``<skills_dir>/<skill>/SKILL.md`` frontmatter.

    Each immediate subdirectory containing a ``SKILL.md`` that starts with a
    ``---``-delimited YAML frontmatter block yields one ``AgentSkill``. The
    directory name is the skill id; ``name``, ``description`` (truncated to
    200 characters) and ``tags`` come from the frontmatter when present.

    Args:
        skills_dir: Directory whose subdirectories are scanned.

    Returns:
        Skills sorted by directory path. Empty when ``skills_dir`` is not a
        directory or when PyYAML cannot be imported.
    """
    skills: List[AgentSkill] = []
    if not skills_dir.is_dir():
        return skills

    try:
        import yaml
    except ImportError:
        # Frontmatter cannot be parsed without PyYAML; report no skills.
        return skills

    # iterdir() order is arbitrary; sort so the published card is stable.
    for skill_dir in sorted(skills_dir.iterdir()):
        skill_md = skill_dir / "SKILL.md"
        if not skill_dir.is_dir() or not skill_md.exists():
            continue

        try:
            content = skill_md.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError):
            # Unreadable skill file: skip this skill, keep scanning the rest.
            continue

        if not content.startswith("---"):
            continue
        parts = content.split("---", 2)
        if len(parts) < 3:
            continue

        try:
            metadata = yaml.safe_load(parts[1])
        except Exception:
            metadata = None
        # Malformed YAML and valid-but-non-mapping YAML are treated alike:
        # the skill is still listed, using directory-derived defaults.
        if not isinstance(metadata, dict):
            metadata = {}

        desc = metadata.get("description") or ""
        tags = metadata.get("tags", [])
        skills.append(AgentSkill(
            id=skill_dir.name,
            name=str(metadata.get("name") or skill_dir.name),
            description=str(desc)[:200],
            tags=tags if isinstance(tags, list) else [],
        ))
    return skills
def validate_agent_card(card: AgentCard) -> List[str]:
    """Check a card for missing fields, unknown MIME modes and id-less skills.

    Returns:
        Human-readable problems in this order: name, url, input modes,
        output modes, skills. An empty list means the card passed.
    """
    allowed_mimes = {"text/plain", "application/json", "image/png", "audio/wav"}
    problems: List[str] = []

    # Required top-level fields.
    if not card.name:
        problems.append("name is required")
    if not card.url:
        problems.append("url is required")

    # Default modes must come from the allowed MIME set.
    problems.extend(
        f"invalid input mode: {mode}"
        for mode in card.default_input_modes
        if mode not in allowed_mimes
    )
    problems.extend(
        f"invalid output mode: {mode}"
        for mode in card.default_output_modes
        if mode not in allowed_mimes
    )

    # Every skill needs a non-empty id.
    problems.extend(
        f"skill missing id: {skill.name}"
        for skill in card.skills
        if not skill.id
    )
    return problems
def build_agent_card(
    name: Optional[str] = None,
    description: Optional[str] = None,
    url: Optional[str] = None,
    version: Optional[str] = None,
    skills: Optional[List[AgentSkill]] = None,
    extra_skills: Optional[List[AgentSkill]] = None,
    metadata: Optional[Dict[str, Any]] = None,
) -> AgentCard:
    """Build an agent card from explicit arguments, environment and config.

    Priority: explicit params > env vars > config.yaml > defaults.

    Args:
        name: Agent name; falls back to ``HERMES_AGENT_NAME``, then "hermes".
        description: Falls back to ``HERMES_AGENT_DESCRIPTION``, then a default.
        url: Falls back to ``HERMES_AGENT_URL``, then
            ``http://localhost:<HERMES_API_PORT or 8642>``.
        version: Falls back to ``HERMES_AGENT_VERSION``, then "1.0.0".
        skills: Skills to publish. When ``None``, skills are loaded from the
            ``skills`` directory under the Hermes home. The list passed in is
            copied, never modified.
        extra_skills: Additional skills appended when their id is not
            already present.
        metadata: Extra entries merged over the generated model/provider/
            hostname metadata.

    Returns:
        The assembled ``AgentCard``.
    """
    # Model/provider from config are optional extras: any failure to import
    # or read the config is deliberately ignored and leaves them empty.
    config_model = ""
    config_provider = ""
    try:
        from hermes_cli.config import load_config
        cfg = load_config()
        model_cfg = cfg.get("model", {})
        if isinstance(model_cfg, dict):
            config_model = model_cfg.get("default", "")
            config_provider = model_cfg.get("provider", "")
        elif isinstance(model_cfg, str):
            config_model = model_cfg
    except Exception:
        pass

    env = os.environ
    agent_name = name or env.get("HERMES_AGENT_NAME", "") or "hermes"
    agent_desc = description or env.get("HERMES_AGENT_DESCRIPTION", "") or "Sovereign AI agent"
    agent_url = (
        url
        or env.get("HERMES_AGENT_URL", "")
        or f"http://localhost:{env.get('HERMES_API_PORT', '8642')}"
    )
    agent_version = version or env.get("HERMES_AGENT_VERSION", "") or "1.0.0"

    if skills is not None:
        # Copy so appending extra_skills below never mutates the caller's list.
        agent_skills = list(skills)
    else:
        from hermes_constants import get_hermes_home
        agent_skills = _load_skills_from_directory(get_hermes_home() / "skills")

    if extra_skills:
        seen_ids = {s.id for s in agent_skills}
        for skill in extra_skills:
            if skill.id not in seen_ids:
                agent_skills.append(skill)
                # Track the new id so duplicates within extra_skills are skipped too.
                seen_ids.add(skill.id)

    # Env vars take precedence over config.yaml, per the documented priority.
    card_metadata = {
        "model": env.get("HERMES_MODEL", "") or config_model,
        "provider": env.get("HERMES_PROVIDER", "") or config_provider,
        "hostname": socket.gethostname(),
    }
    if metadata:
        card_metadata.update(metadata)

    capabilities = AgentCapabilities(
        streaming=True,
        push_notifications=False,
        state_transition_history=True,
    )
    return AgentCard(
        name=agent_name,
        description=agent_desc,
        url=agent_url,
        version=agent_version,
        capabilities=capabilities,
        skills=agent_skills,
        metadata=card_metadata,
    )
def get_agent_card_json() -> str:
    """Return the agent card as a JSON string (for the HTTP endpoint).

    If building the full card raises, the failure is logged and a minimal
    card (name "hermes", localhost URL) is returned instead, so the caller
    always receives valid JSON.
    """
    try:
        return build_agent_card().to_json()
    except Exception:
        # Graceful fallback: keep discovery working, but record why the
        # full card could not be built instead of hiding the error.
        import logging
        logging.getLogger(__name__).warning(
            "Failed to build agent card; serving minimal fallback card",
            exc_info=True,
        )
        fallback = AgentCard(
            name="hermes",
            description="Sovereign AI agent",
            url=f"http://localhost:{os.environ.get('HERMES_API_PORT', '8642')}",
        )
        return fallback.to_json()

257
hermes_cli/a2a_health.py Normal file
View File

@@ -0,0 +1,257 @@
"""
A2A Health Monitor — Fleet Agent Heartbeat (#822)

Pings each fleet agent's A2A endpoint and tracks health status.
Persists state to ~/.hermes/a2a_health.json.

Usage:
    from hermes_cli.a2a_health import check_fleet_health, check_agent_health

    report = check_fleet_health()
    for agent in report["agents"]:
        print(f"{agent['name']}: {agent['status']} ({agent.get('response_ms')}ms)")
"""
import json
import time
import urllib.request
import urllib.error
from pathlib import Path
from typing import Any, Dict, List, Optional
# Base directory for Hermes files under the current user's home.
HERMES_HOME = Path.home() / ".hermes"
# JSON file read by load_fleet_config(); its "agents" entry lists the fleet.
FLEET_CONFIG = HERMES_HOME / "fleet_agents.json"
# JSON file where health results are persisted between checks.
HEALTH_STATE = HERMES_HOME / "a2a_health.json"
# Number of consecutive failed pings at which an agent is reported "down"
# instead of "degraded".
CONSECUTIVE_FAILURE_THRESHOLD = 3
# A reachable agent whose response time exceeds this many milliseconds is
# reported "slow".
SLOW_RESPONSE_MS = 10000
def load_fleet_config(path: Optional[Path] = None) -> List[Dict[str, Any]]:
    """Load fleet agent definitions from a JSON config file.

    The file must contain a JSON object whose ``"agents"`` value is a list;
    only the dict entries of that list are returned.

    Args:
        path: Config file to read. Defaults to ``FLEET_CONFIG``.

    Returns:
        The agent definitions, or an empty list when the file is missing,
        unreadable, not valid JSON, or not in the expected shape.
    """
    config_path = FLEET_CONFIG if path is None else Path(path)
    try:
        with open(config_path, encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # Missing/unreadable file or invalid JSON/encoding: no agents.
        return []

    if not isinstance(data, dict):
        return []
    agents = data.get("agents", [])
    if not isinstance(agents, list):
        return []
    # Callers treat every entry as a dict, so drop anything else.
    return [agent for agent in agents if isinstance(agent, dict)]
def load_health_state(path: Optional[Path] = None) -> Dict[str, Any]:
    """Load the persisted health state.

    Args:
        path: State file to read. Defaults to ``HEALTH_STATE``.

    Returns:
        A dict with an ``"agents"`` dict and a ``"last_check"`` entry. When
        the file is missing, unreadable, not valid JSON, or not a JSON
        object, the empty state ``{"agents": {}, "last_check": None}`` is
        returned.
    """
    state_path = HEALTH_STATE if path is None else Path(path)
    try:
        with open(state_path, encoding="utf-8") as f:
            state = json.load(f)
    except (OSError, ValueError):
        return {"agents": {}, "last_check": None}

    if not isinstance(state, dict):
        return {"agents": {}, "last_check": None}
    # Normalise the shape so callers can rely on state["agents"] being a dict.
    if not isinstance(state.get("agents"), dict):
        state["agents"] = {}
    state.setdefault("last_check", None)
    return state
def save_health_state(state: Dict[str, Any], path: Optional[Path] = None) -> None:
    """Persist health state as indented JSON.

    The data is written to a sibling ``<name>.tmp`` file and then moved over
    the target, so a failure part-way through serialisation does not leave a
    truncated state file behind.

    Args:
        state: JSON-serialisable state to store.
        path: Destination file. Defaults to ``HEALTH_STATE``. Missing parent
            directories are created.
    """
    target = HEALTH_STATE if path is None else Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = target.with_name(target.name + ".tmp")
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(state, f, indent=2)
        tmp_path.replace(target)
    except BaseException:
        # Do not leave a stray temp file next to the real state file.
        try:
            tmp_path.unlink()
        except OSError:
            pass
        raise
def ping_agent(base_url: str, timeout: int = 10) -> Dict[str, Any]:
    """Ping an agent's A2A endpoint.

    Tries ``/health`` first and falls back to ``/.well-known/agent-card.json``.

    Args:
        base_url: Agent base URL; a trailing slash is ignored.
        timeout: Per-request timeout in seconds passed to ``urlopen``.

    Returns:
        On success: ``alive`` (True), ``status_code``, ``endpoint`` and
        ``response_ms`` (time taken by the endpoint that answered). When the
        agent card endpoint answered with a JSON object, ``agent_card`` holds
        its ``name`` and ``tools_count`` (number of listed skills).
        On failure: ``alive`` (False), ``error`` and ``response_ms`` (total
        time spent across all attempts).
    """
    card_endpoint = "/.well-known/agent-card.json"
    # Agent cards are routinely larger than 1 KiB; cap the read generously so
    # the JSON can be parsed, while still bounding memory use.
    max_card_bytes = 1024 * 1024
    root = base_url.rstrip("/")
    overall_start = time.monotonic()

    for endpoint in ("/health", card_endpoint):
        # Time each attempt separately so a failed /health probe does not
        # inflate the response time reported for the fallback endpoint.
        attempt_start = time.monotonic()
        try:
            req = urllib.request.Request(f"{root}{endpoint}", method="GET")
            req.add_header("User-Agent", "hermes-a2a-health/1.0")
            with urllib.request.urlopen(req, timeout=timeout) as resp:
                elapsed = (time.monotonic() - attempt_start) * 1000
                read_limit = max_card_bytes if endpoint == card_endpoint else 1024
                body = resp.read(read_limit).decode("utf-8", errors="replace")
                status_code = resp.status
        except Exception:
            # Any failure (bad URL, refused, timeout, HTTP error status)
            # means this endpoint did not answer; try the next one.
            continue

        result: Dict[str, Any] = {
            "alive": True,
            "status_code": status_code,
            "endpoint": endpoint,
            "response_ms": round(elapsed, 1),
        }
        if endpoint == card_endpoint:
            try:
                card = json.loads(body)
            except ValueError:
                card = None
            # The card summary is optional; omit it when the body is not a
            # JSON object.
            if isinstance(card, dict):
                skills = card.get("skills", [])
                result["agent_card"] = {
                    "name": card.get("name", "unknown"),
                    "tools_count": len(skills) if isinstance(skills, list) else 0,
                }
        return result

    total_elapsed = (time.monotonic() - overall_start) * 1000
    return {
        "alive": False,
        "error": "All endpoints unreachable",
        "response_ms": round(total_elapsed, 1),
    }
def check_agent_health(
    agent: Dict[str, Any],
    prev_state: Dict[str, Any],
    timeout: int = 10,
) -> Dict[str, Any]:
    """Check health of a single agent.

    The URL is taken from the first ``supportedInterfaces`` entry when it
    provides one, otherwise from the agent's ``url`` field.

    Args:
        agent: Agent definition from the fleet config.
        prev_state: Previously persisted state; the agent's prior
            ``consecutive_failures`` is read from ``prev_state["agents"]``.
        timeout: Per-request timeout in seconds forwarded to ``ping_agent``.

    Returns:
        A result dict whose ``status`` is one of ``"healthy"``, ``"slow"``,
        ``"degraded"``, ``"down"`` or ``"error"`` (no URL configured).
    """
    name = agent.get("name", "unknown")

    base_url = ""
    interfaces = agent.get("supportedInterfaces") or []
    # Ignore malformed interface entries instead of raising on them.
    if isinstance(interfaces, list) and interfaces and isinstance(interfaces[0], dict):
        base_url = interfaces[0].get("url", "") or ""
    if not base_url:
        base_url = agent.get("url", "") or ""
    if not base_url:
        return {
            "name": name,
            "status": "error",
            "error": "No URL configured",
            "consecutive_failures": 0,
        }

    result = ping_agent(base_url, timeout=timeout)

    prev_agents = prev_state.get("agents")
    prev = prev_agents.get(name) if isinstance(prev_agents, dict) else None
    prev_failures = prev.get("consecutive_failures", 0) if isinstance(prev, dict) else 0

    if result["alive"]:
        consecutive_failures = 0
        if result.get("response_ms", 0) > SLOW_RESPONSE_MS:
            status = "slow"
        else:
            status = "healthy"
    else:
        consecutive_failures = prev_failures + 1
        if consecutive_failures >= CONSECUTIVE_FAILURE_THRESHOLD:
            status = "down"
        else:
            status = "degraded"

    return {
        "name": name,
        "url": base_url,
        "status": status,
        "alive": result["alive"],
        "response_ms": result.get("response_ms"),
        "endpoint": result.get("endpoint"),
        "status_code": result.get("status_code"),
        "agent_card": result.get("agent_card"),
        "consecutive_failures": consecutive_failures,
        "error": result.get("error"),
        "checked_at": time.strftime("%Y-%m-%dT%H:%M:%S"),
    }


def check_fleet_health(
    agent_name: Optional[str] = None,
    timeout: int = 10,
) -> Dict[str, Any]:
    """Check health of all (or one) fleet agent and persist the results.

    Args:
        agent_name: When given, only the agent with this name is checked and
            the persisted entries of all other agents are kept as they were.
        timeout: Per-request timeout in seconds used for every ping.

    Returns:
        Report dict with the ``agents`` result list, a ``summary`` of counts
        per status, and ``checked_at``.
    """
    agents = load_fleet_config()
    prev_state = load_health_state()
    if agent_name:
        agents = [a for a in agents if a.get("name") == agent_name]

    results = [check_agent_health(agent, prev_state, timeout=timeout) for agent in agents]
    checked_at = time.strftime("%Y-%m-%dT%H:%M:%S")

    # A single-agent check must not wipe the failure history of the agents
    # that were not checked; a full check replaces the stored set entirely.
    agents_state: Dict[str, Any] = {}
    if agent_name:
        prev_agents = prev_state.get("agents")
        if isinstance(prev_agents, dict):
            agents_state.update(prev_agents)
    agents_state.update({r["name"]: r for r in results})
    save_health_state({"agents": agents_state, "last_check": checked_at})

    healthy = sum(1 for r in results if r["status"] == "healthy")
    degraded = sum(1 for r in results if r["status"] == "degraded")
    slow = sum(1 for r in results if r["status"] == "slow")
    down = sum(1 for r in results if r["status"] in ("down", "error"))
    return {
        "agents": results,
        "summary": {
            "total": len(results),
            "healthy": healthy,
            "degraded": degraded,
            "slow": slow,
            "down": down,
            # Slow agents are not healthy, so they also clear this flag.
            "all_healthy": healthy == len(results),
        },
        "checked_at": checked_at,
    }
def format_health_dashboard(report: Dict[str, Any]) -> str:
    """Format a health report as a plain-text dashboard.

    Args:
        report: Report dict with ``summary``, ``checked_at`` and ``agents``
            entries, as returned by ``check_fleet_health``.

    Returns:
        A header line, the check time, a blank line, then one line per agent
        with its status icon, name, and either response details or the error.
    """
    status_icons = {
        "healthy": "\u2705",
        "degraded": "\u26a0\ufe0f",
        "slow": "\u23f1\ufe0f",
        "down": "\u274c",
        "error": "\u274c",
    }
    summary = report["summary"]

    if summary.get("total") == 0:
        # Nothing was checked, so do not claim the fleet is healthy.
        header = "\u2753 No fleet agents checked"
    elif summary["all_healthy"]:
        header = "\u2705 All fleet agents healthy"
    elif summary["down"] > 0:
        header = f"\u274c {summary['down']} agent(s) DOWN"
    else:
        header = f"\u26a0\ufe0f Fleet degraded: {summary['degraded']} degraded, {summary['slow']} slow"
    lines = [header, f"Checked: {report['checked_at']}", ""]

    for agent in report["agents"]:
        icon = status_icons.get(agent["status"], "\u2753")
        line = f" {icon} {agent['name']}"
        if agent.get("alive"):
            ms = agent.get("response_ms")
            details = [f"{ms}ms" if ms is not None else "?ms"]
            card = agent.get("agent_card")
            if card:
                details.append(f"{card.get('tools_count', 0)} tools")
            # Separate the name from its details so fields do not run together.
            line += ": " + ", ".join(details)
        else:
            line += f": {agent.get('error') or 'unreachable'}"
        failures = agent.get("consecutive_failures", 0)
        if failures > 0:
            line += f" ({failures} consecutive failures)"
        lines.append(line)
    return "\n".join(lines)

80
tests/test_a2a_health.py Normal file
View File

@@ -0,0 +1,80 @@
"""Tests for A2A health monitor (#822)."""
import sys
import json
import tempfile
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from hermes_cli.a2a_health import (
ping_agent,
check_agent_health,
check_fleet_health,
format_health_dashboard,
load_health_state,
save_health_state,
)
def test_ping_agent_unreachable():
    """Ping returns alive=False when nothing is listening on the target port."""
    import socket

    # Reserve a free loopback port and release it again: connecting to it is
    # then refused immediately, so the test does not depend on external
    # routing or on waiting out request timeouts.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        probe.bind(("127.0.0.1", 0))
        closed_port = probe.getsockname()[1]

    result = ping_agent(f"http://127.0.0.1:{closed_port}", timeout=2)
    assert not result["alive"]
    assert "error" in result
def test_check_agent_no_url():
    """An agent definition with no URL is reported with status "error"."""
    agent_without_url = {"name": "test"}
    empty_state = {}
    outcome = check_agent_health(agent_without_url, empty_state)
    assert outcome["status"] == "error"
def test_format_dashboard():
    """The dashboard names every agent and flags the fleet as DOWN."""
    healthy_agent = {"name": "ezra", "status": "healthy", "alive": True, "response_ms": 50}
    down_agent = {"name": "allegro", "status": "down", "alive": False, "error": "timeout"}
    counts = {"total": 2, "healthy": 1, "degraded": 0, "slow": 0, "down": 1, "all_healthy": False}
    report = {
        "agents": [healthy_agent, down_agent],
        "summary": counts,
        "checked_at": "2026-04-15T12:00:00",
    }

    rendered = format_health_dashboard(report)

    for expected_fragment in ("ezra", "allegro", "DOWN"):
        assert expected_fragment in rendered
def test_state_persistence():
    """State written by save_health_state is read back by load_health_state."""
    import hermes_cli.a2a_health as health_module

    state = {"agents": {"test": {"alive": True}}, "last_check": "now"}
    with tempfile.TemporaryDirectory() as tmpdir:
        state_file = Path(tmpdir) / "health.json"
        # Point the module at a temp file so the real ~/.hermes state is
        # never touched, and always restore the original path afterwards.
        original_path = health_module.HEALTH_STATE
        health_module.HEALTH_STATE = state_file
        try:
            save_health_state(state)
            assert state_file.exists()
            loaded = load_health_state()
        finally:
            health_module.HEALTH_STATE = original_path

    assert loaded == state
    assert loaded["agents"]["test"]["alive"] is True
def test_consecutive_failures():
    """A third consecutive failure bumps the count to 3 and marks the agent down."""
    import socket

    # Use a just-released loopback port so both probes are refused at once
    # rather than each waiting for the default request timeout.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        probe.bind(("127.0.0.1", 0))
        closed_port = probe.getsockname()[1]

    prev = {"agents": {"test": {"consecutive_failures": 2}}}
    agent = {"name": "test", "url": f"http://127.0.0.1:{closed_port}"}
    result = check_agent_health(agent, prev)
    assert result["consecutive_failures"] == 3
    assert result["status"] == "down"
if __name__ == "__main__":
    # Minimal runner for executing this file directly, without pytest.
    for test_fn in (
        test_ping_agent_unreachable,
        test_check_agent_no_url,
        test_format_dashboard,
        test_state_persistence,
        test_consecutive_failures,
    ):
        print(f"Running {test_fn.__name__}...")
        test_fn()
        print(" PASS")
    print("\nAll tests passed.")

View File

@@ -1,132 +0,0 @@
"""Tests for A2A agent card — Issue #819."""
import json
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from agent.agent_card import (
AgentSkill, AgentCapabilities, AgentCard,
validate_agent_card, build_agent_card, get_agent_card_json,
_load_skills_from_directory
)
class TestAgentSkill:
    """Construction of a single skill entry."""

    def test_creation(self):
        created = AgentSkill(id="code", name="Code", tags=["python"])
        assert created.id == "code"
        assert "python" in created.tags
class TestAgentCapabilities:
    """Default values of the capability flags."""

    def test_defaults(self):
        caps = AgentCapabilities()
        # Identity checks: the flags must be real booleans, not merely
        # values that compare equal to True/False.
        assert caps.streaming is True
        assert caps.push_notifications is False
class TestAgentCard:
    """Serialization of a card to dict and JSON."""

    def test_to_dict(self):
        as_dict = AgentCard(
            name="timmy", description="test", url="http://localhost:8642"
        ).to_dict()
        assert as_dict["name"] == "timmy"
        assert "defaultInputModes" in as_dict

    def test_to_json(self):
        serialized = AgentCard(
            name="timmy", description="test", url="http://localhost:8642"
        ).to_json()
        decoded = json.loads(serialized)
        assert decoded["name"] == "timmy"
class TestValidation:
    """validate_agent_card reports each kind of problem it checks for."""

    def test_valid_card(self):
        problems = validate_agent_card(
            AgentCard(name="timmy", description="test", url="http://localhost:8642")
        )
        assert len(problems) == 0

    def test_missing_name(self):
        problems = validate_agent_card(
            AgentCard(name="", description="test", url="http://localhost:8642")
        )
        assert any("name" in problem for problem in problems)

    def test_missing_url(self):
        problems = validate_agent_card(
            AgentCard(name="timmy", description="test", url="")
        )
        assert any("url" in problem for problem in problems)

    def test_invalid_input_mode(self):
        bad_mode_card = AgentCard(
            name="timmy",
            description="test",
            url="http://localhost:8642",
            default_input_modes=["invalid/mode"],
        )
        problems = validate_agent_card(bad_mode_card)
        assert any("invalid input mode" in problem for problem in problems)

    def test_skill_missing_id(self):
        idless_skill = AgentSkill(id="", name="test")
        card = AgentCard(
            name="timmy",
            description="test",
            url="http://localhost:8642",
            skills=[idless_skill],
        )
        problems = validate_agent_card(card)
        assert any("skill missing id" in problem for problem in problems)
class TestBuildAgentCard:
    """build_agent_card with defaults, explicit overrides and extra skills."""

    def test_builds_valid_card(self):
        built = build_agent_card()
        assert built.name
        assert built.url
        assert len(validate_agent_card(built)) == 0

    def test_explicit_params_override(self):
        built = build_agent_card(name="custom", description="custom desc")
        assert built.name == "custom"
        assert built.description == "custom desc"

    def test_extra_skills(self):
        additional = [AgentSkill(id="extra", name="Extra")]
        built = build_agent_card(extra_skills=additional)
        skill_ids = [skill.id for skill in built.skills]
        assert "extra" in skill_ids
class TestGetAgentCardJson:
    """get_agent_card_json returns valid JSON on both the normal and fallback paths."""

    def test_returns_valid_json(self):
        j = get_agent_card_json()
        parsed = json.loads(j)
        assert "name" in parsed

    def test_graceful_fallback(self, monkeypatch):
        # Force the card builder to fail so the fallback branch really runs.
        import agent.agent_card as agent_card_module

        def _raise(*args, **kwargs):
            raise RuntimeError("simulated build failure")

        monkeypatch.setattr(agent_card_module, "build_agent_card", _raise)

        parsed = json.loads(get_agent_card_json())
        assert parsed["name"] == "hermes"
        assert parsed["url"]
class TestLoadSkills:
    """Skill discovery from a directory of SKILL.md files."""

    def test_empty_dir(self, tmp_path):
        missing_dir = tmp_path / "nonexistent"
        assert len(_load_skills_from_directory(missing_dir)) == 0

    def test_parses_skill_md(self, tmp_path):
        skill_root = tmp_path / "test-skill"
        skill_root.mkdir()
        (skill_root / "SKILL.md").write_text("""---
name: Test Skill
description: A test skill
tags:
- test
- example
---
Content here
""")

        found = _load_skills_from_directory(tmp_path)

        assert len(found) == 1
        only_skill = found[0]
        assert only_skill.name == "Test Skill"
        assert "test" in only_skill.tags
if __name__ == "__main__":
    # Allow running this file directly; delegates to pytest in verbose mode.
    import pytest

    pytest_args = [__file__, "-v"]
    pytest.main(pytest_args)