Compare commits
2 Commits
fix/806
...
feat/822-a
| Author | SHA1 | Date | |
|---|---|---|---|
| 4c8d63a5c9 | |||
| 6bc10419b1 |
@@ -1,2 +0,0 @@
|
||||
"""A2A (Agent-to-Agent) authentication and security."""
|
||||
from .mtls import FleetCA, AgentCert, verify_peer, generate_fleet_certs
|
||||
@@ -1,260 +0,0 @@
|
||||
"""
|
||||
mtls.py — Mutual TLS authentication for agent-to-agent communication.
|
||||
|
||||
Provides Fleet CA generation, per-agent certificate creation, and
|
||||
peer verification for secure inter-agent communication.
|
||||
|
||||
Usage:
|
||||
# Generate fleet CA + certs for all agents
|
||||
python3 -m agent.a2a.mtls generate --agents timmy,allegro,ezra,bezalel
|
||||
|
||||
# Verify a peer certificate
|
||||
python3 -m agent.a2a.mtls verify --cert /path/to/peer.pem --ca /path/to/ca.pem
|
||||
|
||||
# Check cert expiry
|
||||
python3 -m agent.a2a.mtls check --cert /path/to/cert.pem
|
||||
"""
|
||||
|
||||
import os
|
||||
import subprocess
|
||||
import json
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
from dataclasses import dataclass, field
|
||||
from typing import List, Optional, Dict
|
||||
|
||||
CERTS_DIR = Path.home() / ".hermes" / "a2a" / "certs"
|
||||
CA_DIR = Path.home() / ".hermes" / "a2a" / "ca"
|
||||
|
||||
|
||||
@dataclass
class CertInfo:
    """Metadata describing a single X.509 certificate.

    Instances are produced by ``get_cert_info`` from ``openssl x509`` output.
    The validity timestamps are expected to be expressed in UTC; naive
    ``datetime`` values are interpreted as UTC.
    """

    subject: str  # subject line as printed by openssl
    issuer: str  # issuer line as printed by openssl
    not_before: datetime  # start of the validity window
    not_after: datetime  # end of the validity window
    serial: str
    fingerprint: str
    is_ca: bool = False
    days_remaining: int = 0  # whole days until not_after, computed by the producer

    def is_expired(self) -> bool:
        """Return True when the current UTC time is past ``not_after``."""
        # Function-scope import: the module only imports datetime/timedelta.
        from datetime import timezone

        now = datetime.now(timezone.utc)
        if self.not_after.tzinfo is None:
            # Compare like with like: a naive not_after is taken to be UTC,
            # not local time, so strip the tzinfo from "now" as well.
            now = now.replace(tzinfo=None)
        return now > self.not_after

    def is_expiring_soon(self, days: int = 30) -> bool:
        """Return True when fewer than ``days`` days of validity remain."""
        return self.days_remaining < days
