Compare commits

...

5 Commits

Author SHA1 Message Date
3659c2c57d feat: mTLS server for A2A (#806) 2026-04-15 16:28:36 +00:00
7331846f87 feat: Fleet CA for mTLS (#806) 2026-04-15 16:28:33 +00:00
6a460857bf test: A2A mTLS tests
Some checks failed
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Contributor Attribution Check / check-attribution (pull_request) Failing after 26s
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 27s
Tests / e2e (pull_request) Successful in 2m31s
Tests / test (pull_request) Failing after 41m50s
Part of #806
2026-04-15 16:18:14 +00:00
9446db5ee7 feat: A2A mutual TLS verification module
Part of #806
2026-04-15 16:18:12 +00:00
301b8c296b feat: Fleet CA and agent cert generator
Part of #806
2026-04-15 16:18:08 +00:00
5 changed files with 542 additions and 0 deletions

165
agent/a2a_mtls.py Normal file
View File

@@ -0,0 +1,165 @@
"""
A2A Mutual TLS Verification — #806
Provides TLS context factories for mTLS-secured agent-to-agent communication.
Each agent presents its cert, the server verifies against the Fleet CA.
Usage:
from agent.a2a_mtls import get_server_ssl_context, get_client_ssl_context
# Server side (A2A server)
ssl_ctx = get_server_ssl_context(
cert_file="/path/to/agent.crt",
key_file="/path/to/agent.key",
ca_file="/path/to/fleet-ca.crt",
)
# Client side (A2A client)
ssl_ctx = get_client_ssl_context(
cert_file="/path/to/agent.crt",
key_file="/path/to/agent.key",
ca_file="/path/to/fleet-ca.crt",
)
"""
import os
import ssl
from pathlib import Path
from typing import Optional
# Default paths
DEFAULT_CERTS_DIR = Path(os.getenv("FLEET_CERTS_DIR", str(Path.home() / ".hermes" / "fleet-certs")))
def get_server_ssl_context(
cert_file: Optional[str] = None,
key_file: Optional[str] = None,
ca_file: Optional[str] = None,
agent_name: Optional[str] = None,
) -> ssl.SSLContext:
"""
Create SSL context for mTLS server.
Requires client certificate verification.
"""
if agent_name and not cert_file:
cert_file = str(DEFAULT_CERTS_DIR / agent_name / f"{agent_name}.crt")
key_file = str(DEFAULT_CERTS_DIR / agent_name / f"{agent_name}.key")
ca_file = str(DEFAULT_CERTS_DIR / agent_name / "fleet-ca.crt")
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
# Load server cert and key
ctx.load_cert_chain(certfile=cert_file, keyfile=key_file)
# Require client certificate
ctx.verify_mode = ssl.CERT_REQUIRED
# Load CA for verifying client certs
ctx.load_verify_locations(cafile=ca_file)
# Security settings
ctx.minimum_version = ssl.TLSVersion.TLSv1_2
ctx.check_hostname = False # Internal fleet, not public DNS
return ctx
def get_client_ssl_context(
cert_file: Optional[str] = None,
key_file: Optional[str] = None,
ca_file: Optional[str] = None,
agent_name: Optional[str] = None,
) -> ssl.SSLContext:
"""
Create SSL context for mTLS client.
Presents client certificate for server verification.
"""
if agent_name and not cert_file:
cert_file = str(DEFAULT_CERTS_DIR / agent_name / f"{agent_name}.crt")
key_file = str(DEFAULT_CERTS_DIR / agent_name / f"{agent_name}.key")
ca_file = str(DEFAULT_CERTS_DIR / agent_name / "fleet-ca.crt")
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
# Load client cert and key
ctx.load_cert_chain(certfile=cert_file, keyfile=key_file)
# Load CA for verifying server cert
ctx.load_verify_locations(cafile=ca_file)
# Security settings
ctx.minimum_version = ssl.TLSVersion.TLSv1_2
ctx.check_hostname = False # Internal fleet
return ctx
def verify_agent_cert(cert_pem: str, ca_file: Optional[str] = None) -> tuple[bool, str]:
"""
Verify an agent certificate against the Fleet CA.
Returns (valid, subject_cn).
"""
if ca_file is None:
ca_file = str(DEFAULT_CERTS_DIR / "fleet-ca.crt")
try:
from cryptography import x509
from cryptography.x509.verification import PolicyBuilder, Store
cert = x509.load_pem_x509_certificate(cert_pem.encode() if isinstance(cert_pem, str) else cert_pem)
with open(ca_file, "rb") as f:
ca_cert = x509.load_pem_x509_certificate(f.read())
store = Store([ca_cert])
builder = PolicyBuilder().store(store)
verifier = builder.build_server_verifier(x509.DNSName("fleet.local"))
try:
verifier.verify(cert, [cert])
cn = cert.subject.get_attributes_for_oid(x509.oid.NameOID.COMMON_NAME)[0].value
return True, cn
except Exception as e:
return False, str(e)
except ImportError:
# Fallback: basic validation
try:
from cryptography import x509
cert = x509.load_pem_x509_certificate(cert_pem.encode() if isinstance(cert_pem, str) else cert_pem)
with open(ca_file, "rb") as f:
ca_cert = x509.load_pem_x509_certificate(f.read())
# Check issuer matches CA subject
if cert.issuer == ca_cert.subject:
cn = cert.subject.get_attributes_for_oid(x509.oid.NameOID.COMMON_NAME)[0].value
return True, cn
return False, "Issuer mismatch"
except Exception as e:
return False, str(e)
def get_agent_cn_from_context(ssl_context: ssl.SSLContext) -> Optional[str]:
"""
Extract agent Common Name from an SSL context's peer certificate.
Used by the server to identify which agent is connecting.
"""
try:
peer_cert = ssl_context.getpeercert(binary_form=True)
if peer_cert:
from cryptography import x509
cert = x509.load_der_x509_certificate(peer_cert)
cn_attrs = cert.subject.get_attributes_for_oid(x509.oid.NameOID.COMMON_NAME)
if cn_attrs:
cn = cn_attrs[0].value
# Strip "agent-" prefix if present
if cn.startswith("agent-"):
return cn[6:]
return cn
except Exception:
pass
return None

View File

@@ -0,0 +1,206 @@
#!/usr/bin/env python3
"""
Fleet CA and Agent Certificate Generator — #806
Generates a Fleet CA and per-agent TLS certificates for mutual TLS
authentication between fleet agents.
Usage:
# Generate Fleet CA
python scripts/generate_fleet_ca.py --ca-dir ./fleet-ca
# Generate agent cert
python scripts/generate_fleet_ca.py --ca-dir ./fleet-ca --agent timmy
python scripts/generate_fleet_ca.py --ca-dir ./fleet-ca --agent allegro
python scripts/generate_fleet_ca.py --ca-dir ./fleet-ca --agent ezra
# Generate all fleet certs
python scripts/generate_fleet_ca.py --ca-dir ./fleet-ca --all
"""
import argparse
import datetime
import os
import sys
from pathlib import Path
try:
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
HAS_CRYPTO = True
except ImportError:
HAS_CRYPTO = False
FLEET_AGENTS = ["timmy", "allegro", "ezra", "bezalel"]
CA_VALIDITY_DAYS = 3650 # 10 years
CERT_VALIDITY_DAYS = 365 # 1 year
KEY_SIZE = 2048
def generate_ca(ca_dir: Path) -> tuple:
"""Generate Fleet CA key and certificate."""
ca_dir.mkdir(parents=True, exist_ok=True)
# Generate CA key
ca_key = rsa.generate_private_key(public_exponent=65537, key_size=KEY_SIZE)
# Generate CA cert
subject = issuer = x509.Name([
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Timmy Foundation"),
x509.NameAttribute(NameOID.COMMON_NAME, "Fleet CA"),
])
ca_cert = (
x509.CertificateBuilder()
.subject_name(subject)
.issuer_name(issuer)
.public_key(ca_key.public_key())
.serial_number(x509.random_serial_number())
.not_valid_before(datetime.datetime.utcnow())
.not_valid_after(datetime.datetime.utcnow() + datetime.timedelta(days=CA_VALIDITY_DAYS))
.add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True)
.add_extension(
x509.KeyUsage(
digital_signature=True, key_cert_sign=True, crl_sign=True,
content_commitment=False, key_encipherment=False,
data_encipherment=False, key_agreement=False,
encipher_only=False, decipher_only=False,
),
critical=True,
)
.sign(ca_key, hashes.SHA256())
)
# Save
ca_key_path = ca_dir / "fleet-ca.key"
ca_cert_path = ca_dir / "fleet-ca.crt"
ca_key_path.write_bytes(ca_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
))
ca_cert_path.write_bytes(ca_cert.public_bytes(serialization.Encoding.PEM))
# Secure permissions
os.chmod(ca_key_path, 0o600)
os.chmod(ca_cert_path, 0o644)
print(f"CA key: {ca_key_path}")
print(f"CA cert: {ca_cert_path}")
return ca_key, ca_cert
def generate_agent_cert(ca_dir: Path, agent_name: str, ca_key=None, ca_cert=None) -> tuple:
"""Generate TLS certificate for an agent."""
agent_dir = ca_dir / agent_name
agent_dir.mkdir(parents=True, exist_ok=True)
# Load CA if not provided
if ca_key is None or ca_cert is None:
ca_key_path = ca_dir / "fleet-ca.key"
ca_cert_path = ca_dir / "fleet-ca.crt"
if not ca_key_path.exists() or not ca_cert_path.exists():
print(f"Error: CA not found in {ca_dir}. Run --ca first.")
return None, None
with open(ca_key_path, "rb") as f:
ca_key = serialization.load_pem_private_key(f.read(), password=None)
with open(ca_cert_path, "rb") as f:
ca_cert = x509.load_pem_x509_certificate(f.read())
# Generate agent key
agent_key = rsa.generate_private_key(public_exponent=65537, key_size=KEY_SIZE)
# Generate agent cert
subject = x509.Name([
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Timmy Foundation"),
x509.NameAttribute(NameOID.COMMON_NAME, f"agent-{agent_name}"),
])
agent_cert = (
x509.CertificateBuilder()
.subject_name(subject)
.issuer_name(ca_cert.subject)
.public_key(agent_key.public_key())
.serial_number(x509.random_serial_number())
.not_valid_before(datetime.datetime.utcnow())
.not_valid_after(datetime.datetime.utcnow() + datetime.timedelta(days=CERT_VALIDITY_DAYS))
.add_extension(x509.BasicConstraints(ca=False, path_length=None), critical=True)
.add_extension(
x509.SubjectAlternativeName([
x509.DNSName(f"{agent_name}.fleet.local"),
x509.DNSName(f"{agent_name}"),
x509.DNSName("localhost"),
]),
critical=False,
)
.sign(ca_key, hashes.SHA256())
)
# Save
key_path = agent_dir / f"{agent_name}.key"
cert_path = agent_dir / f"{agent_name}.crt"
key_path.write_bytes(agent_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
))
cert_path.write_bytes(agent_cert.public_bytes(serialization.Encoding.PEM))
# Copy CA cert to agent dir
ca_copy = agent_dir / "fleet-ca.crt"
ca_copy.write_bytes(ca_cert.public_bytes(serialization.Encoding.PEM))
# Secure permissions
os.chmod(key_path, 0o600)
os.chmod(cert_path, 0o644)
print(f"Agent {agent_name}:")
print(f" Key: {key_path}")
print(f" Cert: {cert_path}")
print(f" CA: {ca_copy}")
return agent_key, agent_cert
def main():
parser = argparse.ArgumentParser(description="Fleet CA and Agent Certificate Generator")
parser.add_argument("--ca-dir", type=Path, default=Path("./fleet-ca"), help="CA directory")
parser.add_argument("--ca", action="store_true", help="Generate Fleet CA")
parser.add_argument("--agent", type=str, help="Generate cert for agent")
parser.add_argument("--all", action="store_true", help="Generate certs for all fleet agents")
args = parser.parse_args()
if not HAS_CRYPTO:
print("Error: cryptography package required. pip install cryptography")
sys.exit(1)
if args.ca:
generate_ca(args.ca_dir)
if args.agent:
generate_agent_cert(args.ca_dir, args.agent)
if args.all:
# Generate CA first if not exists
ca_key_path = args.ca_dir / "fleet-ca.key"
if not ca_key_path.exists():
ca_key, ca_cert = generate_ca(args.ca_dir)
else:
ca_key, ca_cert = None, None
for agent in FLEET_AGENTS:
generate_agent_cert(args.ca_dir, agent, ca_key, ca_cert)
if not args.ca and not args.agent and not args.all:
parser.print_help()
if __name__ == "__main__":
main()

