Compare commits

..

2 Commits

Author SHA1 Message Date
4c8d63a5c9 test: A2A health monitor tests
Some checks failed
Contributor Attribution Check / check-attribution (pull_request) Failing after 35s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Nix / nix (ubuntu-latest) (pull_request) Failing after 4s
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 31s
Tests / e2e (pull_request) Successful in 3m12s
Tests / test (pull_request) Failing after 42m47s
Nix / nix (macos-latest) (pull_request) Has been cancelled
Part of #822
2026-04-16 01:39:10 +00:00
6bc10419b1 feat: A2A health monitor module
Closes #822, Part of #801
2026-04-16 01:39:07 +00:00
3 changed files with 337 additions and 313 deletions

257
hermes_cli/a2a_health.py Normal file
View File

@@ -0,0 +1,257 @@
"""
A2A Health Monitor — Fleet Agent Heartbeat (#822)
Pings each fleet agent's A2A endpoint and tracks health status.
Persists state to ~/.hermes/a2a_health.json.
Usage:
from hermes_cli.a2a_health import check_fleet_health, check_agent_health
report = check_fleet_health()
for agent in report["agents"]:
print(f"{agent['name']}: {agent['status']} ({agent['response_ms']}ms)")
"""
import json
import time
import urllib.request
import urllib.error
from pathlib import Path
from typing import Any, Dict, List, Optional
# Directory under the current user's home where the paths below live.
HERMES_HOME = Path.home() / ".hermes"
# JSON file read by load_fleet_config(); the agent list sits under its "agents" key.
FLEET_CONFIG = HERMES_HOME / "fleet_agents.json"
# JSON file used by load_health_state() / save_health_state() to keep results between runs.
HEALTH_STATE = HERMES_HOME / "a2a_health.json"
# Number of failed checks in a row at which an agent is reported "down" rather than "degraded".
CONSECUTIVE_FAILURE_THRESHOLD = 3
# A reachable agent whose response_ms exceeds this many milliseconds is reported "slow".
SLOW_RESPONSE_MS = 10000
def load_fleet_config() -> List[Dict[str, Any]]:
    """Load fleet agent definitions from ``FLEET_CONFIG``.

    Returns:
        The agent objects stored under the top-level ``"agents"`` key.
        An empty list is returned when the file is missing, unreadable,
        not valid JSON, or does not have the expected shape, so callers
        can always iterate the result and call ``.get`` on each entry.
    """
    try:
        with open(FLEET_CONFIG, encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError: malformed JSON or
        # undecodable bytes (JSONDecodeError and UnicodeDecodeError both
        # derive from it).
        return []

    agents = data.get("agents") if isinstance(data, dict) else None
    if not isinstance(agents, list):
        # e.g. {"agents": null} or a top-level JSON array.
        return []
    # Drop stray non-object entries; every consumer treats an agent as a dict.
    return [agent for agent in agents if isinstance(agent, dict)]
def load_health_state() -> Dict[str, Any]:
    """Load the persisted health state from ``HEALTH_STATE``.

    Returns:
        The stored state dict. A fresh ``{"agents": {}, "last_check": None}``
        is returned when the file is missing, unreadable, not valid JSON,
        or is not a JSON object whose ``"agents"`` value (if present) is
        itself an object.
    """
    empty_state: Dict[str, Any] = {"agents": {}, "last_check": None}
    try:
        with open(HEALTH_STATE, encoding="utf-8") as f:
            state = json.load(f)
    except (OSError, ValueError):
        # Missing/unreadable file, malformed JSON, or undecodable bytes.
        return empty_state

    # Valid JSON of the wrong shape (e.g. a list) would break the
    # ``state.get("agents", {}).get(name, {})`` lookups done by callers.
    if not isinstance(state, dict) or not isinstance(state.get("agents", {}), dict):
        return empty_state
    return state
def save_health_state(state: Dict[str, Any]) -> None:
    """Persist ``state`` as JSON to ``HEALTH_STATE``.

    The data is first written to a sibling ``*.tmp`` file and then moved
    over the real file, so a failure while serialising or writing leaves
    the previously saved state (and its failure counters) untouched.

    Args:
        state: JSON-serialisable health state.

    Raises:
        OSError: if the directory or file cannot be written.
        TypeError: if ``state`` contains values ``json`` cannot serialise.
    """
    HEALTH_STATE.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = HEALTH_STATE.with_name(HEALTH_STATE.name + ".tmp")
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(state, f, indent=2)
    except Exception:
        # Do not leave a half-written temp file behind; the error itself
        # is still reported to the caller.
        try:
            tmp_path.unlink()
        except OSError:
            pass
        raise
    tmp_path.replace(HEALTH_STATE)
def ping_agent(base_url: str, timeout: int = 10) -> Dict[str, Any]:
    """Ping an agent's A2A endpoint.

    Tries ``/health`` first and falls back to
    ``/.well-known/agent-card.json``. The first endpoint that answers
    without raising wins.

    Args:
        base_url: Agent base URL; trailing slashes are ignored.
        timeout: Per-request timeout in seconds passed to ``urlopen``.

    Returns:
        On success: ``alive=True`` plus ``status_code``, ``endpoint`` and
        ``response_ms``; when the agent card answered and could be parsed,
        also ``agent_card`` with ``name`` and ``tools_count``.
        On failure: ``alive=False``, ``error`` and ``response_ms``.
        ``response_ms`` is measured from the start of the whole call, so it
        includes time spent on an earlier endpoint that failed.
    """
    card_endpoint = "/.well-known/agent-card.json"
    endpoints = ["/health", card_endpoint]
    # Agent cards routinely exceed 1 KiB; reading only 1024 bytes truncated
    # the JSON and silently dropped the card. Keep a generous upper bound so
    # a misbehaving server still cannot make us read without limit.
    max_card_bytes = 1024 * 1024
    start = time.monotonic()
    root = base_url.rstrip("/")

    for endpoint in endpoints:
        url = f"{root}{endpoint}"
        is_card = endpoint == card_endpoint
        try:
            req = urllib.request.Request(url, method="GET")
            req.add_header("User-Agent", "hermes-a2a-health/1.0")
            with urllib.request.urlopen(req, timeout=timeout) as resp:
                elapsed = (time.monotonic() - start) * 1000
                body = resp.read(max_card_bytes if is_card else 1024)
                result = {
                    "alive": True,
                    "status_code": resp.status,
                    "endpoint": endpoint,
                    "response_ms": round(elapsed, 1),
                }
        except Exception:
            # Deliberately broad: any failure (URLError/HTTPError, socket
            # timeout, malformed URL, protocol error) just means "try the
            # next endpoint".
            continue

        if is_card:
            # Parse agent card if available; an unparsable card still counts
            # as a live agent, just without card details.
            try:
                card = json.loads(body.decode("utf-8", errors="replace"))
                result["agent_card"] = {
                    "name": card.get("name", "unknown"),
                    "tools_count": len(card.get("skills", [])),
                }
            except (ValueError, AttributeError, TypeError):
                pass
        return result

    elapsed = (time.monotonic() - start) * 1000
    return {
        "alive": False,
        "error": "All endpoints unreachable",
        "response_ms": round(elapsed, 1),
    }
def check_agent_health(
    agent: Dict[str, Any],
    prev_state: Dict[str, Any],
    timeout: int = 10,
) -> Dict[str, Any]:
    """Check health of a single agent.

    Args:
        agent: Agent definition. The URL is taken from
            ``supportedInterfaces[0]["url"]`` and, failing that, ``url``.
        prev_state: Previously persisted state; the agent's earlier
            ``consecutive_failures`` is looked up under
            ``prev_state["agents"][name]``.
        timeout: Per-request timeout in seconds forwarded to ``ping_agent``.

    Returns:
        A result dict that always carries the same keys (``name``, ``url``,
        ``status``, ``alive``, ``response_ms``, ``endpoint``,
        ``status_code``, ``agent_card``, ``consecutive_failures``,
        ``error``, ``checked_at``). ``status`` is one of ``"healthy"``,
        ``"slow"``, ``"degraded"``, ``"down"`` or ``"error"`` (no URL).
    """
    name = agent.get("name", "unknown")

    # Get URL from agent config
    base_url = ""
    interfaces = agent.get("supportedInterfaces", [])
    if isinstance(interfaces, list) and interfaces and isinstance(interfaces[0], dict):
        base_url = interfaces[0].get("url", "")
    if not base_url:
        base_url = agent.get("url", "")

    if not base_url:
        # Same key set as a real check so consumers can index
        # result["response_ms"] etc. without special-casing this entry.
        return {
            "name": name,
            "url": "",
            "status": "error",
            "alive": False,
            "response_ms": None,
            "endpoint": None,
            "status_code": None,
            "agent_card": None,
            "consecutive_failures": 0,
            "error": "No URL configured",
            "checked_at": time.strftime("%Y-%m-%dT%H:%M:%S"),
        }

    result = ping_agent(base_url, timeout=timeout)

    # Previous failure streak for this agent (0 when never seen before).
    prev = (prev_state.get("agents") or {}).get(name) or {}
    prev_failures = prev.get("consecutive_failures", 0)

    if result["alive"]:
        consecutive_failures = 0
        # A live but sluggish agent is reported separately from "healthy".
        if result.get("response_ms", 0) > SLOW_RESPONSE_MS:
            status = "slow"
        else:
            status = "healthy"
    else:
        consecutive_failures = prev_failures + 1
        if consecutive_failures >= CONSECUTIVE_FAILURE_THRESHOLD:
            status = "down"
        else:
            status = "degraded"

    return {
        "name": name,
        "url": base_url,
        "status": status,
        "alive": result["alive"],
        "response_ms": result.get("response_ms"),
        "endpoint": result.get("endpoint"),
        "status_code": result.get("status_code"),
        "agent_card": result.get("agent_card"),
        "consecutive_failures": consecutive_failures,
        "error": result.get("error"),
        "checked_at": time.strftime("%Y-%m-%dT%H:%M:%S"),
    }


def check_fleet_health(
    agent_name: Optional[str] = None,
    timeout: int = 10,
) -> Dict[str, Any]:
    """Check health of all (or one) fleet agent and persist the results.

    Args:
        agent_name: When given, only the agent with this name is checked.
        timeout: Per-request timeout in seconds used for every ping.

    Returns:
        Report dict with ``agents`` (list of per-agent results),
        ``summary`` (counts per status plus ``all_healthy``) and
        ``checked_at``.
    """
    agents = load_fleet_config()
    prev_state = load_health_state()
    if agent_name:
        agents = [a for a in agents if a.get("name") == agent_name]

    results = [check_agent_health(agent, prev_state, timeout=timeout) for agent in agents]
    checked_at = time.strftime("%Y-%m-%dT%H:%M:%S")

    # Update persisted state
    current = {r["name"]: r for r in results}
    prev_agents = prev_state.get("agents")
    if agent_name and isinstance(prev_agents, dict):
        # A filtered run only refreshes the selected agent; keep everyone
        # else's record so their consecutive-failure streaks are not reset.
        persisted = {**prev_agents, **current}
    else:
        persisted = current
    save_health_state({"agents": persisted, "last_check": checked_at})

    # Summary
    counts: Dict[str, int] = {}
    for r in results:
        counts[r["status"]] = counts.get(r["status"], 0) + 1
    degraded = counts.get("degraded", 0)
    # Agents without a URL ("error") are counted together with "down".
    down = counts.get("down", 0) + counts.get("error", 0)

    return {
        "agents": results,
        "summary": {
            "total": len(results),
            "healthy": counts.get("healthy", 0),
            "degraded": degraded,
            "slow": counts.get("slow", 0),
            "down": down,
            "all_healthy": down == 0 and degraded == 0,
        },
        "checked_at": checked_at,
    }
def format_health_dashboard(report: Dict[str, Any]) -> str:
    """Format a health report as a plain-text dashboard.

    Args:
        report: Dict with ``summary`` (needs ``all_healthy``, ``down``,
            ``degraded``, ``slow``), ``checked_at`` and ``agents`` (each
            with at least ``name`` and ``status``).

    Returns:
        A header line, the check time, a blank line, then one line per
        agent in the form ``<icon> <name>: <details>``.
    """
    status_icons = {
        "healthy": "\u2705",
        "degraded": "\u26a0\ufe0f",
        "slow": "\u23f1\ufe0f",
        "down": "\u274c",
        "error": "\u274c",
    }
    summary = report["summary"]

    # Header
    if summary["all_healthy"]:
        header = "\u2705 All fleet agents healthy"
    elif summary["down"] > 0:
        header = f"\u274c {summary['down']} agent(s) DOWN"
    else:
        header = f"\u26a0\ufe0f Fleet degraded: {summary['degraded']} degraded, {summary['slow']} slow"
    lines = [header, f"Checked: {report['checked_at']}", ""]

    # Agent details
    for agent in report["agents"]:
        icon = status_icons.get(agent["status"], "\u2753")
        line = f" {icon} {agent['name']}"
        if agent.get("alive"):
            ms = agent.get("response_ms")
            details = [f"{'?' if ms is None else ms}ms"]
            card = agent.get("agent_card")
            if card:
                details.append(f"{card.get('tools_count', 0)} tools")
            # Separate the name from the details and the details from each
            # other; plain concatenation produced e.g. "ezra50ms3 tools".
            line += ": " + ", ".join(details)
        else:
            # "error" may be present but None, so fall back on falsy values.
            line += f": {agent.get('error') or 'unreachable'}"
            failures = agent.get("consecutive_failures") or 0
            if failures > 0:
                line += f" ({failures} consecutive failures)"
        lines.append(line)
    return "\n".join(lines)

80
tests/test_a2a_health.py Normal file
View File

@@ -0,0 +1,80 @@
"""Tests for A2A health monitor (#822)."""
import sys
import json
import tempfile
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from hermes_cli.a2a_health import (
ping_agent,
check_agent_health,
check_fleet_health,
format_health_dashboard,
load_health_state,
save_health_state,
)
def test_ping_agent_unreachable():
    """Ping returns alive=False when nothing answers at the target URL."""
    import socket

    # Reserve a free loopback port and release it again: connecting to it is
    # refused immediately, so the test neither waits for two timeouts nor
    # depends on how the host network routes a black-hole address.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind(("127.0.0.1", 0))
        closed_port = sock.getsockname()[1]

    result = ping_agent(f"http://127.0.0.1:{closed_port}", timeout=2)
    assert not result["alive"]
    assert "error" in result
def test_check_agent_no_url():
    """An agent entry that carries no URL is reported with status "error"."""
    agent_without_url = {"name": "test"}
    no_previous_state = {}
    outcome = check_agent_health(agent_without_url, no_previous_state)
    assert outcome["status"] == "error"
def test_format_dashboard():
    """The dashboard names every agent and flags the fleet as DOWN."""
    healthy_agent = {"name": "ezra", "status": "healthy", "alive": True, "response_ms": 50}
    down_agent = {"name": "allegro", "status": "down", "alive": False, "error": "timeout"}
    summary = {"total": 2, "healthy": 1, "degraded": 0, "slow": 0, "down": 1, "all_healthy": False}
    report = {
        "agents": [healthy_agent, down_agent],
        "summary": summary,
        "checked_at": "2026-04-15T12:00:00",
    }

    rendered = format_health_dashboard(report)

    for expected in ("ezra", "allegro", "DOWN"):
        assert expected in rendered
def test_state_persistence():
    """State written by save_health_state is read back by load_health_state."""
    import hermes_cli.a2a_health as a2a_health

    state = {"agents": {"test": {"alive": True}}, "last_check": "now"}
    real_path = a2a_health.HEALTH_STATE
    with tempfile.TemporaryDirectory() as tmpdir:
        # Point the module at a throwaway file so the test exercises the real
        # save/load functions without touching ~/.hermes.
        a2a_health.HEALTH_STATE = Path(tmpdir) / "health.json"
        try:
            save_health_state(state)
            loaded = load_health_state()
        finally:
            a2a_health.HEALTH_STATE = real_path
    assert loaded == state
    assert loaded["agents"]["test"]["alive"] is True
def test_consecutive_failures():
    """A third failure in a row bumps the counter to 3 and marks the agent down."""
    import socket

    # Use a just-released loopback port so the connection is refused at once;
    # a black-hole address would cost two full default timeouts.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind(("127.0.0.1", 0))
        closed_port = sock.getsockname()[1]

    prev = {"agents": {"test": {"consecutive_failures": 2}}}
    agent = {"name": "test", "url": f"http://127.0.0.1:{closed_port}"}
    result = check_agent_health(agent, prev)
    assert result["consecutive_failures"] == 3
    assert result["status"] == "down"
if __name__ == "__main__":
    # Minimal runner for executing this file directly: run each test in
    # order and stop at the first failing assertion.
    for test_func in (
        test_ping_agent_unreachable,
        test_check_agent_no_url,
        test_format_dashboard,
        test_state_persistence,
        test_consecutive_failures,
    ):
        print(f"Running {test_func.__name__}...")
        test_func()
        print(" PASS")
    print("\nAll tests passed.")

View File

@@ -1,313 +0,0 @@
"""Context-Aware Risk Scoring — ML-lite tier detection enhancement.
Enhances the existing approval.py dangerous-command detection with
context-aware risk scoring. Instead of pure pattern matching, considers:
1. Path context: rm /tmp/x is safer than rm /etc/passwd
2. Command context: chmod 777 on project dir vs system dir
3. Recency: repeated dangerous commands increase risk
4. Scope: commands affecting more files = higher risk
Usage:
from tools.risk_scorer import score_action, RiskResult
result = score_action("rm -rf /tmp/build")
# result.tier = MEDIUM (not HIGH, because /tmp is safe)
# result.confidence = 0.7
"""
import os
import re
import time
from dataclasses import dataclass, field
from enum import IntEnum
from typing import Any, Dict, List, Optional, Tuple
# Risk tiers (aligned with approval_tiers.py)
class RiskTier(IntEnum):
    """Ordered risk levels; a larger value means a riskier action."""

    SAFE = 0
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4
@dataclass
class RiskResult:
    """Outcome of scoring one action: the tier plus supporting detail."""

    # Final risk tier assigned to the action.
    tier: RiskTier
    # How sure the scorer is about the tier, from 0.0 to 1.0.
    confidence: float
    # Human-readable explanations; a fresh list per instance.
    reasons: List[str] = field(default_factory=list)
    # Raw inputs behind the decision; a fresh dict per instance.
    context_factors: Dict[str, Any] = field(default_factory=dict)
# --- Path risk assessment ---
# Path prefixes that _classify_path reports as "safe": temp directories,
# shared memory and directories under ~/.hermes. Entries are compared
# against the lower-cased path with str.startswith.
SAFE_PATHS = {
    "/tmp", "/var/tmp", "/dev/shm",
    "~/.hermes/sessions", "~/.hermes/cache", "~/.hermes/logs",
    "/tmp/", "/var/tmp/",
}
# Path prefixes that _classify_path reports as "high"; checked after
# CRITICAL_PATHS and before SAFE_PATHS.
HIGH_RISK_PATHS = {
    "/etc", "/boot", "/usr/lib", "/usr/bin",
    "~/.ssh", "~/.gnupg",
    "/var/lib", "/opt",
}
# Path prefixes that _classify_path reports as "critical"; checked first,
# so they win over the two sets above.
CRITICAL_PATHS = {
    "/", "/etc/passwd", "/etc/shadow", "/etc/sudoers",
    "~/.ssh/id_rsa", "~/.ssh/authorized_keys",
    "/boot/vmlinuz", "/dev/sda", "/dev/nvme",
}
def _extract_paths(command: str) -> List[str]:
"""Extract file paths from a command."""
paths = []
# Match common path patterns
for match in re.finditer(r'[/~][\w/.~-]+', command):
paths.append(match.group())
# Also match $HOME, $HERMES_HOME expansions
for match in re.finditer(r'\$(?:HOME|HERMES_HOME|PWD)[/\w]*', command):
paths.append(match.group())
return paths
def _classify_path(path: str) -> str:
"""Classify a path as safe, high-risk, or critical."""
path_lower = path.lower().replace("\\", "/")
for critical in CRITICAL_PATHS:
if path_lower.startswith(critical.lower()):
return "critical"
for high in HIGH_RISK_PATHS:
if path_lower.startswith(high.lower()):
return "high"
for safe in SAFE_PATHS:
if path_lower.startswith(safe.lower()):
return "safe"
# Unknown paths default to medium
return "unknown"
# --- Command risk modifiers ---
# Multipliers applied by _get_command_risk_modifier for each flag found in a
# command: values above 1.0 raise the modifier, values below 1.0 lower it.
RISK_MODIFIERS = {
    # Flags that increase risk
    "-rf": 1.5,
    "-r": 1.2,
    "--force": 1.5,
    "--recursive": 1.2,
    "--no-preserve-root": 3.0,
    "-f": 1.3,
    "--hard": 1.5,
    "--force-push": 2.0,
    "-D": 1.4,
    # Flags that decrease risk
    "--dry-run": 0.1,
    "-n": 0.3,
    "--no-act": 0.1,
    "--interactive": 0.7,
    "-i": 0.7,
}
def _get_command_risk_modifier(command: str) -> float:
"""Get risk modifier based on command flags."""
modifier = 1.0
for flag, mod in RISK_MODIFIERS.items():
if flag in command:
modifier *= mod
return modifier
# --- Scope assessment ---
def _assess_scope(command: str) -> float:
    """Estimate how broadly a command reaches, as a multiplier from 1.0 to 5.0.

    Each of the following multiplies the result: a ``*`` or ``?`` wildcard
    (x2.0), an ``-r``/``-rf`` flag (x1.5), ``find`` combined with ``exec``
    or ``xargs`` (x2.0), and more than two extracted paths (x1.3).
    """
    factor = 1.0

    has_wildcard = "*" in command or "?" in command
    if has_wildcard:
        factor *= 2.0

    if re.search(r'-r[f]?\b', command) is not None:
        factor *= 1.5

    feeds_a_pipeline = "exec" in command or "xargs" in command
    if "find" in command and feeds_a_pipeline:
        factor *= 2.0

    target_count = len(_extract_paths(command))
    if target_count > 2:
        factor *= 1.3

    # Never report more than a 5x scope.
    return min(factor, 5.0)
# --- Recent command tracking ---
# (timestamp, command) pairs for commands seen within the tracking window.
_recent_commands: List[Tuple[float, str]] = []
# Length of the tracking window in seconds (5 minutes).
_TRACK_WINDOW = 300


def _track_command(command: str) -> float:
    """Record ``command`` and return an escalation factor from its recent history.

    Entries older than ``_TRACK_WINDOW`` seconds are discarded first. Every
    remaining entry identical to ``command`` adds 0.2 and every entry that
    is merely similar (per ``_commands_similar``) adds 0.1 to a base of
    1.0; the total is capped at 3.0. The command is recorded after the
    factor has been computed, so it does not count against itself.
    """
    global _recent_commands
    now = time.time()

    # Forget everything that has fallen out of the window.
    _recent_commands = [
        entry for entry in _recent_commands
        if now - entry[0] < _TRACK_WINDOW
    ]

    escalation = 1.0
    for _, previous in _recent_commands:
        if previous == command:
            # Exact repeat: strongest escalation.
            escalation += 0.2
            continue
        if _commands_similar(command, previous):
            # Same kind of command: milder escalation.
            escalation += 0.1

    _recent_commands.append((now, command))
    return min(escalation, 3.0)
def _commands_similar(cmd1: str, cmd2: str) -> bool:
"""Check if two commands are structurally similar."""
# Extract command name
name1 = cmd1.split()[0] if cmd1.split() else ""
name2 = cmd2.split()[0] if cmd2.split() else ""
return name1 == name2
# --- Main scoring function ---
# Base tier mapping from command name
# Starting tier per command name, looked up by score_action before any
# path-based adjustment; commands not listed here start at RiskTier.SAFE.
COMMAND_BASE_TIERS = {
    "rm": RiskTier.HIGH,
    "chmod": RiskTier.MEDIUM,
    "chown": RiskTier.HIGH,
    "mkfs": RiskTier.CRITICAL,
    "dd": RiskTier.HIGH,
    "kill": RiskTier.HIGH,
    "pkill": RiskTier.HIGH,
    "systemctl": RiskTier.HIGH,
    "git": RiskTier.LOW,
    "sed": RiskTier.LOW,
    "cp": RiskTier.LOW,
    "mv": RiskTier.LOW,
    "python3": RiskTier.LOW,
    "pip": RiskTier.LOW,
    "npm": RiskTier.LOW,
    "docker": RiskTier.MEDIUM,
    "ansible": RiskTier.HIGH,
}
def score_action(action: str, context: Optional[Dict[str, Any]] = None) -> RiskResult:
    """Score an action's risk level with context awareness.

    The tier starts from ``COMMAND_BASE_TIERS`` (keyed by the command's
    base name) and is then adjusted by the riskiest path in the command:

    * critical path -> tier becomes CRITICAL;
    * high-risk path -> tier is raised to at least HIGH;
    * only safe paths (and at least one path) -> tier is capped at MEDIUM.

    A command that mentions no path at all is left at its base tier; it
    is not treated as "safe path context". Flag modifier, scope and
    recency are computed and reported in ``reasons`` /
    ``context_factors`` but do not change the tier. A dry-run flag
    short-circuits to SAFE.

    Args:
        action: Shell command text.
        context: Accepted for interface compatibility; not used here.

    Returns:
        RiskResult with tier, confidence, and reasons.
        ``context_factors["path_risk"]`` is ``"none"`` when the command
        contains no paths.
    """
    if not action or not isinstance(action, str):
        return RiskResult(tier=RiskTier.SAFE, confidence=1.0, reasons=["empty input"])
    parts = action.strip().split()
    if not parts:
        return RiskResult(tier=RiskTier.SAFE, confidence=1.0, reasons=["empty command"])

    cmd_name = parts[0].split("/")[-1]  # "/bin/rm" -> "rm"
    base_tier = COMMAND_BASE_TIERS.get(cmd_name, RiskTier.SAFE)

    # Path risk assessment: remember both the worst classification and the
    # path that produced it, so the reason text names the right path.
    risk_order = {"safe": 0, "unknown": 1, "high": 2, "critical": 3}
    paths = _extract_paths(action)
    max_path_risk = "none"
    riskiest_path = None
    for path in paths:
        path_risk = _classify_path(path)
        if riskiest_path is None or risk_order.get(path_risk, 0) > risk_order.get(max_path_risk, 0):
            max_path_risk = path_risk
            riskiest_path = path

    reasons = []
    # Path-based tier adjustment
    if max_path_risk == "critical":
        base_tier = RiskTier.CRITICAL
        reasons.append(f"Critical path detected: {riskiest_path}")
    elif max_path_risk == "high":
        if base_tier.value < RiskTier.HIGH.value:
            base_tier = RiskTier.HIGH
        reasons.append(f"High-risk path: {riskiest_path}")
    elif max_path_risk == "safe":
        # Every path in the command is in a safe location.
        if base_tier.value > RiskTier.MEDIUM.value:
            base_tier = RiskTier.MEDIUM
            reasons.append("Safe path context — risk downgraded")

    # Computed before the dry-run exit on purpose: _track_command records
    # the command as a side effect.
    modifier = _get_command_risk_modifier(action)
    scope = _assess_scope(action)
    recency = _track_command(action)

    # Dry-run overrides everything. Flags are matched as whole tokens so
    # that text merely containing "-n " does not qualify.
    # NOTE(review): not every command interprets -n as a dry run (for
    # example `head -n`); consider limiting this to known commands.
    is_dry_run = any(
        token == "-n" or token == "--dry-run" or token.startswith("--dry-run=")
        for token in parts
    )
    if is_dry_run:
        return RiskResult(
            tier=RiskTier.SAFE,
            confidence=0.95,
            reasons=["dry-run mode — no actual changes"],
            context_factors={"dry_run": True},
        )

    # Confidence depends on how well the paths could be classified.
    confidence = {
        "safe": 0.9,
        "unknown": 0.6,
        "critical": 0.95,
    }.get(max_path_risk, 0.8)

    if modifier > 1.5:
        reasons.append(f"Force/recursive flags (modifier: {modifier:.1f}x)")
    if scope > 1.5:
        reasons.append(f"Wide scope (wildcards/multiple targets, {scope:.1f}x)")
    if recency > 1.2:
        reasons.append(f"Repeated command pattern ({recency:.1f}x escalation)")
    if not reasons:
        reasons.append(f"Command '{cmd_name}' classified as {base_tier.name}")

    return RiskResult(
        tier=base_tier,
        confidence=round(confidence, 2),
        reasons=reasons,
        context_factors={
            "path_risk": max_path_risk,
            "modifier": round(modifier, 2),
            "scope": round(scope, 2),
            "recency": round(recency, 2),
            "paths": paths,
        },
    )