|
||||
|
||||
|
||||
@dataclass
class FleetCA:
    """Fleet Certificate Authority backed by the ``openssl`` command line.

    Holds the CA directory plus the paths of the CA certificate and private
    key, and issues per-agent certificates signed by that CA.
    """

    ca_dir: Path
    ca_cert: Path
    ca_key: Path

    @classmethod
    def init(cls, ca_dir: Optional[Path] = None) -> "FleetCA":
        """Initialize or load the fleet CA.

        Args:
            ca_dir: Directory holding ``ca.pem`` / ``ca-key.pem``. Defaults
                to the module-level ``CA_DIR``.

        Returns:
            A ``FleetCA`` pointing at the (possibly freshly generated) CA.

        Raises:
            subprocess.CalledProcessError: If openssl fails while generating
                a new CA.
        """
        ca_dir = ca_dir or CA_DIR
        ca_dir.mkdir(parents=True, exist_ok=True)
        ca_cert = ca_dir / "ca.pem"
        ca_key = ca_dir / "ca-key.pem"

        # Only generate when no certificate exists, so an existing CA (and
        # every certificate it has already signed) is never invalidated.
        if not ca_cert.exists():
            cls._generate_ca(ca_cert, ca_key)

        return cls(ca_dir=ca_dir, ca_cert=ca_cert, ca_key=ca_key)

    @staticmethod
    def _generate_ca(ca_cert: Path, ca_key: Path):
        """Generate a 4096-bit RSA key and a self-signed CA certificate."""
        subprocess.run(
            ["openssl", "genrsa", "-out", str(ca_key), "4096"],
            check=True,
            capture_output=True,
        )
        # The CA key signs every fleet certificate; keep it owner-only.
        ca_key.chmod(0o600)

        # Self-signed CA certificate with a 10 year (3650 day) validity.
        subprocess.run(
            [
                "openssl", "req", "-new", "-x509",
                "-key", str(ca_key),
                "-out", str(ca_cert),
                "-days", "3650",
                "-subj", "/CN=Hermes Fleet CA/O=Timmy Foundation/C=US",
                "-addext", "basicConstraints=critical,CA:TRUE",
                "-addext", "keyUsage=critical,keyCertSign,cRLSign",
            ],
            check=True,
            capture_output=True,
        )

    def issue_cert(self, agent_name: str, validity_days: int = 365) -> tuple:
        """Issue a CA-signed certificate for an agent.

        Args:
            agent_name: Agent identifier. It becomes a directory name, the
                certificate CN/OU and a DNS subjectAltName, so it must start
                with a letter or digit and contain only letters, digits,
                ``.``, ``_`` or ``-``.
            validity_days: Certificate lifetime in days.

        Returns:
            ``(cert_path, key_path)`` as ``Path`` objects.

        Raises:
            ValueError: If ``agent_name`` is empty or contains characters
                that could alter the path, subject or extension file.
            subprocess.CalledProcessError: If any openssl step fails.
        """
        # Function-scope import: ``re`` is not among the module's imports.
        import re

        # Validate before touching the filesystem: "/" or ".." would escape
        # the certs directory or inject extra subject components, and a
        # newline would inject arbitrary lines into the extension file.
        if not isinstance(agent_name, str) or not re.fullmatch(
            r"[A-Za-z0-9][A-Za-z0-9._-]*", agent_name
        ):
            raise ValueError(f"Invalid agent name: {agent_name!r}")

        cert_dir = CERTS_DIR / agent_name
        cert_dir.mkdir(parents=True, exist_ok=True)
        cert_path = cert_dir / "cert.pem"
        key_path = cert_dir / "key.pem"
        csr_path = cert_dir / "csr.pem"
        ext_file = cert_dir / "ext.cnf"

        extensions = (
            "basicConstraints=CA:FALSE\n"
            "keyUsage=digitalSignature,keyEncipherment\n"
            "extendedKeyUsage=serverAuth,clientAuth\n"
            f"subjectAltName=DNS:{agent_name},DNS:localhost,IP:127.0.0.1"
        )

        try:
            # Generate the agent's private key and restrict it to the owner.
            subprocess.run(
                ["openssl", "genrsa", "-out", str(key_path), "2048"],
                check=True,
                capture_output=True,
            )
            key_path.chmod(0o600)

            # Generate the certificate signing request.
            subprocess.run(
                [
                    "openssl", "req", "-new",
                    "-key", str(key_path),
                    "-out", str(csr_path),
                    "-subj", f"/CN={agent_name}/O=Hermes Fleet/OU={agent_name}",
                ],
                check=True,
                capture_output=True,
            )

            # Sign the CSR with the fleet CA, applying the extensions above.
            ext_file.write_text(extensions)
            subprocess.run(
                [
                    "openssl", "x509", "-req",
                    "-in", str(csr_path),
                    "-CA", str(self.ca_cert),
                    "-CAkey", str(self.ca_key),
                    "-CAcreateserial",
                    "-out", str(cert_path),
                    "-days", str(validity_days),
                    "-extfile", str(ext_file),
                ],
                check=True,
                capture_output=True,
            )
        finally:
            # Remove the intermediate files even when an openssl step fails.
            csr_path.unlink(missing_ok=True)
            ext_file.unlink(missing_ok=True)

        return cert_path, key_path

    def get_ca_bundle(self) -> Path:
        """Return the path of the CA certificate for distribution."""
        return self.ca_cert
|
||||
|
||||
|
||||
def verify_peer(cert_path: str, ca_path: str) -> bool:
    """Check a peer certificate against the fleet CA using ``openssl verify``.

    Args:
        cert_path: Path of the certificate to verify.
        ca_path: Path of the CA certificate passed as ``-CAfile``.

    Returns:
        True only when openssl exits with status 0 and its stdout contains
        ``OK``. Any failure to run openssl yields False rather than raising.
    """
    command = ["openssl", "verify", "-CAfile", ca_path, cert_path]
    try:
        completed = subprocess.run(command, capture_output=True, text=True)
    except Exception:
        # Best-effort contract: an unusable openssl means "not verified".
        return False

    if completed.returncode != 0:
        return False
    return "OK" in completed.stdout