60
tests/test_a2a_mtls.py Normal file
View File

@@ -0,0 +1,60 @@
"""Tests for A2A mutual TLS (#806)."""
import sys
import tempfile
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
def test_import():
"""Module imports cleanly."""
from agent.a2a_mtls import get_server_ssl_context, get_client_ssl_context, verify_agent_cert
assert callable(get_server_ssl_context)
assert callable(get_client_ssl_context)
assert callable(verify_agent_cert)
def test_default_paths():
"""Default cert paths resolve correctly."""
from agent.a2a_mtls import DEFAULT_CERTS_DIR
assert DEFAULT_CERTS_DIR is not None
assert "fleet-certs" in str(DEFAULT_CERTS_DIR)
def test_server_context_creation():
"""Server SSL context can be created with agent name."""
# This will fail if certs don't exist, which is expected
from agent.a2a_mtls import get_server_ssl_context
try:
ctx = get_server_ssl_context(agent_name="timmy")
assert ctx is not None
except FileNotFoundError:
pass # Expected when certs don't exist
def test_client_context_creation():
"""Client SSL context can be created with agent name."""
from agent.a2a_mtls import get_client_ssl_context
try:
ctx = get_client_ssl_context(agent_name="timmy")
assert ctx is not None
except FileNotFoundError:
pass # Expected when certs don't exist
def test_verify_agent_cert_invalid():
"""Invalid cert returns False."""
from agent.a2a_mtls import verify_agent_cert
valid, msg = verify_agent_cert("not a cert")
assert not valid
if __name__ == "__main__":
tests = [test_import, test_default_paths, test_server_context_creation,
test_client_context_creation, test_verify_agent_cert_invalid]
for t in tests:
print(f"Running {t.__name__}...")
t()
print(" PASS")
print("\nAll tests passed.")

