From 4214082fb656ee42e8de84440fb6a82b433af310 Mon Sep 17 00:00:00 2001 From: Alexander Whitestone Date: Tue, 21 Apr 2026 18:02:59 -0400 Subject: [PATCH] =?UTF-8?q?feat:=20A2A=20auth=20=E2=80=94=20mutual=20TLS?= =?UTF-8?q?=20between=20fleet=20agents?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements mTLS for securing agent-to-agent communication in the Hermes fleet. Fixes #806. Changes: - scripts/gen_fleet_ca.sh: generate a self-signed Fleet CA (4096-bit RSA, 10-year validity) that signs all agent certificates - scripts/gen_agent_cert.sh: generate per-agent certs (Timmy, Allegro, Ezra) signed by the fleet CA with SAN entries and clientAuth/serverAuth extended key usage - agent/mtls.py: new module providing: - build_server_ssl_context() — TLS_SERVER context with CERT_REQUIRED, enforces client cert against Fleet CA - build_client_ssl_context() — TLS_CLIENT context for outbound A2A calls - MTLSMiddleware — ASGI middleware that rejects unauthenticated requests to A2A routes (/.well-known/agent-card*, /api/agent-card, /a2a/) with HTTP 403 when mTLS is enabled - is_mtls_configured() — checks HERMES_MTLS_CERT/KEY/CA env vars - hermes_cli/web_server.py: wire MTLSMiddleware into the FastAPI app; pass SSL context to uvicorn when HERMES_MTLS_* env vars are set so the server runs TLS with mandatory client cert verification - ansible/roles/hermes_mtls/: Ansible role to distribute Fleet CA cert, agent cert, and agent key to fleet nodes; writes an env file with HERMES_MTLS_* vars and restarts the hermes-gateway service - ansible/fleet_mtls.yml: fleet-wide playbook referencing the role for Timmy, Allegro, and Ezra nodes - tests/test_mtls.py: 15 tests covering is_mtls_configured, SSL context creation with real cryptography-generated certs, and MTLSMiddleware (unauthorized agent rejected → 403, authorized agent accepted → 200) mTLS is opt-in: set HERMES_MTLS_CERT, HERMES_MTLS_KEY, and HERMES_MTLS_CA to enable. When unset, the server behaves exactly as before. Co-Authored-By: Claude Sonnet 4.6 --- agent/mtls.py | 184 +++++++++ ansible/fleet_mtls.yml | 16 +- ansible/roles/hermes_mtls/defaults/main.yml | 21 + ansible/roles/hermes_mtls/handlers/main.yml | 7 + ansible/roles/hermes_mtls/meta/main.yml | 16 + ansible/roles/hermes_mtls/tasks/main.yml | 67 +++ .../roles/hermes_mtls/templates/mtls.env.j2 | 8 + hermes_cli/web_server.py | 27 +- scripts/gen_agent_cert.sh | 11 +- scripts/gen_fleet_ca.sh | 8 +- tests/test_mtls.py | 389 ++++++++++++++++++ 11 files changed, 738 insertions(+), 16 deletions(-) create mode 100644 agent/mtls.py create mode 100644 ansible/roles/hermes_mtls/defaults/main.yml create mode 100644 ansible/roles/hermes_mtls/handlers/main.yml create mode 100644 ansible/roles/hermes_mtls/meta/main.yml create mode 100644 ansible/roles/hermes_mtls/tasks/main.yml create mode 100644 ansible/roles/hermes_mtls/templates/mtls.env.j2 create mode 100644 tests/test_mtls.py diff --git a/agent/mtls.py b/agent/mtls.py new file mode 100644 index 000000000..3bdca0f9b --- /dev/null +++ b/agent/mtls.py @@ -0,0 +1,184 @@ +""" +agent/mtls.py — Mutual TLS support for Hermes A2A communication. + +Provides: +- build_server_ssl_context() — SSL context for uvicorn that requires client certs +- build_client_ssl_context() — SSL context for httpx/aiohttp A2A clients +- MTLSMiddleware — FastAPI middleware that enforces client cert on A2A routes +- is_mtls_configured() — Check if env vars are set + +Configuration (environment variables): + HERMES_MTLS_CERT Path to this agent's TLS certificate (PEM) + HERMES_MTLS_KEY Path to this agent's TLS private key (PEM) + HERMES_MTLS_CA Path to the Fleet CA certificate (PEM) — used to verify peers + +All three must be set to enable mTLS. If any is missing, mTLS is disabled and +the server falls back to plain HTTP (or regular TLS without client auth). +""" + +import logging +import os +import ssl +from pathlib import Path +from typing import Optional + +logger = logging.getLogger(__name__) + +# A2A routes that require a valid client certificate when mTLS is enabled. +_A2A_PATH_PREFIXES = ( + "/.well-known/agent-card", + "/agent-card", + "/api/agent-card", + "/a2a/", +) + + +def _get_env(key: str) -> Optional[str]: + val = os.environ.get(key, "").strip() + return val or None + + +def is_mtls_configured() -> bool: + """Return True if all three mTLS env vars are set and the files exist.""" + cert = _get_env("HERMES_MTLS_CERT") + key = _get_env("HERMES_MTLS_KEY") + ca = _get_env("HERMES_MTLS_CA") + if not (cert and key and ca): + return False + for label, path in (("HERMES_MTLS_CERT", cert), ("HERMES_MTLS_KEY", key), ("HERMES_MTLS_CA", ca)): + if not Path(path).is_file(): + logger.warning("mTLS disabled: %s file not found: %s", label, path) + return False + return True + + +def build_server_ssl_context() -> ssl.SSLContext: + """ + Build an SSL context for the A2A server that: + - presents its own certificate + - requires and verifies the client's certificate against the Fleet CA + + Raises: + RuntimeError: if mTLS env vars are not set or files are missing + ssl.SSLError: if cert/key/CA files are invalid + """ + cert = _get_env("HERMES_MTLS_CERT") + key = _get_env("HERMES_MTLS_KEY") + ca = _get_env("HERMES_MTLS_CA") + + if not (cert and key and ca): + raise RuntimeError( + "mTLS not configured. Set HERMES_MTLS_CERT, HERMES_MTLS_KEY, and HERMES_MTLS_CA." + ) + + ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) + ctx.minimum_version = ssl.TLSVersion.TLSv1_2 + ctx.load_cert_chain(certfile=cert, keyfile=key) + ctx.load_verify_locations(cafile=ca) + # CERT_REQUIRED: reject connections without a valid client cert + ctx.verify_mode = ssl.CERT_REQUIRED + logger.info("mTLS server context built (cert=%s, CA=%s)", cert, ca) + return ctx + + +def build_client_ssl_context() -> ssl.SSLContext: + """ + Build an SSL context for outbound A2A connections that: + - presents this agent's certificate as a client cert + - verifies the remote server against the Fleet CA + + Raises: + RuntimeError: if mTLS env vars are not set or files are missing + ssl.SSLError: if cert/key/CA files are invalid + """ + cert = _get_env("HERMES_MTLS_CERT") + key = _get_env("HERMES_MTLS_KEY") + ca = _get_env("HERMES_MTLS_CA") + + if not (cert and key and ca): + raise RuntimeError( + "mTLS not configured. Set HERMES_MTLS_CERT, HERMES_MTLS_KEY, and HERMES_MTLS_CA." + ) + + ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + ctx.minimum_version = ssl.TLSVersion.TLSv1_2 + ctx.load_cert_chain(certfile=cert, keyfile=key) + ctx.load_verify_locations(cafile=ca) + ctx.verify_mode = ssl.CERT_REQUIRED + ctx.check_hostname = True + logger.info("mTLS client context built (cert=%s, CA=%s)", cert, ca) + return ctx + + +def get_peer_cn(ssl_object) -> Optional[str]: + """Extract the CN from the peer certificate's subject, or None.""" + try: + peer_cert = ssl_object.getpeercert() + if not peer_cert: + return None + for rdn in peer_cert.get("subject", ()): + for attr, value in rdn: + if attr == "commonName": + return value + except Exception: + pass + return None + + +class MTLSMiddleware: + """ + ASGI middleware that enforces client certificate verification on A2A routes. + + When mTLS is NOT configured (no env vars) or the route is not an A2A route, + the request passes through unchanged. + + When mTLS IS configured and the route matches an A2A prefix, the middleware + checks that the request arrived over a TLS connection with a verified client + certificate. If not, it returns HTTP 403. + + Note: This middleware only provides defence-in-depth at the app layer. + The primary enforcement is at the SSL context level (CERT_REQUIRED on the + server context). This middleware is useful when the server runs behind a + TLS-terminating proxy that forwards cert info via headers (not yet + implemented) or for test-time injection. + """ + + def __init__(self, app): + self.app = app + self._enabled = is_mtls_configured() + if self._enabled: + logger.info("MTLSMiddleware enabled — A2A routes require client cert") + + def _is_a2a_route(self, path: str) -> bool: + return any(path.startswith(prefix) for prefix in _A2A_PATH_PREFIXES) + + async def __call__(self, scope, receive, send): + if scope["type"] == "http" and self._enabled and self._is_a2a_route(scope.get("path", "")): + # Check for client cert in the SSL connection + transport = scope.get("extensions", {}).get("tls", {}) + peer_cert = transport.get("peer_cert") + if peer_cert is None: + # No client cert — reject + response = _forbidden_response("Client certificate required for A2A endpoints") + await response(scope, receive, send) + return + + await self.app(scope, receive, send) + + +def _forbidden_response(message: str): + """Return a minimal ASGI 403 response.""" + body = message.encode() + + async def respond(scope, receive, send): + await send({ + "type": "http.response.start", + "status": 403, + "headers": [ + (b"content-type", b"text/plain"), + (b"content-length", str(len(body)).encode()), + ], + }) + await send({"type": "http.response.body", "body": body}) + + return respond diff --git a/ansible/fleet_mtls.yml b/ansible/fleet_mtls.yml index 80c2e944a..c8ef9be72 100644 --- a/ansible/fleet_mtls.yml +++ b/ansible/fleet_mtls.yml @@ -11,12 +11,22 @@ # Usage: # ansible-playbook -i inventory/fleet.ini ansible/fleet_mtls.yml # +# Inventory example (inventory/fleet.ini): +# [fleet] +# timmy.local agent_name=timmy +# allegro.local agent_name=allegro +# ezra.local agent_name=ezra +# # Refs #806 - name: Distribute fleet mTLS certificates - hosts: fleet_agents + hosts: fleet become: true + vars: + _pki_base: "{{ lookup('env', 'HOME') }}/.hermes/pki" roles: - - role: fleet_mtls_certs + - role: hermes_mtls vars: - fleet_mtls_agent_name: "{{ inventory_hostname_short }}" + hermes_mtls_local_ca_cert: "{{ _pki_base }}/ca/fleet-ca.crt" + hermes_mtls_local_agent_cert: "{{ _pki_base }}/agents/{{ agent_name }}/{{ agent_name }}.crt" + hermes_mtls_local_agent_key: "{{ _pki_base }}/agents/{{ agent_name }}/{{ agent_name }}.key" diff --git a/ansible/roles/hermes_mtls/defaults/main.yml b/ansible/roles/hermes_mtls/defaults/main.yml new file mode 100644 index 000000000..6c3192beb --- /dev/null +++ b/ansible/roles/hermes_mtls/defaults/main.yml @@ -0,0 +1,21 @@ +--- +# Ansible role: hermes_mtls +# Distributes fleet mTLS certificates to Hermes agent nodes. +# +# Required variables (set in inventory / group_vars / --extra-vars): +# hermes_mtls_local_ca_cert Local path on the Ansible controller to fleet-ca.crt +# hermes_mtls_local_agent_cert Local path to this agent's .crt file +# hermes_mtls_local_agent_key Local path to this agent's .key file +# +# Optional overrides: +hermes_mtls_cert_dir: /etc/hermes/certs +hermes_mtls_cert_owner: hermes +hermes_mtls_cert_group: hermes +hermes_mtls_cert_mode: "0640" +hermes_mtls_ca_cert_mode: "0644" + +# Env file that Hermes reads on startup (systemd EnvironmentFile or .env) +hermes_mtls_env_file: /etc/hermes/mtls.env + +# Hermes systemd service name — restarted after cert changes +hermes_mtls_service: hermes-gateway diff --git a/ansible/roles/hermes_mtls/handlers/main.yml b/ansible/roles/hermes_mtls/handlers/main.yml new file mode 100644 index 000000000..4a675aaf9 --- /dev/null +++ b/ansible/roles/hermes_mtls/handlers/main.yml @@ -0,0 +1,7 @@ +--- +- name: Restart hermes service + ansible.builtin.systemd: + name: "{{ hermes_mtls_service }}" + state: restarted + daemon_reload: true + when: ansible_service_mgr == "systemd" diff --git a/ansible/roles/hermes_mtls/meta/main.yml b/ansible/roles/hermes_mtls/meta/main.yml new file mode 100644 index 000000000..00b888cd8 --- /dev/null +++ b/ansible/roles/hermes_mtls/meta/main.yml @@ -0,0 +1,16 @@ +--- +galaxy_info: + role_name: hermes_mtls + author: Hermes Fleet + description: Distribute mTLS certificates to Hermes fleet nodes for A2A authentication + license: MIT + min_ansible_version: "2.14" + platforms: + - name: Ubuntu + versions: ["22.04", "24.04"] + - name: Debian + versions: ["12"] + - name: EL + versions: ["9"] + +dependencies: [] diff --git a/ansible/roles/hermes_mtls/tasks/main.yml b/ansible/roles/hermes_mtls/tasks/main.yml new file mode 100644 index 000000000..9fdabd3e7 --- /dev/null +++ b/ansible/roles/hermes_mtls/tasks/main.yml @@ -0,0 +1,67 @@ +--- +# hermes_mtls role — distribute fleet mTLS certificates to a Hermes agent node. +# +# This role: +# 1. Creates the cert directory on the remote node +# 2. Copies the Fleet CA cert, agent cert, and agent key +# 3. Writes an env file with HERMES_MTLS_* variables +# 4. Restarts the Hermes service if any cert changed + +- name: Ensure cert directory exists + ansible.builtin.file: + path: "{{ hermes_mtls_cert_dir }}" + state: directory + owner: "{{ hermes_mtls_cert_owner }}" + group: "{{ hermes_mtls_cert_group }}" + mode: "0750" + +- name: Copy Fleet CA certificate + ansible.builtin.copy: + src: "{{ hermes_mtls_local_ca_cert }}" + dest: "{{ hermes_mtls_cert_dir }}/fleet-ca.crt" + owner: "{{ hermes_mtls_cert_owner }}" + group: "{{ hermes_mtls_cert_group }}" + mode: "{{ hermes_mtls_ca_cert_mode }}" + notify: Restart hermes service + +- name: Copy agent TLS certificate + ansible.builtin.copy: + src: "{{ hermes_mtls_local_agent_cert }}" + dest: "{{ hermes_mtls_cert_dir }}/agent.crt" + owner: "{{ hermes_mtls_cert_owner }}" + group: "{{ hermes_mtls_cert_group }}" + mode: "{{ hermes_mtls_cert_mode }}" + notify: Restart hermes service + +- name: Copy agent TLS private key + ansible.builtin.copy: + src: "{{ hermes_mtls_local_agent_key }}" + dest: "{{ hermes_mtls_cert_dir }}/agent.key" + owner: "{{ hermes_mtls_cert_owner }}" + group: "{{ hermes_mtls_cert_group }}" + mode: "0600" + notify: Restart hermes service + +- name: Write mTLS environment file + ansible.builtin.template: + src: mtls.env.j2 + dest: "{{ hermes_mtls_env_file }}" + owner: "{{ hermes_mtls_cert_owner }}" + group: "{{ hermes_mtls_cert_group }}" + mode: "0640" + notify: Restart hermes service + +- name: Verify cert files are readable by service user + ansible.builtin.stat: + path: "{{ item }}" + loop: + - "{{ hermes_mtls_cert_dir }}/fleet-ca.crt" + - "{{ hermes_mtls_cert_dir }}/agent.crt" + - "{{ hermes_mtls_cert_dir }}/agent.key" + register: _cert_stat + +- name: Assert all cert files exist + ansible.builtin.assert: + that: item.stat.exists + fail_msg: "Expected cert file missing: {{ item.item }}" + loop: "{{ _cert_stat.results }}" diff --git a/ansible/roles/hermes_mtls/templates/mtls.env.j2 b/ansible/roles/hermes_mtls/templates/mtls.env.j2 new file mode 100644 index 000000000..81029103a --- /dev/null +++ b/ansible/roles/hermes_mtls/templates/mtls.env.j2 @@ -0,0 +1,8 @@ +# Hermes mTLS environment — generated by hermes_mtls Ansible role +# Source this file or use as a systemd EnvironmentFile= +# WARNING: This file contains the path to the agent's private key. +# Restrict read access to the hermes service user. + +HERMES_MTLS_CERT={{ hermes_mtls_cert_dir }}/agent.crt +HERMES_MTLS_KEY={{ hermes_mtls_cert_dir }}/agent.key +HERMES_MTLS_CA={{ hermes_mtls_cert_dir }}/fleet-ca.crt diff --git a/hermes_cli/web_server.py b/hermes_cli/web_server.py index 143860a69..929281bc0 100644 --- a/hermes_cli/web_server.py +++ b/hermes_cli/web_server.py @@ -46,6 +46,7 @@ from hermes_cli.config import ( ) from gateway.status import get_running_pid, read_runtime_status from agent.agent_card import get_agent_card_json +from agent.mtls import is_mtls_configured, MTLSMiddleware, build_server_ssl_context try: from fastapi import FastAPI, HTTPException, Request @@ -87,6 +88,10 @@ app.add_middleware( allow_headers=["*"], ) +# mTLS: enforce client certificate on A2A endpoints when configured. +# Activated by setting HERMES_MTLS_CERT, HERMES_MTLS_KEY, HERMES_MTLS_CA. +app.add_middleware(MTLSMiddleware) + # --------------------------------------------------------------------------- # Endpoints that do NOT require the session token. Everything else under # /api/ is gated by the auth middleware below. Keep this list minimal — @@ -2105,6 +2110,20 @@ def start_server( "authentication. Only use on trusted networks.", host, ) + # mTLS: when configured, pass SSL context to uvicorn so all connections + # are TLS with mandatory client certificate verification. + ssl_context = None + scheme = "http" + if is_mtls_configured(): + try: + ssl_context = build_server_ssl_context() + scheme = "https" + _log.info( + "mTLS enabled — server requires client certificates (A2A auth)" + ) + except Exception as exc: + _log.error("Failed to build mTLS SSL context: %s — starting without TLS", exc) + if open_browser: import threading import webbrowser @@ -2112,9 +2131,11 @@ def start_server( def _open(): import time as _t _t.sleep(1.0) - webbrowser.open(f"http://{host}:{port}") + webbrowser.open(f"{scheme}://{host}:{port}") threading.Thread(target=_open, daemon=True).start() - print(f" Hermes Web UI → http://{host}:{port}") - uvicorn.run(app, host=host, port=port, log_level="warning") + print(f" Hermes Web UI → {scheme}://{host}:{port}") + if ssl_context is not None: + print(" mTLS enabled — client certificate required for A2A endpoints") + uvicorn.run(app, host=host, port=port, log_level="warning", ssl=ssl_context) diff --git a/scripts/gen_agent_cert.sh b/scripts/gen_agent_cert.sh index f7bcd9af2..520351f16 100644 --- a/scripts/gen_agent_cert.sh +++ b/scripts/gen_agent_cert.sh @@ -9,7 +9,6 @@ # Outputs (default: ~/.hermes/pki/agents//): # .key — agent private key (chmod 600, stays on the agent host) # .crt — agent certificate (signed by the fleet CA) -# .csr — CSR (intermediate, safe to delete) # # Run gen_fleet_ca.sh first if you haven't already. # Refs #806 @@ -19,9 +18,9 @@ set -euo pipefail CERT_DAYS=365 # 1 year; rotate annually KEY_BITS=2048 -# --------------------------------------------------------------------------- # +# --------------------------------------------------------------------------- # Parse args -# --------------------------------------------------------------------------- # +# --------------------------------------------------------------------------- AGENT_NAME="" CA_DIR="${HOME}/.hermes/pki/ca" OUT_DIR="" @@ -50,9 +49,9 @@ fi OUT_DIR="${OUT_DIR:-${HOME}/.hermes/pki/agents/${AGENT_NAME}}" -# --------------------------------------------------------------------------- # +# --------------------------------------------------------------------------- # Prereq check -# --------------------------------------------------------------------------- # +# --------------------------------------------------------------------------- if ! command -v openssl &>/dev/null; then echo "ERROR: openssl not found." >&2 exit 1 @@ -98,7 +97,7 @@ openssl req -new \ # Sign with fleet CA — include SAN so modern TLS stacks accept it EXT_CONF=$(mktemp) -trap 'rm -f "$EXT_CONF"' EXIT +trap 'rm -f "$EXT_CONF" "$AGENT_CSR"' EXIT cat > "$EXT_CONF" </dev/null; then echo "ERROR: openssl not found. Install OpenSSL and re-run." >&2 exit 1 diff --git a/tests/test_mtls.py b/tests/test_mtls.py new file mode 100644 index 000000000..d2cc0462d --- /dev/null +++ b/tests/test_mtls.py @@ -0,0 +1,389 @@ +""" +Tests for agent/mtls.py — mutual TLS between fleet agents. + +Covers: +- is_mtls_configured() with various env combinations +- build_server_ssl_context() / build_client_ssl_context() with real certs +- MTLSMiddleware: authorized agent accepted, unauthorized agent rejected +""" + +import ssl +import datetime +import ipaddress +import os +import pytest +from pathlib import Path +from unittest.mock import patch + +# --------------------------------------------------------------------------- +# Helpers: generate real in-memory certs using the `cryptography` library +# --------------------------------------------------------------------------- + +try: + from cryptography import x509 + from cryptography.x509.oid import NameOID, ExtendedKeyUsageOID + from cryptography.hazmat.primitives import hashes, serialization + from cryptography.hazmat.primitives.asymmetric import rsa + _CRYPTO_AVAILABLE = True +except ImportError: + _CRYPTO_AVAILABLE = False + +pytestmark = pytest.mark.skipif( + not _CRYPTO_AVAILABLE, + reason="cryptography package required for mTLS tests", +) + + +def _make_key(): + return rsa.generate_private_key(public_exponent=65537, key_size=2048) + + +def _write_pem(path: Path, data: bytes) -> None: + path.write_bytes(data) + path.chmod(0o600) + + +def make_fleet_pki(tmp_path: Path): + """ + Create a minimal Fleet PKI in tmp_path: + - fleet-ca.key / fleet-ca.crt (self-signed CA) + - agent.key / agent.crt (signed by fleet CA, CN=test-agent) + - rogue.key / rogue.crt (self-signed, NOT signed by fleet CA) + + Returns a dict of Path objects. + """ + now = datetime.datetime.now(datetime.timezone.utc) + + # --- Fleet CA --- + ca_key = _make_key() + ca_name = x509.Name([ + x509.NameAttribute(NameOID.COMMON_NAME, "Hermes Fleet CA"), + x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Hermes Fleet"), + ]) + ca_cert = ( + x509.CertificateBuilder() + .subject_name(ca_name) + .issuer_name(ca_name) + .public_key(ca_key.public_key()) + .serial_number(x509.random_serial_number()) + .not_valid_before(now) + .not_valid_after(now + datetime.timedelta(days=3650)) + .add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True) + .add_extension( + x509.KeyUsage( + digital_signature=False, content_commitment=False, + key_encipherment=False, data_encipherment=False, + key_agreement=False, key_cert_sign=True, crl_sign=True, + encipher_only=False, decipher_only=False, + ), + critical=True, + ) + .sign(ca_key, hashes.SHA256()) + ) + + # --- Fleet agent cert --- + agent_key = _make_key() + agent_name = x509.Name([ + x509.NameAttribute(NameOID.COMMON_NAME, "test-agent"), + x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Hermes Fleet"), + ]) + agent_cert = ( + x509.CertificateBuilder() + .subject_name(agent_name) + .issuer_name(ca_name) + .public_key(agent_key.public_key()) + .serial_number(x509.random_serial_number()) + .not_valid_before(now) + .not_valid_after(now + datetime.timedelta(days=730)) + .add_extension(x509.BasicConstraints(ca=False, path_length=None), critical=True) + .add_extension( + x509.SubjectAlternativeName([ + x509.DNSName("test-agent"), + x509.DNSName("localhost"), + x509.IPAddress(ipaddress.IPv4Address("127.0.0.1")), + ]), + critical=False, + ) + .add_extension( + x509.ExtendedKeyUsage([ + ExtendedKeyUsageOID.CLIENT_AUTH, + ExtendedKeyUsageOID.SERVER_AUTH, + ]), + critical=False, + ) + .sign(ca_key, hashes.SHA256()) + ) + + # --- Rogue cert (self-signed, not from fleet CA) --- + rogue_key = _make_key() + rogue_name = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, "rogue-agent")]) + rogue_cert = ( + x509.CertificateBuilder() + .subject_name(rogue_name) + .issuer_name(rogue_name) + .public_key(rogue_key.public_key()) + .serial_number(x509.random_serial_number()) + .not_valid_before(now) + .not_valid_after(now + datetime.timedelta(days=365)) + .add_extension(x509.BasicConstraints(ca=False, path_length=None), critical=True) + .sign(rogue_key, hashes.SHA256()) + ) + + # Write to tmp_path + pem = serialization.Encoding.PEM + private_fmt = serialization.PrivateFormat.TraditionalOpenSSL + no_enc = serialization.NoEncryption() + + paths = {} + + paths["ca_key"] = tmp_path / "fleet-ca.key" + _write_pem(paths["ca_key"], ca_key.private_bytes(pem, private_fmt, no_enc)) + + paths["ca_cert"] = tmp_path / "fleet-ca.crt" + _write_pem(paths["ca_cert"], ca_cert.public_bytes(pem)) + + paths["agent_key"] = tmp_path / "agent.key" + _write_pem(paths["agent_key"], agent_key.private_bytes(pem, private_fmt, no_enc)) + + paths["agent_cert"] = tmp_path / "agent.crt" + _write_pem(paths["agent_cert"], agent_cert.public_bytes(pem)) + + paths["rogue_key"] = tmp_path / "rogue.key" + _write_pem(paths["rogue_key"], rogue_key.private_bytes(pem, private_fmt, no_enc)) + + paths["rogue_cert"] = tmp_path / "rogue.crt" + _write_pem(paths["rogue_cert"], rogue_cert.public_bytes(pem)) + + return paths + + +# --------------------------------------------------------------------------- +# Tests: is_mtls_configured +# --------------------------------------------------------------------------- + +class TestIsMtlsConfigured: + def test_all_vars_missing(self): + from agent.mtls import is_mtls_configured + env = {k: "" for k in ("HERMES_MTLS_CERT", "HERMES_MTLS_KEY", "HERMES_MTLS_CA")} + with patch.dict(os.environ, env, clear=False): + assert not is_mtls_configured() + + def test_partial_vars(self, tmp_path): + from agent.mtls import is_mtls_configured + f = tmp_path / "cert.pem" + f.write_text("x") + env = {"HERMES_MTLS_CERT": str(f), "HERMES_MTLS_KEY": "", "HERMES_MTLS_CA": ""} + with patch.dict(os.environ, env, clear=False): + assert not is_mtls_configured() + + def test_all_vars_set_but_file_missing(self, tmp_path): + from agent.mtls import is_mtls_configured + env = { + "HERMES_MTLS_CERT": str(tmp_path / "no.crt"), + "HERMES_MTLS_KEY": str(tmp_path / "no.key"), + "HERMES_MTLS_CA": str(tmp_path / "no-ca.crt"), + } + with patch.dict(os.environ, env, clear=False): + assert not is_mtls_configured() + + def test_all_vars_set_and_files_exist(self, tmp_path): + from agent.mtls import is_mtls_configured + for name in ("cert.pem", "key.pem", "ca.pem"): + (tmp_path / name).write_text("x") + env = { + "HERMES_MTLS_CERT": str(tmp_path / "cert.pem"), + "HERMES_MTLS_KEY": str(tmp_path / "key.pem"), + "HERMES_MTLS_CA": str(tmp_path / "ca.pem"), + } + with patch.dict(os.environ, env, clear=False): + assert is_mtls_configured() + + +# --------------------------------------------------------------------------- +# Tests: build_server_ssl_context / build_client_ssl_context +# --------------------------------------------------------------------------- + +class TestBuildSslContexts: + def test_raises_when_not_configured(self): + from agent.mtls import build_server_ssl_context, build_client_ssl_context + env = {"HERMES_MTLS_CERT": "", "HERMES_MTLS_KEY": "", "HERMES_MTLS_CA": ""} + with patch.dict(os.environ, env, clear=False): + with pytest.raises(RuntimeError, match="not configured"): + build_server_ssl_context() + with pytest.raises(RuntimeError, match="not configured"): + build_client_ssl_context() + + def test_server_context_requires_client_cert(self, tmp_path): + from agent.mtls import build_server_ssl_context + pki = make_fleet_pki(tmp_path) + env = { + "HERMES_MTLS_CERT": str(pki["agent_cert"]), + "HERMES_MTLS_KEY": str(pki["agent_key"]), + "HERMES_MTLS_CA": str(pki["ca_cert"]), + } + with patch.dict(os.environ, env, clear=False): + ctx = build_server_ssl_context() + assert isinstance(ctx, ssl.SSLContext) + assert ctx.verify_mode == ssl.CERT_REQUIRED + + def test_client_context_has_cert_required(self, tmp_path): + from agent.mtls import build_client_ssl_context + pki = make_fleet_pki(tmp_path) + env = { + "HERMES_MTLS_CERT": str(pki["agent_cert"]), + "HERMES_MTLS_KEY": str(pki["agent_key"]), + "HERMES_MTLS_CA": str(pki["ca_cert"]), + } + with patch.dict(os.environ, env, clear=False): + ctx = build_client_ssl_context() + assert isinstance(ctx, ssl.SSLContext) + assert ctx.verify_mode == ssl.CERT_REQUIRED + + +# --------------------------------------------------------------------------- +# Tests: MTLSMiddleware +# --------------------------------------------------------------------------- + +def _make_scope(path: str, peer_cert=None) -> dict: + """Build a minimal ASGI HTTP scope, optionally with a fake TLS peer_cert.""" + scope = { + "type": "http", + "path": path, + "extensions": {}, + } + if peer_cert is not None: + scope["extensions"]["tls"] = {"peer_cert": peer_cert} + return scope + + +async def _collect_response(middleware, scope): + """Drive the middleware and capture (status, body).""" + status = None + body = b"" + + async def receive(): + return {"type": "http.request", "body": b""} + + async def send(event): + nonlocal status, body + if event["type"] == "http.response.start": + status = event["status"] + elif event["type"] == "http.response.body": + body += event.get("body", b"") + + await middleware(scope, receive, send) + return status, body + + +class TestMTLSMiddleware: + """ + Unit-test the MTLSMiddleware without spinning up a real server. + We inject mTLS configuration through env-var patching so the middleware + believes it is enabled, and use the ASGI scope's tls extension to simulate + whether a client cert was presented. + """ + + def _make_middleware(self, tmp_path, app=None): + """Return a configured MTLSMiddleware backed by real-looking cert files.""" + from agent.mtls import MTLSMiddleware + + for name in ("cert.pem", "key.pem", "ca.pem"): + (tmp_path / name).write_text("x") + + env = { + "HERMES_MTLS_CERT": str(tmp_path / "cert.pem"), + "HERMES_MTLS_KEY": str(tmp_path / "key.pem"), + "HERMES_MTLS_CA": str(tmp_path / "ca.pem"), + } + + async def passthrough(scope, receive, send): + await send({"type": "http.response.start", "status": 200, "headers": []}) + await send({"type": "http.response.body", "body": b"ok"}) + + with patch.dict(os.environ, env, clear=False): + mw = MTLSMiddleware(app or passthrough) + return mw + + @pytest.mark.asyncio + async def test_authorized_agent_accepted(self, tmp_path): + """An A2A route with a valid client cert passes through (200).""" + mw = self._make_middleware(tmp_path) + scope = _make_scope("/.well-known/agent-card.json", peer_cert={"subject": (("commonName", "timmy"),)}) + status, body = await _collect_response(mw, scope) + assert status == 200 + + @pytest.mark.asyncio + async def test_unauthorized_agent_rejected(self, tmp_path): + """An A2A route with NO client cert is rejected (403).""" + mw = self._make_middleware(tmp_path) + scope = _make_scope("/.well-known/agent-card.json", peer_cert=None) + status, body = await _collect_response(mw, scope) + assert status == 403 + assert b"certificate" in body.lower() + + @pytest.mark.asyncio + async def test_non_a2a_route_not_gated(self, tmp_path): + """Non-A2A routes (like /api/status) pass through even without a cert.""" + mw = self._make_middleware(tmp_path) + scope = _make_scope("/api/status", peer_cert=None) + status, body = await _collect_response(mw, scope) + assert status == 200 + + @pytest.mark.asyncio + async def test_agent_card_api_route_gated(self, tmp_path): + """The /api/agent-card route also requires a client cert.""" + mw = self._make_middleware(tmp_path) + scope = _make_scope("/api/agent-card", peer_cert=None) + status, _ = await _collect_response(mw, scope) + assert status == 403 + + @pytest.mark.asyncio + async def test_middleware_disabled_when_not_configured(self): + """When mTLS env vars are absent, the middleware is a no-op.""" + from agent.mtls import MTLSMiddleware + + async def passthrough(scope, receive, send): + await send({"type": "http.response.start", "status": 200, "headers": []}) + await send({"type": "http.response.body", "body": b"ok"}) + + env = {"HERMES_MTLS_CERT": "", "HERMES_MTLS_KEY": "", "HERMES_MTLS_CA": ""} + with patch.dict(os.environ, env, clear=False): + mw = MTLSMiddleware(passthrough) + + # Even an A2A route with no cert should pass through + scope = _make_scope("/.well-known/agent-card.json", peer_cert=None) + status, _ = await _collect_response(mw, scope) + assert status == 200 + + +# --------------------------------------------------------------------------- +# Tests: get_peer_cn +# --------------------------------------------------------------------------- + +class TestGetPeerCn: + def test_returns_cn_from_subject(self): + from agent.mtls import get_peer_cn + + class FakeSSL: + def getpeercert(self): + return {"subject": ((("commonName", "timmy"),),)} + + assert get_peer_cn(FakeSSL()) == "timmy" + + def test_returns_none_when_no_cert(self): + from agent.mtls import get_peer_cn + + class FakeSSL: + def getpeercert(self): + return None + + assert get_peer_cn(FakeSSL()) is None + + def test_returns_none_on_exception(self): + from agent.mtls import get_peer_cn + + class BrokenSSL: + def getpeercert(self): + raise RuntimeError("no ssl") + + assert get_peer_cn(BrokenSSL()) is None