|
||||
|
||||
|
||||
def get_cert_info(cert_path: str) -> "Optional[CertInfo]":
    """Extract certificate metadata via ``openssl x509``.

    Args:
        cert_path: Path of the PEM certificate to inspect.

    Returns:
        A ``CertInfo`` describing the certificate, or None when openssl
        cannot be run, rejects the file, or its output cannot be parsed.
    """
    # Function-scope import: the module only imports datetime/timedelta.
    from datetime import timezone

    date_format = "%b %d %H:%M:%S %Y %Z"
    try:
        result = subprocess.run(
            [
                "openssl", "x509", "-in", cert_path,
                "-noout", "-subject", "-issuer", "-dates", "-serial", "-fingerprint",
            ],
            capture_output=True,
            text=True,
            check=True,
        )

        # Each output line is "label=value"; split on the first "=" only so
        # values that contain "=" themselves (subject, issuer) stay intact.
        info = {}
        for line in result.stdout.splitlines():
            label, sep, value = line.partition("=")
            if sep:
                info[label.strip().lower().replace(" ", "_")] = value.strip()

        # openssl labels the validity dates "notBefore" / "notAfter", which
        # normalise to "notbefore" / "notafter" (no underscore).
        not_before = datetime.strptime(info["notbefore"], date_format)
        not_after = datetime.strptime(info["notafter"], date_format)

        # The parsed dates are naive; measure against a naive UTC "now" so
        # the result does not shift with the local timezone.
        now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
        days_remaining = (not_after - now_utc).days

        return CertInfo(
            subject=info.get("subject", ""),
            issuer=info.get("issuer", ""),
            not_before=not_before,
            not_after=not_after,
            serial=info.get("serial", ""),
            fingerprint=info.get("sha1_fingerprint", info.get("sha256_fingerprint", "")),
            days_remaining=days_remaining,
        )
    except Exception:
        # Deliberately broad: callers treat None as "could not read".
        return None
|
||||
|
||||
|
||||
def generate_fleet_certs(agents: List[str], ca_dir: Path = None, validity_days: int = 365) -> Dict[str, tuple]:
    """Generate certificates for all fleet agents.

    Args:
        agents: Agent names to issue certificates for.
        ca_dir: CA directory forwarded to ``FleetCA.init``; None selects its
            default.
        validity_days: Lifetime of each issued certificate in days.

    Returns:
        Dict of agent_name -> (cert_path, key_path), both as strings.
    """
    # Function-scope import, hoisted out of the copy loop.
    import shutil

    ca = FleetCA.init(ca_dir)
    results = {}

    for agent in agents:
        cert_path, key_path = ca.issue_cert(agent, validity_days)
        results[agent] = (str(cert_path), str(key_path))
        print(f" {agent}: cert={cert_path}, key={key_path}")

    # Copy the CA cert next to each agent's files for distribution. Always
    # overwrite: a ca.pem left over from a previous CA would no longer match
    # the certificate that was just issued.
    for agent in agents:
        shutil.copy2(ca.ca_cert, CERTS_DIR / agent / "ca.pem")

    return results
|
||||
|
||||
|
||||
def main(argv: Optional[List[str]] = None) -> int:
    """CLI entry point.

    Args:
        argv: Argument list without the program name. None makes argparse
            read ``sys.argv``.

    Returns:
        Process exit status: 0 on success (or when only help is printed),
        1 when verification fails or the certificate cannot be read.
    """
    import argparse

    parser = argparse.ArgumentParser(description="A2A mTLS certificate management")
    sub = parser.add_subparsers(dest="command")

    # Generate
    gen = sub.add_parser("generate", help="Generate fleet certificates")
    gen.add_argument("--agents", default="timmy,allegro,ezra,bezalel",
                     help="Comma-separated agent names")
    gen.add_argument("--days", type=int, default=365, help="Validity in days")

    # Verify
    ver = sub.add_parser("verify", help="Verify a peer certificate")
    ver.add_argument("--cert", required=True)
    ver.add_argument("--ca", required=True)

    # Check
    chk = sub.add_parser("check", help="Check certificate info")
    chk.add_argument("--cert", required=True)

    args = parser.parse_args(argv)

    if args.command == "generate":
        # Drop empty entries so "a,b," or "a,,b" never yields a blank name.
        agents = [name.strip() for name in args.agents.split(",") if name.strip()]
        if not agents:
            parser.error("--agents must name at least one agent")
        print(f"Generating certs for: {', '.join(agents)}")
        results = generate_fleet_certs(agents, validity_days=args.days)
        print(f"\nGenerated {len(results)} certificates")
        return 0

    if args.command == "verify":
        ok = verify_peer(args.cert, args.ca)
        print(f"Verification: {'PASS' if ok else 'FAIL'}")
        # Non-zero status lets shell scripts and CI react to a failure.
        return 0 if ok else 1

    if args.command == "check":
        info = get_cert_info(args.cert)
        if info is None:
            print("Could not read certificate")
            return 1
        print(f"Subject: {info.subject}")
        print(f"Issuer: {info.issuer}")
        print(f"Valid: {info.not_before} to {info.not_after}")
        print(f"Days remaining: {info.days_remaining}")
        print(f"Expired: {info.is_expired()}")
        return 0

    parser.print_help()
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
|
||||
@@ -1,5 +0,0 @@
|
||||
---
# Variables used by the A2A mTLS certificate distribution tasks.