71
tools/fleet_ca.py Normal file
View File

@@ -0,0 +1,71 @@
"""Fleet CA for agent-to-agent mTLS (#806)."""
import argparse, datetime, os, sys
from pathlib import Path
def init_ca(output_dir, ca_name="Timmy Fleet CA", days=3650):
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
out = Path(output_dir); out.mkdir(parents=True, exist_ok=True)
ca_key = rsa.generate_private_key(65537, 4096)
ca_cert = (x509.CertificateBuilder()
.subject_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, ca_name)]))
.issuer_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, ca_name)]))
.public_key(ca_key.public_key()).serial_number(x509.random_serial_number())
.not_valid_before(datetime.datetime.utcnow())
.not_valid_after(datetime.datetime.utcnow() + datetime.timedelta(days=days))
.add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True)
.sign(ca_key, hashes.SHA256()))
with open(out/"ca.key","wb") as f: f.write(ca_key.private_bytes(serialization.Encoding.PEM, serialization.PrivateFormat.TraditionalOpenSSL, serialization.NoEncryption()))
os.chmod(out/"ca.key", 0o600)
with open(out/"ca.crt","wb") as f: f.write(ca_cert.public_bytes(serialization.Encoding.PEM))
print(f"CA created: {out}/ca.crt")
def issue_cert(agent_name, output_dir, days=365):
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
out = Path(output_dir)
with open(out/"ca.key","rb") as f: ca_key = serialization.load_pem_private_key(f.read(), None)
with open(out/"ca.crt","rb") as f: ca_cert = x509.load_pem_x509_certificate(f.read())
key = rsa.generate_private_key(65537, 2048)
cert = (x509.CertificateBuilder()
.subject_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, agent_name)]))
.issuer_name(ca_cert.subject).public_key(key.public_key())
.serial_number(x509.random_serial_number())
.not_valid_before(datetime.datetime.utcnow())
.not_valid_after(datetime.datetime.utcnow() + datetime.timedelta(days=days))
.add_extension(x509.BasicConstraints(ca=False, path_length=None), critical=True)
.add_extension(x509.SubjectAlternativeName([x509.DNSName(agent_name), x509.DNSName(f"{agent_name}.local")]), critical=False)
.sign(ca_key, hashes.SHA256()))
with open(out/f"{agent_name}.key","wb") as f: f.write(key.private_bytes(serialization.Encoding.PEM, serialization.PrivateFormat.TraditionalOpenSSL, serialization.NoEncryption()))
os.chmod(out/f"{agent_name}.key", 0o600)
with open(out/f"{agent_name}.crt","wb") as f: f.write(cert.public_bytes(serialization.Encoding.PEM))
print(f"Cert issued: {out}/{agent_name}.crt")
def verify_cert(cert_path, ca_path):
from cryptography import x509
from cryptography.hazmat.primitives.asymmetric import padding
with open(ca_path,"rb") as f: ca = x509.load_pem_x509_certificate(f.read())
with open(cert_path,"rb") as f: cert = x509.load_pem_x509_certificate(f.read())
try:
ca.public_key().verify(cert.signature, cert.tbs_certificate_bytes, padding.PKCS1v15(), cert.signature_hash_algorithm)
print(f"OK: {cert.subject} signed by {ca.subject}"); return True
except Exception as e: print(f"FAIL: {e}"); return False
def main():
p = argparse.ArgumentParser(description="Fleet CA for mTLS")
sub = p.add_subparsers(dest="cmd")
pi = sub.add_parser("init"); pi.add_argument("--output-dir", default=os.path.expanduser("~/.hermes/certs"))
pi.add_argument("--ca-name", default="Timmy Fleet CA"); pi.add_argument("--days", type=int, default=3650)
pu = sub.add_parser("issue"); pu.add_argument("--agent", required=True); pu.add_argument("--output-dir", default=os.path.expanduser("~/.hermes/certs"))
pv = sub.add_parser("verify"); pv.add_argument("--cert", required=True); pv.add_argument("--ca", required=True)
args = p.parse_args()
if args.cmd == "init": init_ca(args.output_dir, args.ca_name, args.days)
elif args.cmd == "issue": issue_cert(args.agent, args.output_dir)
elif args.cmd == "verify": sys.exit(0 if verify_cert(args.cert, args.ca) else 1)
else: p.print_help()
if __name__ == "__main__": main()

