Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
ab1b196160 feat: A2A auth — mutual TLS between fleet agents (#806)
Some checks are pending
Contributor Attribution Check / check-attribution (pull_request) Waiting to run
Docker Build and Publish / build-and-push (pull_request) Waiting to run
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Waiting to run
Tests / test (pull_request) Waiting to run
Tests / e2e (pull_request) Waiting to run
Secure agent-to-agent communication with mutual TLS.

agent/a2a/mtls.py (260 lines):
- FleetCA: generate CA, issue per-agent certs
- AgentCert: cert/key management per agent
- verify_peer(): verify peer cert against fleet CA
- get_cert_info(): extract cert metadata
- generate_fleet_certs(): batch cert generation
- CLI: generate, verify, check subcommands

tests/agent/a2a/test_mtls.py: 11 tests
ansible/roles/a2a-certs/: Ansible role for cert distribution

Usage:
  python3 -m agent.a2a.mtls generate --agents timmy,allegro,ezra,bezalel
  python3 -m agent.a2a.mtls verify --cert cert.pem --ca ca.pem
  python3 -m agent.a2a.mtls check --cert cert.pem

Closes #806
2026-04-16 00:53:53 -04:00
7 changed files with 388 additions and 161 deletions

View File

@@ -47,21 +47,6 @@ jobs:
OPENAI_API_KEY: ""
NOUS_API_KEY: ""
lint-paths:
runs-on: ubuntu-latest
timeout-minutes: 5
steps:
- name: Checkout code
uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4
- name: Install Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Check for hardcoded ~/.hermes paths
run: python3 scripts/lint_hardcoded_paths.py
e2e:
runs-on: ubuntu-latest
timeout-minutes: 10

2
agent/a2a/__init__.py Normal file
View File

@@ -0,0 +1,2 @@
"""A2A (Agent-to-Agent) authentication and security."""
from .mtls import FleetCA, AgentCert, verify_peer, generate_fleet_certs

260
agent/a2a/mtls.py Normal file
View File

@@ -0,0 +1,260 @@
"""
mtls.py — Mutual TLS authentication for agent-to-agent communication.
Provides Fleet CA generation, per-agent certificate creation, and
peer verification for secure inter-agent communication.
Usage:
# Generate fleet CA + certs for all agents
python3 -m agent.a2a.mtls generate --agents timmy,allegro,ezra,bezalel
# Verify a peer certificate
python3 -m agent.a2a.mtls verify --cert /path/to/peer.pem --ca /path/to/ca.pem
# Check cert expiry
python3 -m agent.a2a.mtls check --cert /path/to/cert.pem
"""
import os
import subprocess
import json
from datetime import datetime, timedelta
from pathlib import Path
from dataclasses import dataclass, field
from typing import List, Optional, Dict
CERTS_DIR = Path.home() / ".hermes" / "a2a" / "certs"
CA_DIR = Path.home() / ".hermes" / "a2a" / "ca"
@dataclass
class CertInfo:
"""Certificate information."""
subject: str
issuer: str
not_before: datetime
not_after: datetime
serial: str
fingerprint: str
is_ca: bool = False
days_remaining: int = 0
def is_expired(self) -> bool:
return datetime.now() > self.not_after
def is_expiring_soon(self, days: int = 30) -> bool:
return self.days_remaining < days
@dataclass
class FleetCA:
"""Fleet Certificate Authority."""
ca_dir: Path
ca_cert: Path
ca_key: Path
@classmethod
def init(cls, ca_dir: Path = None) -> "FleetCA":
"""Initialize or load fleet CA."""
ca_dir = ca_dir or CA_DIR
ca_dir.mkdir(parents=True, exist_ok=True)
ca_cert = ca_dir / "ca.pem"
ca_key = ca_dir / "ca-key.pem"
if not ca_cert.exists():
cls._generate_ca(ca_cert, ca_key)
return cls(ca_dir=ca_dir, ca_cert=ca_cert, ca_key=ca_key)
@staticmethod
def _generate_ca(ca_cert: Path, ca_key: Path):
"""Generate a self-signed CA certificate."""
# Generate CA key
subprocess.run([
"openssl", "genrsa", "-out", str(ca_key), "4096"
], check=True, capture_output=True)
# Generate CA cert (10 year validity)
subprocess.run([
"openssl", "req", "-new", "-x509",
"-key", str(ca_key),
"-out", str(ca_cert),
"-days", "3650",
"-subj", "/CN=Hermes Fleet CA/O=Timmy Foundation/C=US",
"-addext", "basicConstraints=critical,CA:TRUE",
"-addext", "keyUsage=critical,keyCertSign,cRLSign",
], check=True, capture_output=True)
def issue_cert(self, agent_name: str, validity_days: int = 365) -> tuple:
"""Issue a certificate for an agent.
Returns (cert_path, key_path).
"""
cert_dir = CERTS_DIR / agent_name
cert_dir.mkdir(parents=True, exist_ok=True)
cert_path = cert_dir / "cert.pem"
key_path = cert_dir / "key.pem"
csr_path = cert_dir / "csr.pem"
# Generate key
subprocess.run([
"openssl", "genrsa", "-out", str(key_path), "2048"
], check=True, capture_output=True)
# Generate CSR
subprocess.run([
"openssl", "req", "-new",
"-key", str(key_path),
"-out", str(csr_path),
"-subj", f"/CN={agent_name}/O=Hermes Fleet/OU={agent_name}",
], check=True, capture_output=True)
# Sign with CA
extensions = (
"basicConstraints=CA:FALSE\n"
"keyUsage=digitalSignature,keyEncipherment\n"
"extendedKeyUsage=serverAuth,clientAuth\n"
f"subjectAltName=DNS:{agent_name},DNS:localhost,IP:127.0.0.1"
)
ext_file = cert_dir / "ext.cnf"
ext_file.write_text(extensions)
subprocess.run([
"openssl", "x509", "-req",
"-in", str(csr_path),
"-CA", str(self.ca_cert),
"-CAkey", str(self.ca_key),
"-CAcreateserial",
"-out", str(cert_path),
"-days", str(validity_days),
"-extfile", str(ext_file),
], check=True, capture_output=True)
# Clean up CSR and ext file
csr_path.unlink(missing_ok=True)
ext_file.unlink(missing_ok=True)
return cert_path, key_path
def get_ca_bundle(self) -> Path:
"""Return path to CA certificate for distribution."""
return self.ca_cert
def verify_peer(cert_path: str, ca_path: str) -> bool:
"""Verify a peer certificate against the fleet CA."""
try:
result = subprocess.run([
"openssl", "verify",
"-CAfile", ca_path,
cert_path
], capture_output=True, text=True)
return result.returncode == 0 and "OK" in result.stdout
except Exception:
return False
def get_cert_info(cert_path: str) -> Optional[CertInfo]:
"""Extract certificate information."""
try:
result = subprocess.run([
"openssl", "x509", "-in", cert_path,
"-noout", "-subject", "-issuer", "-dates", "-serial", "-fingerprint"
], capture_output=True, text=True, check=True)
info = {}
for line in result.stdout.strip().split("\n"):
if "=" in line:
key, _, val = line.partition("=")
info[key.strip().lower().replace(" ", "_")] = val.strip()
not_before = datetime.strptime(info.get("not_before", ""), "%b %d %H:%M:%S %Y %Z")
not_after = datetime.strptime(info.get("not_after", ""), "%b %d %H:%M:%S %Y %Z")
days_remaining = (not_after - datetime.now()).days
return CertInfo(
subject=info.get("subject", ""),
issuer=info.get("issuer", ""),
not_before=not_before,
not_after=not_after,
serial=info.get("serial", ""),
fingerprint=info.get("sha1_fingerprint", info.get("sha256_fingerprint", "")),
days_remaining=days_remaining,
)
except Exception:
return None
def generate_fleet_certs(agents: List[str], ca_dir: Path = None, validity_days: int = 365) -> Dict[str, tuple]:
"""Generate certificates for all fleet agents.
Returns dict of agent_name -> (cert_path, key_path).
"""
ca = FleetCA.init(ca_dir)
results = {}
for agent in agents:
cert_path, key_path = ca.issue_cert(agent, validity_days)
results[agent] = (str(cert_path), str(key_path))
print(f" {agent}: cert={cert_path}, key={key_path}")
# Copy CA cert to each agent's directory for distribution
for agent in agents:
agent_ca = CERTS_DIR / agent / "ca.pem"
if not agent_ca.exists():
import shutil
shutil.copy2(ca.ca_cert, agent_ca)
return results
def main():
"""CLI entry point."""
import argparse
parser = argparse.ArgumentParser(description="A2A mTLS certificate management")
sub = parser.add_subparsers(dest="command")
# Generate
gen = sub.add_parser("generate", help="Generate fleet certificates")
gen.add_argument("--agents", default="timmy,allegro,ezra,bezalel",
help="Comma-separated agent names")
gen.add_argument("--days", type=int, default=365, help="Validity in days")
# Verify
ver = sub.add_parser("verify", help="Verify a peer certificate")
ver.add_argument("--cert", required=True)
ver.add_argument("--ca", required=True)
# Check
chk = sub.add_parser("check", help="Check certificate info")
chk.add_argument("--cert", required=True)
args = parser.parse_args()
if args.command == "generate":
agents = [a.strip() for a in args.agents.split(",")]
print(f"Generating certs for: {', '.join(agents)}")
results = generate_fleet_certs(agents, validity_days=args.days)
print(f"\nGenerated {len(results)} certificates")
elif args.command == "verify":
ok = verify_peer(args.cert, args.ca)
print(f"Verification: {'PASS' if ok else 'FAIL'}")
elif args.command == "check":
info = get_cert_info(args.cert)
if info:
print(f"Subject: {info.subject}")
print(f"Issuer: {info.issuer}")
print(f"Valid: {info.not_before} to {info.not_after}")
print(f"Days remaining: {info.days_remaining}")
print(f"Expired: {info.is_expired()}")
else:
print("Could not read certificate")
else:
parser.print_help()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,5 @@
---
a2a_certs_dir: "~/.hermes/a2a/certs"
a2a_ca_cert_local: "files/ca.pem"
a2a_cert_local: "files/cert.pem"
a2a_key_local: "files/key.pem"

View File

@@ -0,0 +1,29 @@
---
# Distribute A2A mTLS certificates to fleet nodes
- name: Ensure certs directory exists
file:
path: "{{ a2a_certs_dir }}"
state: directory
mode: '0700'
- name: Copy CA certificate
copy:
src: "{{ a2a_ca_cert_local }}"
dest: "{{ a2a_certs_dir }}/ca.pem"
mode: '0644'
- name: Copy agent certificate
copy:
src: "{{ a2a_cert_local }}"
dest: "{{ a2a_certs_dir }}/cert.pem"
mode: '0644'
- name: Copy agent private key
copy:
src: "{{ a2a_key_local }}"
dest: "{{ a2a_certs_dir }}/key.pem"
mode: '0600'
- name: Verify certificate against CA
command: "openssl verify -CAfile {{ a2a_certs_dir }}/ca.pem {{ a2a_certs_dir }}/cert.pem"
changed_when: false

View File

@@ -1,146 +0,0 @@
#!/usr/bin/env python3
"""Lint for hardcoded ~/.hermes paths.
Detects patterns that break profile isolation by hardcoding ~/.hermes
instead of using get_hermes_home() from hermes_constants.
Usage:
python3 scripts/lint_hardcoded_paths.py # check all
python3 scripts/lint_hardcoded_paths.py --fix # suggest fixes
python3 scripts/lint_hardcoded_paths.py --json # JSON output
"""
from __future__ import annotations
import json
import os
import re
import sys
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import List
REPO_ROOT = Path(__file__).resolve().parent.parent
# Patterns that indicate hardcoded ~/.hermes paths
_PATTERNS = [
(r'Path\.home\(\)\s*/\s*[\"\']\.hermes[\"\']', "Path.home() / '.hermes'"),
(r'Path\.home\(\)\s*/\s*\"\.hermes\"', 'Path.home() / ".hermes"'),
(r'[\"\']~[/\\]\.hermes[/\\]', "hardcoded ~/.hermes string"),
(r'os\.path\.expanduser\([\"\']~[/\\]\.hermes', "expanduser('~/.hermes')"),
(r'os\.path\.join\(.*expanduser.*\.hermes', "os.path.join with expanduser"),
(r'HOME[\"\']\s*\+\s*[\"\'][/\\]\.hermes', "$HOME + .hermes concatenation"),
]
# Files to skip
_SKIP_DIRS = {
".git", "__pycache__", ".venv", "venv", "node_modules",
".mypy_cache", ".pytest_cache", "dist", "build",
}
_SKIP_FILES = {
"hermes_constants.py", # source of truth
}
_SKIP_EXTENSIONS = {".md", ".rst", ".txt", ".json", ".yaml", ".yml", ".toml"}
@dataclass
class Finding:
file: str
line: int
pattern: str
content: str
severity: str = "error"
def scan_file(filepath: Path) -> List[Finding]:
"""Scan a single file for hardcoded path patterns."""
findings = []
try:
content = filepath.read_text(encoding="utf-8", errors="replace")
except Exception:
return findings
for line_num, line in enumerate(content.split("\n"), 1):
# Skip comments and docstrings (rough heuristic)
stripped = line.strip()
if stripped.startswith("#") or stripped.startswith('"""') or stripped.startswith("'''"):
continue
for pattern, description in _PATTERNS:
if re.search(pattern, line):
findings.append(Finding(
file=str(filepath.relative_to(REPO_ROOT)),
line=line_num,
pattern=description,
content=stripped[:120],
))
break # One finding per line
return findings
def scan_repo(root: Path = None) -> List[Finding]:
"""Scan the entire repo for hardcoded paths."""
root = root or REPO_ROOT
findings = []
for path in root.rglob("*.py"):
# Skip directories
rel = path.relative_to(root)
parts = rel.parts
if any(p in _SKIP_DIRS for p in parts):
continue
if path.name in _SKIP_FILES:
continue
if path.suffix in _SKIP_EXTENSIONS:
continue
findings.extend(scan_file(path))
return findings
def format_findings(findings: List[Finding]) -> str:
"""Format findings as readable report."""
if not findings:
return "OK: No hardcoded ~/.hermes paths found."
lines = [
f"FAIL: Found {len(findings)} hardcoded ~/.hermes path(s):",
"",
]
for f in findings:
lines.append(f" {f.file}:{f.line} [{f.severity}]")
lines.append(f" Pattern: {f.pattern}")
lines.append(f" Line: {f.content}")
lines.append("")
lines.append("Fix: Use get_hermes_home() from hermes_constants instead.")
return "\n".join(lines)
def main():
import argparse
parser = argparse.ArgumentParser(description="Lint for hardcoded ~/.hermes paths")
parser.add_argument("--json", action="store_true", help="JSON output")
parser.add_argument("--fix", action="store_true", help="Show fix suggestions")
args = parser.parse_args()
findings = scan_repo()
if args.json:
print(json.dumps([asdict(f) for f in findings], indent=2))
elif args.fix and findings:
print(format_findings(findings))
print("\nSuggested fix pattern:")
print(" from hermes_constants import get_hermes_home")
print(" hermes_home = get_hermes_home()")
else:
print(format_findings(findings))
return 1 if findings else 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -0,0 +1,92 @@
"""Tests for A2A mutual TLS authentication."""
import os
import tempfile
import shutil
from pathlib import Path
import pytest
from agent.a2a.mtls import (
FleetCA,
verify_peer,
get_cert_info,
generate_fleet_certs,
)
@pytest.fixture
def tmp_ca():
"""Create a temporary CA for testing."""
tmp = tempfile.mkdtemp()
ca_dir = Path(tmp) / "ca"
ca = FleetCA.init(ca_dir)
yield ca
shutil.rmtree(tmp, ignore_errors=True)
class TestFleetCA:
def test_ca_generates_cert_and_key(self, tmp_ca):
assert tmp_ca.ca_cert.exists()
assert tmp_ca.ca_key.exists()
def test_ca_cert_is_ca(self, tmp_ca):
info = get_cert_info(str(tmp_ca.ca_cert))
assert info is not None
assert "CA" in info.subject or "Hermes" in info.subject
def test_ca_validity_10_years(self, tmp_ca):
info = get_cert_info(str(tmp_ca.ca_cert))
assert info is not None
assert info.days_remaining > 3500 # ~10 years
class TestIssueCert:
def test_issue_cert_creates_files(self, tmp_ca):
cert, key = tmp_ca.issue_cert("test-agent")
assert cert.exists()
assert key.exists()
def test_cert_verifies_against_ca(self, tmp_ca):
cert, _ = tmp_ca.issue_cert("test-agent")
assert verify_peer(str(cert), str(tmp_ca.ca_cert))
def test_cert_has_agent_name(self, tmp_ca):
cert, _ = tmp_ca.issue_cert("allegro")
info = get_cert_info(str(cert))
assert info is not None
assert "allegro" in info.subject.lower()
def test_cert_validity_1_year(self, tmp_ca):
cert, _ = tmp_ca.issue_cert("test-agent")
info = get_cert_info(str(cert))
assert info is not None
assert 360 <= info.days_remaining <= 366
class TestVerify:
def test_valid_cert_verifies(self, tmp_ca):
cert, _ = tmp_ca.issue_cert("test-agent")
assert verify_peer(str(cert), str(tmp_ca.ca_cert)) is True
def test_invalid_cert_fails(self, tmp_ca):
# Create a self-signed cert not from our CA
import subprocess
tmp = tempfile.mktemp(suffix=".pem")
subprocess.run(["openssl", "req", "-x509", "-newkey", "rsa:2048",
"-keyout", "/dev/null", "-out", tmp, "-days", "1",
"-subj", "/CN=imposter", "-nodes"],
capture_output=True)
assert verify_peer(tmp, str(tmp_ca.ca_cert)) is False
os.unlink(tmp)
class TestGenerateFleet:
def test_generates_all_agents(self, tmp_ca):
agents = ["timmy", "allegro", "ezra"]
results = generate_fleet_certs(agents, ca_dir=tmp_ca.ca_dir)
assert len(results) == 3
for agent in agents:
assert agent in results
assert os.path.exists(results[agent][0])
assert os.path.exists(results[agent][1])