# Directory that receives ca.pem, cert.pem and key.pem (task `path` / `dest`).
a2a_certs_dir: "~/.hermes/a2a/certs"
# Source files handed to the copy tasks (`src`).
a2a_ca_cert_local: "files/ca.pem"
a2a_cert_local: "files/cert.pem"
a2a_key_local: "files/key.pem"
|
||||
@@ -1,29 +0,0 @@
|
||||
---
# Distribute A2A mTLS certificates to fleet nodes

# The directory will hold the private key, so it is owner-only (0700).
- name: Ensure certs directory exists
  file:
    path: "{{ a2a_certs_dir }}"
    state: directory
    mode: '0700'

# Certificates are public material and stay world-readable (0644).
- name: Copy CA certificate
  copy:
    src: "{{ a2a_ca_cert_local }}"
    dest: "{{ a2a_certs_dir }}/ca.pem"
    mode: '0644'

- name: Copy agent certificate
  copy:
    src: "{{ a2a_cert_local }}"
    dest: "{{ a2a_certs_dir }}/cert.pem"
    mode: '0644'

# The private key is readable by its owner only (0600).
- name: Copy agent private key
  copy:
    src: "{{ a2a_key_local }}"
    dest: "{{ a2a_certs_dir }}/key.pem"
    mode: '0600'

# Read-only sanity check of the copied files; changed_when: false keeps the
# task from ever being reported as a change.
- name: Verify certificate against CA
  command: "openssl verify -CAfile {{ a2a_certs_dir }}/ca.pem {{ a2a_certs_dir }}/cert.pem"
  changed_when: false
|
||||
257
hermes_cli/a2a_health.py
Normal file
257
hermes_cli/a2a_health.py
Normal file
@@ -0,0 +1,257 @@
|
||||
"""
|
||||
A2A Health Monitor — Fleet Agent Heartbeat (#822)
|
||||
|
||||
Pings each fleet agent's A2A endpoint and tracks health status.
|
||||
Persists state to ~/.hermes/a2a_health.json.
|
||||
|
||||
Usage:
|
||||
from hermes_cli.a2a_health import check_fleet_health, check_agent_health
|
||||
|
||||
report = check_fleet_health()
|
||||
for agent in report["agents"]:
|
||||
print(f"{agent['name']}: {agent['status']} ({agent['response_ms']}ms)")
|
||||
"""
|
||||
|
||||
import json
|
||||
import time
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
HERMES_HOME = Path.home() / ".hermes"
|
||||
FLEET_CONFIG = HERMES_HOME / "fleet_agents.json"
|
||||
HEALTH_STATE = HERMES_HOME / "a2a_health.json"
|
||||
|
||||
CONSECUTIVE_FAILURE_THRESHOLD = 3
|
||||
SLOW_RESPONSE_MS = 10000
|
||||
|
||||
|
||||
def load_fleet_config() -> List[Dict[str, Any]]:
    """Read the fleet agent definitions from ``FLEET_CONFIG``.

    Returns:
        The value stored under the ``agents`` key, or an empty list when the
        file is missing, unreadable, or not shaped as expected.
    """
    if FLEET_CONFIG.exists():
        try:
            with open(FLEET_CONFIG) as handle:
                return json.load(handle).get("agents", [])
        except Exception:
            # Best-effort: a broken config is treated as "no agents".
            return []
    return []
|
||||
|
||||
|
||||
def load_health_state() -> Dict[str, Any]:
    """Read the persisted health state from ``HEALTH_STATE``.

    Returns:
        The decoded JSON document, or a fresh empty state
        (``{"agents": {}, "last_check": None}``) when the file is missing
        or cannot be read or decoded.
    """
    if HEALTH_STATE.exists():
        try:
            with open(HEALTH_STATE) as handle:
                return json.load(handle)
        except Exception:
            # Fall through to the empty state below.
            pass
    return {"agents": {}, "last_check": None}