40
tools/mtls_server.py Normal file
View File

@@ -0,0 +1,40 @@
"""mTLS server for A2A auth (#806)."""
import asyncio, logging, ssl
from pathlib import Path
from typing import Callable, Optional
logger = logging.getLogger(__name__)
def create_ssl_context(ca_cert_path, server_cert_path, server_key_path):
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ctx.minimum_version = ssl.TLSVersion.TLSv1_2
ctx.load_cert_chain(certfile=server_cert_path, keyfile=server_key_path)
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.load_verify_locations(cafile=ca_cert_path)
return ctx
def get_client_identity(ssl_obj):
try:
cert = ssl_obj.getpeercert()
if cert:
for rdn in cert.get("subject", ()):
for attr in rdn:
if attr[0] == "commonName": return attr[1]
except Exception: pass
return None
async def create_mtls_server(handler, host="127.0.0.1", port=8766, ca_cert="", server_cert="", server_key=""):
ca_cert = str(Path(ca_cert).expanduser())
server_cert = str(Path(server_cert).expanduser())
server_key = str(Path(server_key).expanduser())
ssl_ctx = create_ssl_context(ca_cert, server_cert, server_key)
async def _wrapper(reader, writer):
ssl_obj = writer.transport.get_extra_info("ssl_object")
agent = get_client_identity(ssl_obj) or "unknown"
logger.info("mTLS connection from: %s", agent)
try: await handler(ssl_obj, reader, writer)
except Exception as e: logger.error("Handler error: %s", e)
finally: writer.close()
server = await asyncio.start_server(_wrapper, host, port, ssl=ssl_ctx)
logger.info("mTLS server on %s:%d", host, port)
return server