|
||||
|
||||
|
||||
def save_health_state(state: Dict[str, Any]):
    """Write ``state`` to ``HEALTH_STATE`` as indented JSON.

    The parent directory is created first when it does not exist yet.
    """
    target_dir = HEALTH_STATE.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    with HEALTH_STATE.open("w") as out:
        json.dump(state, out, indent=2)
|
||||
|
||||
|
||||
def ping_agent(base_url: str, timeout: int = 10) -> Dict[str, Any]:
    """
    Ping an agent's A2A endpoint.

    Tries /health first, falls back to /.well-known/agent-card.json.

    Args:
        base_url: Base URL of the agent; a trailing slash is ignored.
        timeout: Timeout in seconds applied to each endpoint attempt.

    Returns:
        On success: ``alive`` (True), ``status_code``, ``endpoint`` and
        ``response_ms`` (latency of the endpoint that answered), plus
        ``agent_card`` (``name`` and ``tools_count``) when the agent card
        was fetched and parsed.
        On failure: ``alive`` (False), ``error`` and ``response_ms`` (total
        time spent across all attempts).
    """
    card_endpoint = "/.well-known/agent-card.json"
    endpoints = ["/health", card_endpoint]
    # Agent cards routinely exceed 1 KiB; a truncated read makes the JSON
    # unparseable, so allow up to 1 MiB for the card.
    max_card_bytes = 1024 * 1024

    overall_start = time.monotonic()
    root = base_url.rstrip("/")

    for endpoint in endpoints:
        is_card = endpoint == card_endpoint
        # Time each attempt separately so a failed /health probe does not
        # inflate the latency reported for the fallback endpoint.
        attempt_start = time.monotonic()
        try:
            req = urllib.request.Request(f"{root}{endpoint}", method="GET")
            req.add_header("User-Agent", "hermes-a2a-health/1.0")
            with urllib.request.urlopen(req, timeout=timeout) as resp:
                elapsed = (time.monotonic() - attempt_start) * 1000
                raw = resp.read(max_card_bytes if is_card else 1024)
                status_code = resp.status
        except Exception:
            # Covers URLError/HTTPError (unreachable or non-2xx) as well as
            # malformed URLs and socket errors: try the next endpoint.
            continue

        result = {
            "alive": True,
            "status_code": status_code,
            "endpoint": endpoint,
            "response_ms": round(elapsed, 1),
        }

        # Parse agent card if available
        if is_card:
            try:
                card = json.loads(raw.decode("utf-8", errors="replace"))
                result["agent_card"] = {
                    "name": card.get("name", "unknown"),
                    "tools_count": len(card.get("skills", [])),
                }
            except (ValueError, AttributeError, TypeError):
                # Invalid or unexpectedly shaped card: the agent is still
                # alive, it just carries no card details.
                pass

        return result

    elapsed = (time.monotonic() - overall_start) * 1000
    return {
        "alive": False,
        "error": "All endpoints unreachable",
        "response_ms": round(elapsed, 1),
    }
|
||||
|
||||
|
||||
def check_agent_health(
    agent: Dict[str, Any],
    prev_state: Dict[str, Any],
    timeout: int = 10,
) -> Dict[str, Any]:
    """Check health of a single agent.

    Args:
        agent: Agent definition. The URL is taken from the first entry of
            ``supportedInterfaces`` and falls back to the ``url`` key.
        prev_state: Previously persisted state; its
            ``agents[name]["consecutive_failures"]`` seeds the counter.
        timeout: Per-endpoint timeout in seconds forwarded to ``ping_agent``.

    Returns:
        A result dict whose ``status`` is ``healthy``, ``slow``,
        ``degraded``, ``down`` or ``error`` (no URL configured).
    """
    name = agent.get("name", "unknown")

    # Get URL from agent config
    base_url = ""
    interfaces = agent.get("supportedInterfaces", [])
    if interfaces:
        base_url = interfaces[0].get("url", "")
    if not base_url:
        base_url = agent.get("url", "")

    if not base_url:
        return {
            "name": name,
            "status": "error",
            "error": "No URL configured",
            "consecutive_failures": 0,
        }

    # Ping
    result = ping_agent(base_url, timeout=timeout)

    # Get previous state, tolerating a missing or malformed state document.
    known = prev_state.get("agents") if isinstance(prev_state, dict) else None
    prev = known.get(name) if isinstance(known, dict) else None
    prev_failures = prev.get("consecutive_failures", 0) if isinstance(prev, dict) else 0

    # Update failure count
    if result["alive"]:
        consecutive_failures = 0
        # A live agent is "slow" once it exceeds the latency threshold.
        if result.get("response_ms", 0) > SLOW_RESPONSE_MS:
            status = "slow"
        else:
            status = "healthy"
    else:
        consecutive_failures = prev_failures + 1
        if consecutive_failures >= CONSECUTIVE_FAILURE_THRESHOLD:
            status = "down"
        else:
            status = "degraded"

    return {
        "name": name,
        "url": base_url,
        "status": status,
        "alive": result["alive"],
        "response_ms": result.get("response_ms"),
        "endpoint": result.get("endpoint"),
        "status_code": result.get("status_code"),
        "agent_card": result.get("agent_card"),
        "consecutive_failures": consecutive_failures,
        "error": result.get("error"),
        "checked_at": time.strftime("%Y-%m-%dT%H:%M:%S"),
    }


def check_fleet_health(
    agent_name: Optional[str] = None,
    timeout: int = 10,
) -> Dict[str, Any]:
    """
    Check health of all (or one) fleet agent.

    Args:
        agent_name: When given, only the agent with this name is checked.
        timeout: Per-endpoint timeout in seconds used for every ping.

    Returns:
        Report dict with the ``agents`` result list, a ``summary`` of counts
        per status and a ``checked_at`` timestamp. ``all_healthy`` is True
        only when no agent is down, degraded or slow.
    """
    agents = load_fleet_config()
    prev_state = load_health_state()

    if agent_name:
        agents = [a for a in agents if a.get("name") == agent_name]

    results = [check_agent_health(agent, prev_state, timeout=timeout) for agent in agents]
    checked_at = time.strftime("%Y-%m-%dT%H:%M:%S")

    # Update persisted state. A filtered run only refreshes the agents it
    # checked; the other agents keep their stored failure counters instead
    # of being wiped from the state file.
    fresh = {r["name"]: r for r in results}
    if agent_name:
        previous_agents = prev_state.get("agents") if isinstance(prev_state, dict) else None
        persisted = dict(previous_agents) if isinstance(previous_agents, dict) else {}
        persisted.update(fresh)
    else:
        persisted = fresh
    save_health_state({"agents": persisted, "last_check": checked_at})

    # Summary
    healthy = sum(1 for r in results if r["status"] == "healthy")
    degraded = sum(1 for r in results if r["status"] == "degraded")
    slow = sum(1 for r in results if r["status"] == "slow")
    down = sum(1 for r in results if r["status"] in ("down", "error"))

    return {
        "agents": results,
        "summary": {
            "total": len(results),
            "healthy": healthy,
            "degraded": degraded,
            "slow": slow,
            "down": down,
            "all_healthy": down == 0 and degraded == 0 and slow == 0,
        },
        "checked_at": checked_at,
    }
|
||||
|
||||
|
||||
def format_health_dashboard(report: Dict[str, Any]) -> str:
    """Format health report as text dashboard.

    Args:
        report: Report dict with ``summary``, ``agents`` and ``checked_at``
            keys, as returned by ``check_fleet_health``.

    Returns:
        A multi-line string: a header, the check time, a blank line and one
        line per agent.
    """
    status_icons = {
        "healthy": "\u2705",
        "degraded": "\u26a0\ufe0f",
        "slow": "\u23f1\ufe0f",
        "down": "\u274c",
        "error": "\u274c",
    }
    summary = report["summary"]

    # Header. Slow agents must not be reported as "all healthy", even when
    # the summary flag was computed without taking them into account.
    if summary["all_healthy"] and not summary.get("slow"):
        header = "\u2705 All fleet agents healthy"
    elif summary["down"] > 0:
        header = f"\u274c {summary['down']} agent(s) DOWN"
    else:
        header = f"\u26a0\ufe0f Fleet degraded: {summary['degraded']} degraded, {summary['slow']} slow"

    lines = [header, f"Checked: {report['checked_at']}", ""]

    # Agent details
    for agent in report["agents"]:
        status_icon = status_icons.get(agent["status"], "\u2753")
        line = f" {status_icon} {agent['name']}"

        if agent.get("alive"):
            ms = agent.get("response_ms")
            line += f" — {'?' if ms is None else ms}ms"
            if agent.get("agent_card"):
                tools = agent["agent_card"].get("tools_count", 0)
                line += f" — {tools} tools"
        else:
            # "error" may be present but None; fall back instead of
            # printing the literal "None".
            line += f" — {agent.get('error') or 'unreachable'}"
            failures = agent.get("consecutive_failures") or 0
            if failures > 0:
                line += f" ({failures} consecutive failures)"

        lines.append(line)

    return "\n".join(lines)
|
||||
@@ -1,92 +0,0 @@
|
||||
"""Tests for A2A mutual TLS authentication."""
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from agent.a2a.mtls import (
|
||||
FleetCA,
|
||||
verify_peer,
|
||||
get_cert_info,
|
||||
generate_fleet_certs,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
def tmp_ca(tmp_path, monkeypatch):
    """Create a temporary CA for testing.

    Both the CA directory and the agent certificate directory live under
    pytest's ``tmp_path``, so the tests never write into the real
    ``~/.hermes/a2a`` tree. pytest removes the directory afterwards.
    """
    # issue_cert() resolves the module-level CERTS_DIR when it is called, so
    # redirecting the attribute is enough to isolate issued certificates.
    monkeypatch.setattr("agent.a2a.mtls.CERTS_DIR", tmp_path / "certs")
    return FleetCA.init(tmp_path / "ca")
|
||||
|
||||
|
||||
class TestFleetCA:
    """Checks on the generated fleet CA itself."""

    def test_ca_generates_cert_and_key(self, tmp_ca):
        """Initialising the CA leaves both the certificate and key on disk."""
        for generated in (tmp_ca.ca_cert, tmp_ca.ca_key):
            assert generated.exists()

    def test_ca_cert_is_ca(self, tmp_ca):
        """The CA certificate's subject identifies the fleet CA."""
        ca_info = get_cert_info(str(tmp_ca.ca_cert))
        assert ca_info is not None
        subject = ca_info.subject
        assert "CA" in subject or "Hermes" in subject

    def test_ca_validity_10_years(self, tmp_ca):
        """The CA certificate is valid for roughly ten years."""
        ca_info = get_cert_info(str(tmp_ca.ca_cert))
        assert ca_info is not None
        assert ca_info.days_remaining > 3500  # ~10 years
|
||||
|
||||
|
||||
class TestIssueCert:
    """Checks on certificates issued for individual agents."""

    def test_issue_cert_creates_files(self, tmp_ca):
        """Issuing a certificate writes both the cert and the key."""
        cert_file, key_file = tmp_ca.issue_cert("test-agent")
        assert cert_file.exists()
        assert key_file.exists()

    def test_cert_verifies_against_ca(self, tmp_ca):
        """An issued certificate validates against the issuing CA."""
        cert_file, _key = tmp_ca.issue_cert("test-agent")
        assert verify_peer(str(cert_file), str(tmp_ca.ca_cert))

    def test_cert_has_agent_name(self, tmp_ca):
        """The agent name appears in the certificate subject."""
        cert_file, _key = tmp_ca.issue_cert("allegro")
        details = get_cert_info(str(cert_file))
        assert details is not None
        assert "allegro" in details.subject.lower()

    def test_cert_validity_1_year(self, tmp_ca):
        """The default validity is about one year."""
        cert_file, _key = tmp_ca.issue_cert("test-agent")
        details = get_cert_info(str(cert_file))
        assert details is not None
        assert 360 <= details.days_remaining <= 366
|
||||
|
||||
|
||||
class TestVerify:
    """Checks on peer verification against the fleet CA."""

    def test_valid_cert_verifies(self, tmp_ca):
        """A certificate issued by the CA verifies successfully."""
        cert, _ = tmp_ca.issue_cert("test-agent")
        assert verify_peer(str(cert), str(tmp_ca.ca_cert)) is True

    def test_invalid_cert_fails(self, tmp_ca, tmp_path):
        """A self-signed certificate from outside the CA is rejected."""
        import subprocess

        # pytest's tmp_path replaces tempfile.mktemp() (race-prone) and is
        # cleaned up even when an assertion below fails.
        imposter_cert = tmp_path / "imposter.pem"
        imposter_key = tmp_path / "imposter-key.pem"

        # check=True: if the imposter cert cannot be created the test must
        # fail loudly instead of passing because the file is missing.
        subprocess.run(
            [
                "openssl", "req", "-x509", "-newkey", "rsa:2048",
                "-keyout", str(imposter_key), "-out", str(imposter_cert),
                "-days", "1", "-subj", "/CN=imposter", "-nodes",
            ],
            check=True,
            capture_output=True,
        )
        assert imposter_cert.exists()
        assert verify_peer(str(imposter_cert), str(tmp_ca.ca_cert)) is False
|
||||
|
||||
|
||||
class TestGenerateFleet:
    """Checks on bulk certificate generation."""

    def test_generates_all_agents(self, tmp_ca):
        """Every requested agent gets an existing cert and key path."""
        fleet = ["timmy", "allegro", "ezra"]
        issued = generate_fleet_certs(fleet, ca_dir=tmp_ca.ca_dir)
        assert len(issued) == 3
        for member in fleet:
            assert member in issued
            cert_file, key_file = issued[member][0], issued[member][1]
            assert os.path.exists(cert_file)
            assert os.path.exists(key_file)
|
||||
80
tests/test_a2a_health.py
Normal file
80
tests/test_a2a_health.py
Normal file
@@ -0,0 +1,80 @@
|
||||
"""Tests for A2A health monitor (#822)."""
|
||||
|
||||
import sys
|
||||
import json
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
|
||||
from hermes_cli.a2a_health import (
|
||||
ping_agent,
|
||||
check_agent_health,
|
||||
check_fleet_health,
|
||||
format_health_dashboard,
|
||||
load_health_state,
|
||||
save_health_state,
|
||||
)
|
||||
|
||||
|
||||
def test_ping_agent_unreachable():
    """An unreachable endpoint is reported as not alive, with an error."""
    outcome = ping_agent("http://192.0.2.1:9999", timeout=2)
    assert not outcome["alive"]
    assert "error" in outcome
|
||||
|
||||
|
||||
def test_check_agent_no_url():
    """An agent definition without any URL yields the error status."""
    agent_without_url = {"name": "test"}
    outcome = check_agent_health(agent_without_url, {})
    assert outcome["status"] == "error"
|
||||
|
||||
|
||||
def test_format_dashboard():
    """The dashboard names every agent and flags the fleet as DOWN."""
    healthy_agent = {"name": "ezra", "status": "healthy", "alive": True, "response_ms": 50}
    down_agent = {"name": "allegro", "status": "down", "alive": False, "error": "timeout"}
    report = {
        "agents": [healthy_agent, down_agent],
        "summary": {"total": 2, "healthy": 1, "degraded": 0, "slow": 0, "down": 1, "all_healthy": False},
        "checked_at": "2026-04-15T12:00:00",
    }

    rendered = format_health_dashboard(report)

    for expected in ("ezra", "allegro", "DOWN"):
        assert expected in rendered
|
||||
|
||||
|
||||
def test_state_persistence():
    """Health state survives a save_health_state / load_health_state round trip."""
    import hermes_cli.a2a_health as health_module

    state = {"agents": {"test": {"alive": True}}, "last_check": "now"}

    with tempfile.TemporaryDirectory() as tmpdir:
        # Both functions read the module-level HEALTH_STATE when called, so
        # pointing it at a temp file exercises the real code without
        # touching ~/.hermes. The nested directory also covers the mkdir.
        original_path = health_module.HEALTH_STATE
        health_module.HEALTH_STATE = Path(tmpdir) / "nested" / "health.json"
        try:
            save_health_state(state)
            loaded = load_health_state()
        finally:
            health_module.HEALTH_STATE = original_path

    assert loaded == state
    assert loaded["agents"]["test"]["alive"] is True
|
||||
|
||||
|
||||
def test_consecutive_failures():
    """Failure count increments correctly and crosses the down threshold."""
    import socket

    # Reserve a free local port and release it again: connecting to it is
    # refused immediately, unlike an unroutable address, which makes every
    # endpoint attempt wait for the full default timeout.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        probe.bind(("127.0.0.1", 0))
        closed_port = probe.getsockname()[1]

    prev = {"agents": {"test": {"consecutive_failures": 2}}}
    agent = {"name": "test", "url": f"http://127.0.0.1:{closed_port}"}
    result = check_agent_health(agent, prev)
    assert result["consecutive_failures"] == 3
    assert result["status"] == "down"
|
||||
|
||||
|
||||
# Allow running this file directly, without pytest.
if __name__ == "__main__":
    for test_fn in (
        test_ping_agent_unreachable,
        test_check_agent_no_url,
        test_format_dashboard,
        test_state_persistence,
        test_consecutive_failures,
    ):
        print(f"Running {test_fn.__name__}...")
        test_fn()
        print(" PASS")
    print("\nAll tests passed.")
|
||||
Reference in New Issue
Block a user