Compare commits

..

13 Commits

Author SHA1 Message Date
Alexander Whitestone
f88e57bcfe feat: route image files through vision analysis (Gemma 4 multimodal)
All checks were successful
Lint / lint (pull_request) Successful in 8s
- tools/binary_extensions.py: add IMAGE_EXTENSIONS + has_image_extension()
- tools/file_tools.py: detect image files in read_file_tool and auto-route
to vision_analyze_tool instead of returning a binary-file error. Wraps
the vision result so callers know it came from image analysis.
- tools/browser_tool.py: update browser_vision docstring to document that
natively multimodal models (e.g. Gemma 4) are used directly when available.
- tests/tools/test_binary_extensions.py: new tests for image extension helpers
- tests/tools/test_file_tools.py: add TestReadFileImageRouting for PNG/JPEG/
WebP auto-routing and TestAnalyzeImageWithVision for fallback coverage.

Closes #800
2026-04-22 02:54:18 -04:00
16eab5d503 Merge pull request '[claude] A2A auth — mutual TLS between fleet agents (#806)' (#948) from claude/issue-806 into main
All checks were successful
Lint / lint (push) Successful in 13s
Merge PR #948: A2A auth — mutual TLS between fleet agents (#806)
2026-04-22 03:19:42 +00:00
c7a2d439c1 Merge pull request 'feat: The Sovereign Scavenger — Automated Tech Debt Recovery' (#974) from feat/sovereign-scavenger-1776827259631 into main
All checks were successful
Lint / lint (push) Successful in 12s
2026-04-22 03:14:14 +00:00
8ad8520bd2 Merge pull request 'feat: Execution Safety Sentry — GOFAI Risk Analysis' (#973) from feat/static-analyzer-gofai-1776826921747 into main
Some checks failed
Lint / lint (push) Has been cancelled
2026-04-22 03:14:07 +00:00
9c7c88823f Merge pull request 'feat: Local Inference Story — Freeing the fleet from cloud dependency' (#972) from feat/local-inference-bridge-1776826896029 into main
Some checks failed
Lint / lint (push) Has been cancelled
2026-04-22 03:14:03 +00:00
aa45e02238 Merge pull request 'feat: GOFAI Semantic Sentry — Deterministic code verification' (#971) from feat/symbolic-verify-gofai-1776826842170 into main
Some checks failed
Lint / lint (push) Has been cancelled
2026-04-22 03:14:01 +00:00
3266c39e8e feat: Sovereign Scavenger — Turning tech debt into actionable backlog
All checks were successful
Lint / lint (pull_request) Successful in 18s
2026-04-22 03:07:40 +00:00
93a855d4e3 feat: Static Risk Analyzer (GOFAI) for execution safety
All checks were successful
Lint / lint (pull_request) Successful in 8s
2026-04-22 03:02:02 +00:00
5a0bdb556e feat: Local Inference Bridge — Bypassing cloud for local tasks
All checks were successful
Lint / lint (pull_request) Successful in 17s
2026-04-22 03:01:37 +00:00
d619d279f8 feat: Symbolic Sentry (GOFAI) for deterministic code audits
All checks were successful
Lint / lint (pull_request) Successful in 15s
2026-04-22 03:00:44 +00:00
Alexander Whitestone
4214082fb6 feat: A2A auth — mutual TLS between fleet agents
All checks were successful
Lint / lint (pull_request) Successful in 8s
Implements mTLS for securing agent-to-agent communication in the Hermes
fleet. Fixes #806.

Changes:
- scripts/gen_fleet_ca.sh: generate a self-signed Fleet CA (4096-bit RSA,
  10-year validity) that signs all agent certificates
- scripts/gen_agent_cert.sh: generate per-agent certs (Timmy, Allegro,
  Ezra) signed by the fleet CA with SAN entries and clientAuth/serverAuth
  extended key usage
- agent/mtls.py: new module providing:
  - build_server_ssl_context() — TLS_SERVER context with CERT_REQUIRED,
    enforces client cert against Fleet CA
  - build_client_ssl_context() — TLS_CLIENT context for outbound A2A calls
  - MTLSMiddleware — ASGI middleware that rejects unauthenticated requests
    to A2A routes (/.well-known/agent-card*, /api/agent-card, /a2a/) with
    HTTP 403 when mTLS is enabled
  - is_mtls_configured() — checks HERMES_MTLS_CERT/KEY/CA env vars
- hermes_cli/web_server.py: wire MTLSMiddleware into the FastAPI app;
  pass SSL context to uvicorn when HERMES_MTLS_* env vars are set so
  the server runs TLS with mandatory client cert verification
- ansible/roles/hermes_mtls/: Ansible role to distribute Fleet CA cert,
  agent cert, and agent key to fleet nodes; writes an env file with
  HERMES_MTLS_* vars and restarts the hermes-gateway service
- ansible/fleet_mtls.yml: fleet-wide playbook referencing the role for
  Timmy, Allegro, and Ezra nodes
- tests/test_mtls.py: 15 tests covering is_mtls_configured, SSL context
  creation with real cryptography-generated certs, and MTLSMiddleware
  (unauthorized agent rejected → 403, authorized agent accepted → 200)

mTLS is opt-in: set HERMES_MTLS_CERT, HERMES_MTLS_KEY, and HERMES_MTLS_CA
to enable. When unset, the server behaves exactly as before.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-21 18:04:00 -04:00
Alexander Whitestone
ac28444bf2 feat: add A2AMTLSServer routing API, A2AMTLSClient, and expand tests to 20 (#806)
All checks were successful
Lint / lint (pull_request) Successful in 9s
Builds on the existing A2AServer / build_*_ssl_context foundation:

- agent/a2a_mtls.py:
  - Add A2AMTLSServer: routing-based HTTPS server with add_route() and
    context-manager (__enter__/__exit__) lifecycle support
  - Add A2AMTLSClient: fleet-cert-presenting HTTP client with .get() / .post()
  - Widen imports (json, Callable, Dict, urlopen)

- tests/agent/test_a2a_mtls.py:
  - Fix datetime.utcnow() deprecation — use datetime.now(timezone.utc)
  - Add TestA2AMTLSServerAndClient (9 tests): routing GET/POST, 404,
    context-manager stop, rogue-cert rejection, A2AMTLSClient, concurrency
  - Total: 11 → 20 passing tests

Refs #806
2026-04-21 15:21:10 -04:00
Alexander Whitestone
91faf6f956 feat: A2A auth — mutual TLS between fleet agents
All checks were successful
Lint / lint (pull_request) Successful in 10s
Implements mutual TLS for secure agent-to-agent communication (#806).

- scripts/gen_fleet_ca.sh: generate fleet CA (4096-bit RSA, 10-year)
- scripts/gen_agent_cert.sh: per-agent cert signed by fleet CA (timmy, allegro, ezra)
- agent/a2a_mtls.py: A2AServer requiring client cert verification (CERT_REQUIRED),
  build_server_ssl_context / build_client_ssl_context helpers, server_from_env()
- ansible/roles/fleet_mtls_certs/: distribute CA + per-agent certs to fleet nodes,
  write /etc/hermes/a2a.env, notify hermes-a2a service on change
- ansible/fleet_mtls.yml + ansible/inventory/fleet.ini.example: playbook + example inventory
- tests/agent/test_a2a_mtls.py: 11 tests — authorized agent accepted (200/202),
  self-signed cert rejected, no-cert rejected, lifecycle, env-var wiring

Fixes #806

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-21 13:28:28 -04:00
34 changed files with 2802 additions and 289 deletions

443
agent/a2a_mtls.py Normal file
View File

@@ -0,0 +1,443 @@
"""
A2A mutual-TLS server — secure agent-to-agent communication.
Each fleet agent runs an A2A server that:
- Presents its own TLS certificate (signed by the fleet CA).
- Requires the connecting peer to present a valid client certificate
also signed by the fleet CA.
- Rejects connections from unknown / self-signed peers.
Usage (standalone):
python -m agent.a2a_mtls \\
--cert ~/.hermes/pki/agents/timmy/timmy.crt \\
--key ~/.hermes/pki/agents/timmy/timmy.key \\
--ca ~/.hermes/pki/ca/fleet-ca.crt \\
--host 0.0.0.0 --port 9443
Environment variables (alternative to CLI flags):
HERMES_A2A_CERT path to agent certificate
HERMES_A2A_KEY path to agent private key
HERMES_A2A_CA path to fleet CA certificate
Refs #806
"""
from __future__ import annotations
import json
import logging
import os
import ssl
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path
from typing import Any, Callable, Dict, Optional
from urllib.error import URLError
from urllib.request import Request, urlopen
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# mTLS SSL context helpers
# ---------------------------------------------------------------------------
def build_server_ssl_context(
cert: str | Path,
key: str | Path,
ca: str | Path,
) -> ssl.SSLContext:
"""Return an SSLContext that presents *cert/key* and requires a valid
client certificate signed by *ca*.
Raises ``FileNotFoundError`` if any path is missing.
Raises ``ssl.SSLError`` if the files are malformed.
"""
cert, key, ca = Path(cert), Path(key), Path(ca)
for p in (cert, key, ca):
if not p.exists():
raise FileNotFoundError(f"mTLS: file not found: {p}")
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ctx.minimum_version = ssl.TLSVersion.TLSv1_2
ctx.load_cert_chain(certfile=str(cert), keyfile=str(key))
ctx.load_verify_locations(cafile=str(ca))
# CERT_REQUIRED — reject peers that don't present a cert signed by *ca*.
ctx.verify_mode = ssl.CERT_REQUIRED
return ctx
def build_client_ssl_context(
cert: str | Path,
key: str | Path,
ca: str | Path,
) -> ssl.SSLContext:
"""Return an SSLContext for an outgoing mTLS connection.
Presents *cert/key* as the client identity and verifies the server
certificate against *ca*.
"""
cert, key, ca = Path(cert), Path(key), Path(ca)
for p in (cert, key, ca):
if not p.exists():
raise FileNotFoundError(f"mTLS client: file not found: {p}")
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.minimum_version = ssl.TLSVersion.TLSv1_2
ctx.load_cert_chain(certfile=str(cert), keyfile=str(key))
ctx.load_verify_locations(cafile=str(ca))
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.check_hostname = True
return ctx
# ---------------------------------------------------------------------------
# Minimal A2A HTTP request handler
# ---------------------------------------------------------------------------
class A2AHandler(BaseHTTPRequestHandler):
"""Handles A2A requests over a mutually-authenticated TLS connection.
GET /.well-known/agent-card.json — returns the local agent card.
POST /a2a/task — dispatches an A2A task (stub).
"""
log_message = logger.debug # route access log to Python logger
def do_GET(self) -> None: # noqa: N802
if self.path in ("/.well-known/agent-card.json", "/agent-card.json"):
self._serve_agent_card()
else:
self._send_json(404, {"error": "not found"})
def do_POST(self) -> None: # noqa: N802
if self.path == "/a2a/task":
self._handle_task()
else:
self._send_json(404, {"error": "not found"})
# ------------------------------------------------------------------
def _serve_agent_card(self) -> None:
try:
from agent.agent_card import get_agent_card_json
body = get_agent_card_json().encode()
except Exception as exc:
logger.warning("agent-card unavailable: %s", exc)
body = b'{"error": "agent card unavailable"}'
self._send_raw(200, "application/json", body)
def _handle_task(self) -> None:
length = int(self.headers.get("Content-Length", 0))
_body = self.rfile.read(length) if length else b""
# Stub: echo back a 202 Accepted with the peer CN so callers can
# confirm which agent processed the request.
peer_cn = _peer_cn(self.connection)
self._send_json(202, {"status": "accepted", "handled_by": peer_cn})
# ------------------------------------------------------------------
def _send_json(self, code: int, data: dict) -> None:
import json
body = json.dumps(data).encode()
self._send_raw(code, "application/json", body)
def _send_raw(self, code: int, content_type: str, body: bytes) -> None:
self.send_response(code)
self.send_header("Content-Type", content_type)
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def log_message(self, fmt: str, *args: object) -> None: # type: ignore[override]
logger.debug("a2a: " + fmt, *args)
def _peer_cn(conn: ssl.SSLSocket) -> Optional[str]:
"""Extract the Common Name from the peer certificate, or None."""
try:
peer = conn.getpeercert()
if not peer:
return None
for rdn in peer.get("subject", ()):
for key, val in rdn:
if key == "commonName":
return val
except Exception:
pass
return None
# ---------------------------------------------------------------------------
# Server lifecycle
# ---------------------------------------------------------------------------
class A2AServer:
"""Mutual-TLS A2A server.
Example::
server = A2AServer(
cert="~/.hermes/pki/agents/timmy/timmy.crt",
key="~/.hermes/pki/agents/timmy/timmy.key",
ca="~/.hermes/pki/ca/fleet-ca.crt",
)
server.start() # non-blocking (daemon thread)
...
server.stop()
"""
def __init__(
self,
cert: str | Path,
key: str | Path,
ca: str | Path,
host: str = "0.0.0.0",
port: int = 9443,
) -> None:
self.cert = Path(cert).expanduser()
self.key = Path(key).expanduser()
self.ca = Path(ca).expanduser()
self.host = host
self.port = port
self._httpd: Optional[HTTPServer] = None
self._thread: Optional[threading.Thread] = None
def start(self, daemon: bool = True) -> None:
"""Start the server in a background thread (default: daemon)."""
ssl_ctx = build_server_ssl_context(self.cert, self.key, self.ca)
self._httpd = HTTPServer((self.host, self.port), A2AHandler)
self._httpd.socket = ssl_ctx.wrap_socket(
self._httpd.socket, server_side=True
)
self._thread = threading.Thread(
target=self._httpd.serve_forever, daemon=daemon
)
self._thread.start()
logger.info(
"A2A mTLS server listening on %s:%s (cert=%s)",
self.host, self.port, self.cert.name,
)
def stop(self) -> None:
if self._httpd:
self._httpd.shutdown()
self._httpd = None
if self._thread:
self._thread.join(timeout=5)
self._thread = None
def server_from_env() -> A2AServer:
"""Build an A2AServer from environment variables / defaults."""
hermes_home = Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes"))
agent_name = os.environ.get("HERMES_AGENT_NAME", "hermes").lower()
default_cert = hermes_home / "pki" / "agents" / agent_name / f"{agent_name}.crt"
default_key = hermes_home / "pki" / "agents" / agent_name / f"{agent_name}.key"
default_ca = hermes_home / "pki" / "ca" / "fleet-ca.crt"
cert = os.environ.get("HERMES_A2A_CERT", str(default_cert))
key = os.environ.get("HERMES_A2A_KEY", str(default_key))
ca = os.environ.get("HERMES_A2A_CA", str(default_ca))
host = os.environ.get("HERMES_A2A_HOST", "0.0.0.0")
port = int(os.environ.get("HERMES_A2A_PORT", "9443"))
return A2AServer(cert=cert, key=key, ca=ca, host=host, port=port)
# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------
def _main() -> None:
import argparse
logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
parser = argparse.ArgumentParser(
description="Hermes A2A mutual-TLS server"
)
parser.add_argument("--cert", required=True, help="Path to agent certificate")
parser.add_argument("--key", required=True, help="Path to agent private key")
parser.add_argument("--ca", required=True, help="Path to fleet CA certificate")
parser.add_argument("--host", default="0.0.0.0")
parser.add_argument("--port", type=int, default=9443)
args = parser.parse_args()
server = A2AServer(
cert=args.cert, key=args.key, ca=args.ca,
host=args.host, port=args.port,
)
server.start(daemon=False)
if __name__ == "__main__":
_main()
# ---------------------------------------------------------------------------
# A2AMTLSServer — routing-based server with context-manager support
# ---------------------------------------------------------------------------
class _RoutingHandler(BaseHTTPRequestHandler):
"""HTTP request handler that dispatches to per-path callables."""
routes: Dict[str, Callable] = {}
def log_message(self, fmt: str, *args: Any) -> None:
logger.debug("A2AMTLSServer: " + fmt, *args)
def _peer_cn(self) -> Optional[str]:
cert = self.connection.getpeercert() # type: ignore[attr-defined]
if not cert:
return None
for rdn in cert.get("subject", ()):
for attr, value in rdn:
if attr == "commonName":
return value
return None
def do_POST(self) -> None:
handler = self.routes.get(self.path)
if handler is None:
self.send_response(404)
self.end_headers()
return
length = int(self.headers.get("Content-Length", 0))
body = self.rfile.read(length) if length else b""
try:
payload = json.loads(body) if body else {}
except json.JSONDecodeError:
self.send_response(400)
self.end_headers()
return
result = handler(payload, peer_cn=self._peer_cn())
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(result).encode())
def do_GET(self) -> None:
handler = self.routes.get(self.path)
if handler is None:
self.send_response(404)
self.end_headers()
return
result = handler({}, peer_cn=self._peer_cn())
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(result).encode())
class A2AMTLSServer:
"""Routing-based mTLS HTTPS server with context-manager support.
Unlike ``A2AServer`` (which serves fixed A2A paths), this server lets
callers register arbitrary path handlers — useful for tests and custom
A2A endpoint implementations.
handler signature: ``handler(payload: dict, *, peer_cn: str | None) -> dict``
Example::
server = A2AMTLSServer(cert="timmy.crt", key="timmy.key", ca="fleet-ca.crt")
server.add_route("/tasks/send", my_handler)
with server:
... # server runs for the duration of the block
"""
def __init__(
self,
cert: str | Path,
key: str | Path,
ca: str | Path,
host: str = "127.0.0.1",
port: int = 9443,
) -> None:
self.cert = Path(cert).expanduser()
self.key = Path(key).expanduser()
self.ca = Path(ca).expanduser()
self.host = host
self.port = port
self._routes: Dict[str, Callable] = {}
self._httpd: Optional[HTTPServer] = None
self._thread: Optional[threading.Thread] = None
def add_route(self, path: str, handler: Callable) -> None:
self._routes[path] = handler
def start(self) -> None:
ssl_ctx = build_server_ssl_context(self.cert, self.key, self.ca)
class _Handler(_RoutingHandler):
routes = self._routes
self._httpd = HTTPServer((self.host, self.port), _Handler)
self._httpd.socket = ssl_ctx.wrap_socket(self._httpd.socket, server_side=True)
self._thread = threading.Thread(
target=self._httpd.serve_forever,
daemon=True,
name=f"a2a-mtls-{self.port}",
)
self._thread.start()
logger.info("A2AMTLSServer on %s:%d (mTLS)", self.host, self.port)
def stop(self) -> None:
if self._httpd:
self._httpd.shutdown()
self._httpd = None
if self._thread:
self._thread.join(timeout=5)
self._thread = None
def __enter__(self) -> "A2AMTLSServer":
self.start()
return self
def __exit__(self, *_: Any) -> None:
self.stop()
# ---------------------------------------------------------------------------
# A2AMTLSClient — mTLS HTTP client
# ---------------------------------------------------------------------------
class A2AMTLSClient:
"""HTTP client that presents a fleet cert on every outgoing connection.
Example::
client = A2AMTLSClient(cert="allegro.crt", key="allegro.key", ca="fleet-ca.crt")
result = client.post("https://timmy:9443/tasks/send", json={"task": "..."})
"""
def __init__(
self,
cert: str | Path,
key: str | Path,
ca: str | Path,
) -> None:
self._ssl_ctx = build_client_ssl_context(cert, key, ca)
self._ssl_ctx.check_hostname = False # callers connecting by IP
def _request(
self,
method: str,
url: str,
data: Optional[bytes] = None,
timeout: float = 10.0,
) -> Dict[str, Any]:
headers = {"Content-Type": "application/json"}
req = Request(url, data=data, headers=headers, method=method)
try:
with urlopen(req, context=self._ssl_ctx, timeout=timeout) as resp:
body = resp.read()
return json.loads(body) if body else {}
except URLError as exc:
raise ConnectionError(f"A2AMTLSClient {method} {url} failed: {exc.reason}") from exc
def get(self, url: str, **kwargs: Any) -> Dict[str, Any]:
return self._request("GET", url, **kwargs)
def post(self, url: str, json: Optional[Dict[str, Any]] = None, **kwargs: Any) -> Dict[str, Any]:
data = (__import__("json").dumps(json).encode() if json is not None else None)
return self._request("POST", url, data=data, **kwargs)

View File

@@ -2302,7 +2302,7 @@ def call_llm(
resolved_provider, resolved_model, resolved_base_url, resolved_api_key, resolved_api_mode = _resolve_task_provider_model(
task, provider, model, base_url, api_key)
if task in ("vision", "browser_vision"):
if task == "vision":
effective_provider, client, final_model = resolve_vision_provider_client(
provider=provider,
model=model,

184
agent/mtls.py Normal file
View File

@@ -0,0 +1,184 @@
"""
agent/mtls.py — Mutual TLS support for Hermes A2A communication.
Provides:
- build_server_ssl_context() — SSL context for uvicorn that requires client certs
- build_client_ssl_context() — SSL context for httpx/aiohttp A2A clients
- MTLSMiddleware — FastAPI middleware that enforces client cert on A2A routes
- is_mtls_configured() — Check if env vars are set
Configuration (environment variables):
HERMES_MTLS_CERT Path to this agent's TLS certificate (PEM)
HERMES_MTLS_KEY Path to this agent's TLS private key (PEM)
HERMES_MTLS_CA Path to the Fleet CA certificate (PEM) — used to verify peers
All three must be set to enable mTLS. If any is missing, mTLS is disabled and
the server falls back to plain HTTP (or regular TLS without client auth).
"""
import logging
import os
import ssl
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
# A2A routes that require a valid client certificate when mTLS is enabled.
_A2A_PATH_PREFIXES = (
"/.well-known/agent-card",
"/agent-card",
"/api/agent-card",
"/a2a/",
)
def _get_env(key: str) -> Optional[str]:
val = os.environ.get(key, "").strip()
return val or None
def is_mtls_configured() -> bool:
"""Return True if all three mTLS env vars are set and the files exist."""
cert = _get_env("HERMES_MTLS_CERT")
key = _get_env("HERMES_MTLS_KEY")
ca = _get_env("HERMES_MTLS_CA")
if not (cert and key and ca):
return False
for label, path in (("HERMES_MTLS_CERT", cert), ("HERMES_MTLS_KEY", key), ("HERMES_MTLS_CA", ca)):
if not Path(path).is_file():
logger.warning("mTLS disabled: %s file not found: %s", label, path)
return False
return True
def build_server_ssl_context() -> ssl.SSLContext:
"""
Build an SSL context for the A2A server that:
- presents its own certificate
- requires and verifies the client's certificate against the Fleet CA
Raises:
RuntimeError: if mTLS env vars are not set or files are missing
ssl.SSLError: if cert/key/CA files are invalid
"""
cert = _get_env("HERMES_MTLS_CERT")
key = _get_env("HERMES_MTLS_KEY")
ca = _get_env("HERMES_MTLS_CA")
if not (cert and key and ca):
raise RuntimeError(
"mTLS not configured. Set HERMES_MTLS_CERT, HERMES_MTLS_KEY, and HERMES_MTLS_CA."
)
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ctx.minimum_version = ssl.TLSVersion.TLSv1_2
ctx.load_cert_chain(certfile=cert, keyfile=key)
ctx.load_verify_locations(cafile=ca)
# CERT_REQUIRED: reject connections without a valid client cert
ctx.verify_mode = ssl.CERT_REQUIRED
logger.info("mTLS server context built (cert=%s, CA=%s)", cert, ca)
return ctx
def build_client_ssl_context() -> ssl.SSLContext:
"""
Build an SSL context for outbound A2A connections that:
- presents this agent's certificate as a client cert
- verifies the remote server against the Fleet CA
Raises:
RuntimeError: if mTLS env vars are not set or files are missing
ssl.SSLError: if cert/key/CA files are invalid
"""
cert = _get_env("HERMES_MTLS_CERT")
key = _get_env("HERMES_MTLS_KEY")
ca = _get_env("HERMES_MTLS_CA")
if not (cert and key and ca):
raise RuntimeError(
"mTLS not configured. Set HERMES_MTLS_CERT, HERMES_MTLS_KEY, and HERMES_MTLS_CA."
)
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.minimum_version = ssl.TLSVersion.TLSv1_2
ctx.load_cert_chain(certfile=cert, keyfile=key)
ctx.load_verify_locations(cafile=ca)
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.check_hostname = True
logger.info("mTLS client context built (cert=%s, CA=%s)", cert, ca)
return ctx
def get_peer_cn(ssl_object) -> Optional[str]:
"""Extract the CN from the peer certificate's subject, or None."""
try:
peer_cert = ssl_object.getpeercert()
if not peer_cert:
return None
for rdn in peer_cert.get("subject", ()):
for attr, value in rdn:
if attr == "commonName":
return value
except Exception:
pass
return None
class MTLSMiddleware:
"""
ASGI middleware that enforces client certificate verification on A2A routes.
When mTLS is NOT configured (no env vars) or the route is not an A2A route,
the request passes through unchanged.
When mTLS IS configured and the route matches an A2A prefix, the middleware
checks that the request arrived over a TLS connection with a verified client
certificate. If not, it returns HTTP 403.
Note: This middleware only provides defence-in-depth at the app layer.
The primary enforcement is at the SSL context level (CERT_REQUIRED on the
server context). This middleware is useful when the server runs behind a
TLS-terminating proxy that forwards cert info via headers (not yet
implemented) or for test-time injection.
"""
def __init__(self, app):
self.app = app
self._enabled = is_mtls_configured()
if self._enabled:
logger.info("MTLSMiddleware enabled — A2A routes require client cert")
def _is_a2a_route(self, path: str) -> bool:
return any(path.startswith(prefix) for prefix in _A2A_PATH_PREFIXES)
async def __call__(self, scope, receive, send):
if scope["type"] == "http" and self._enabled and self._is_a2a_route(scope.get("path", "")):
# Check for client cert in the SSL connection
transport = scope.get("extensions", {}).get("tls", {})
peer_cert = transport.get("peer_cert")
if peer_cert is None:
# No client cert — reject
response = _forbidden_response("Client certificate required for A2A endpoints")
await response(scope, receive, send)
return
await self.app(scope, receive, send)
def _forbidden_response(message: str):
"""Return a minimal ASGI 403 response."""
body = message.encode()
async def respond(scope, receive, send):
await send({
"type": "http.response.start",
"status": 403,
"headers": [
(b"content-type", b"text/plain"),
(b"content-length", str(len(body)).encode()),
],
})
await send({"type": "http.response.body", "body": body})
return respond

32
ansible/fleet_mtls.yml Normal file
View File

@@ -0,0 +1,32 @@
---
# fleet_mtls.yml — Deploy mutual-TLS certificates to all fleet agents.
#
# Prerequisites:
# 1. Run scripts/gen_fleet_ca.sh to create the fleet CA.
# 2. For each agent, run:
# scripts/gen_agent_cert.sh --agent timmy
# scripts/gen_agent_cert.sh --agent allegro
# scripts/gen_agent_cert.sh --agent ezra
#
# Usage:
# ansible-playbook -i inventory/fleet.ini ansible/fleet_mtls.yml
#
# Inventory example (inventory/fleet.ini):
# [fleet]
# timmy.local agent_name=timmy
# allegro.local agent_name=allegro
# ezra.local agent_name=ezra
#
# Refs #806
- name: Distribute fleet mTLS certificates
hosts: fleet
become: true
vars:
_pki_base: "{{ lookup('env', 'HOME') }}/.hermes/pki"
roles:
- role: hermes_mtls
vars:
hermes_mtls_local_ca_cert: "{{ _pki_base }}/ca/fleet-ca.crt"
hermes_mtls_local_agent_cert: "{{ _pki_base }}/agents/{{ agent_name }}/{{ agent_name }}.crt"
hermes_mtls_local_agent_key: "{{ _pki_base }}/agents/{{ agent_name }}/{{ agent_name }}.key"

View File

@@ -0,0 +1,12 @@
# Example fleet inventory for mutual-TLS cert distribution.
# Copy to fleet.ini and adjust hostnames/IPs.
# Refs #806
[fleet_agents]
timmy ansible_host=192.168.1.10
allegro ansible_host=192.168.1.11
ezra ansible_host=192.168.1.12
[fleet_agents:vars]
ansible_user=hermes
ansible_python_interpreter=/usr/bin/python3

View File

@@ -0,0 +1,21 @@
---
# Default paths on the *control node* where certs are read from.
# Override these in your inventory / group_vars as needed.
# Fleet CA certificate (public; safe to push to all nodes)
fleet_mtls_ca_cert_src: "{{ lookup('env', 'HOME') }}/.hermes/pki/ca/fleet-ca.crt"
# Per-agent cert/key source dir on the control node.
# Expected layout: <fleet_mtls_agent_certs_dir>/<agent_name>/<agent_name>.{crt,key}
fleet_mtls_agent_certs_dir: "{{ lookup('env', 'HOME') }}/.hermes/pki/agents"
# Remote destination paths on the fleet node
fleet_mtls_remote_pki_dir: "/etc/hermes/pki"
fleet_mtls_remote_ca_dir: "{{ fleet_mtls_remote_pki_dir }}/ca"
fleet_mtls_remote_agent_dir: "{{ fleet_mtls_remote_pki_dir }}/agent"
# The agent name to deploy (set per-host in inventory, e.g. timmy / allegro / ezra)
fleet_mtls_agent_name: "{{ inventory_hostname_short }}"
# Hermes service name (for reload notification)
fleet_mtls_hermes_service: "hermes-a2a"

View File

@@ -0,0 +1,7 @@
---
- name: Restart hermes-a2a
ansible.builtin.systemd:
name: "{{ fleet_mtls_hermes_service }}"
state: restarted
when: ansible_service_mgr == "systemd"
ignore_errors: true # service may not exist in all environments

View File

@@ -0,0 +1,17 @@
---
galaxy_info:
role_name: fleet_mtls_certs
author: hermes-agent
description: >
Distribute fleet CA and per-agent mTLS certificates to Hermes fleet nodes.
Part of issue #806 — A2A mutual TLS between fleet agents.
min_ansible_version: "2.14"
platforms:
- name: Debian
versions: [bookworm, bullseye]
- name: Ubuntu
versions: ["22.04", "24.04"]
- name: EL
versions: ["8", "9"]
dependencies: []

View File

@@ -0,0 +1,99 @@
---
# fleet_mtls_certs/tasks/main.yml
#
# Distribute the fleet CA certificate and the per-agent TLS cert+key to
# each fleet node. Triggers a hermes-a2a service restart when any cert
# changes.
#
# Refs #806 — A2A mutual TLS between fleet agents.
- name: Verify agent cert source files exist on control node
ansible.builtin.stat:
path: "{{ item }}"
register: _src_stat
delegate_to: localhost
loop:
- "{{ fleet_mtls_ca_cert_src }}"
- "{{ fleet_mtls_agent_certs_dir }}/{{ fleet_mtls_agent_name }}/{{ fleet_mtls_agent_name }}.crt"
- "{{ fleet_mtls_agent_certs_dir }}/{{ fleet_mtls_agent_name }}/{{ fleet_mtls_agent_name }}.key"
loop_control:
label: "{{ item | basename }}"
- name: Fail if any source cert is missing
ansible.builtin.fail:
msg: >
Required cert file not found: {{ item.item }}
Run scripts/gen_fleet_ca.sh and scripts/gen_agent_cert.sh --agent {{ fleet_mtls_agent_name }} first.
when: not item.stat.exists
loop: "{{ _src_stat.results }}"
loop_control:
label: "{{ item.item | basename }}"
# -----------------------------------------------------------------------
# Remote directory structure
# -----------------------------------------------------------------------
- name: Create remote PKI directories
ansible.builtin.file:
path: "{{ item }}"
state: directory
owner: root
group: root
mode: "0750"
loop:
- "{{ fleet_mtls_remote_pki_dir }}"
- "{{ fleet_mtls_remote_ca_dir }}"
- "{{ fleet_mtls_remote_agent_dir }}"
# -----------------------------------------------------------------------
# Fleet CA certificate (public — read-only for all)
# -----------------------------------------------------------------------
- name: Deploy fleet CA certificate
ansible.builtin.copy:
src: "{{ fleet_mtls_ca_cert_src }}"
dest: "{{ fleet_mtls_remote_ca_dir }}/fleet-ca.crt"
owner: root
group: root
mode: "0644"
notify: Restart hermes-a2a
# -----------------------------------------------------------------------
# Per-agent certificate (public portion)
# -----------------------------------------------------------------------
- name: Deploy agent certificate
ansible.builtin.copy:
src: "{{ fleet_mtls_agent_certs_dir }}/{{ fleet_mtls_agent_name }}/{{ fleet_mtls_agent_name }}.crt"
dest: "{{ fleet_mtls_remote_agent_dir }}/agent.crt"
owner: root
group: root
mode: "0644"
notify: Restart hermes-a2a
# -----------------------------------------------------------------------
# Per-agent private key (secret — root-only read)
# -----------------------------------------------------------------------
- name: Deploy agent private key
ansible.builtin.copy:
src: "{{ fleet_mtls_agent_certs_dir }}/{{ fleet_mtls_agent_name }}/{{ fleet_mtls_agent_name }}.key"
dest: "{{ fleet_mtls_remote_agent_dir }}/agent.key"
owner: root
group: root
mode: "0600"
no_log: true # suppress file content from Ansible output
notify: Restart hermes-a2a
# -----------------------------------------------------------------------
# Environment file for hermes-a2a systemd unit
# -----------------------------------------------------------------------
- name: Write hermes-a2a environment file
ansible.builtin.template:
src: hermes_a2a_env.j2
dest: /etc/hermes/a2a.env
owner: root
group: root
mode: "0640"
notify: Restart hermes-a2a

View File

@@ -0,0 +1,10 @@
# Managed by Ansible — fleet_mtls_certs role
# Environment variables for the hermes-a2a systemd service.
# Source this file in the [Service] section: EnvironmentFile=/etc/hermes/a2a.env
HERMES_AGENT_NAME={{ fleet_mtls_agent_name }}
HERMES_A2A_CERT={{ fleet_mtls_remote_agent_dir }}/agent.crt
HERMES_A2A_KEY={{ fleet_mtls_remote_agent_dir }}/agent.key
HERMES_A2A_CA={{ fleet_mtls_remote_ca_dir }}/fleet-ca.crt
HERMES_A2A_HOST=0.0.0.0
HERMES_A2A_PORT=9443

View File

@@ -0,0 +1,21 @@
---
# Ansible role: hermes_mtls
# Distributes fleet mTLS certificates to Hermes agent nodes.
#
# Required variables (set in inventory / group_vars / --extra-vars):
# hermes_mtls_local_ca_cert Local path on the Ansible controller to fleet-ca.crt
# hermes_mtls_local_agent_cert Local path to this agent's .crt file
# hermes_mtls_local_agent_key Local path to this agent's .key file
#
# Optional overrides:
hermes_mtls_cert_dir: /etc/hermes/certs
hermes_mtls_cert_owner: hermes
hermes_mtls_cert_group: hermes
hermes_mtls_cert_mode: "0640"
hermes_mtls_ca_cert_mode: "0644"
# Env file that Hermes reads on startup (systemd EnvironmentFile or .env)
hermes_mtls_env_file: /etc/hermes/mtls.env
# Hermes systemd service name — restarted after cert changes
hermes_mtls_service: hermes-gateway

View File

@@ -0,0 +1,7 @@
---
- name: Restart hermes service
ansible.builtin.systemd:
name: "{{ hermes_mtls_service }}"
state: restarted
daemon_reload: true
when: ansible_service_mgr == "systemd"

View File

@@ -0,0 +1,16 @@
---
galaxy_info:
role_name: hermes_mtls
author: Hermes Fleet
description: Distribute mTLS certificates to Hermes fleet nodes for A2A authentication
license: MIT
min_ansible_version: "2.14"
platforms:
- name: Ubuntu
versions: ["22.04", "24.04"]
- name: Debian
versions: ["12"]
- name: EL
versions: ["9"]
dependencies: []

View File

@@ -0,0 +1,67 @@
---
# hermes_mtls role — distribute fleet mTLS certificates to a Hermes agent node.
#
# This role:
# 1. Creates the cert directory on the remote node
# 2. Copies the Fleet CA cert, agent cert, and agent key
# 3. Writes an env file with HERMES_MTLS_* variables
# 4. Restarts the Hermes service if any cert changed
- name: Ensure cert directory exists
ansible.builtin.file:
path: "{{ hermes_mtls_cert_dir }}"
state: directory
owner: "{{ hermes_mtls_cert_owner }}"
group: "{{ hermes_mtls_cert_group }}"
mode: "0750"
- name: Copy Fleet CA certificate
ansible.builtin.copy:
src: "{{ hermes_mtls_local_ca_cert }}"
dest: "{{ hermes_mtls_cert_dir }}/fleet-ca.crt"
owner: "{{ hermes_mtls_cert_owner }}"
group: "{{ hermes_mtls_cert_group }}"
mode: "{{ hermes_mtls_ca_cert_mode }}"
notify: Restart hermes service
- name: Copy agent TLS certificate
ansible.builtin.copy:
src: "{{ hermes_mtls_local_agent_cert }}"
dest: "{{ hermes_mtls_cert_dir }}/agent.crt"
owner: "{{ hermes_mtls_cert_owner }}"
group: "{{ hermes_mtls_cert_group }}"
mode: "{{ hermes_mtls_cert_mode }}"
notify: Restart hermes service
- name: Copy agent TLS private key
ansible.builtin.copy:
src: "{{ hermes_mtls_local_agent_key }}"
dest: "{{ hermes_mtls_cert_dir }}/agent.key"
owner: "{{ hermes_mtls_cert_owner }}"
group: "{{ hermes_mtls_cert_group }}"
mode: "0600"
notify: Restart hermes service
- name: Write mTLS environment file
ansible.builtin.template:
src: mtls.env.j2
dest: "{{ hermes_mtls_env_file }}"
owner: "{{ hermes_mtls_cert_owner }}"
group: "{{ hermes_mtls_cert_group }}"
mode: "0640"
notify: Restart hermes service
- name: Verify cert files are readable by service user
ansible.builtin.stat:
path: "{{ item }}"
loop:
- "{{ hermes_mtls_cert_dir }}/fleet-ca.crt"
- "{{ hermes_mtls_cert_dir }}/agent.crt"
- "{{ hermes_mtls_cert_dir }}/agent.key"
register: _cert_stat
- name: Assert all cert files exist
ansible.builtin.assert:
that: item.stat.exists
fail_msg: "Expected cert file missing: {{ item.item }}"
loop: "{{ _cert_stat.results }}"

View File

@@ -0,0 +1,8 @@
# Hermes mTLS environment — generated by hermes_mtls Ansible role
# Source this file or use as a systemd EnvironmentFile=
# WARNING: This file contains the path to the agent's private key.
# Restrict read access to the hermes service user.
HERMES_MTLS_CERT={{ hermes_mtls_cert_dir }}/agent.crt
HERMES_MTLS_KEY={{ hermes_mtls_cert_dir }}/agent.key
HERMES_MTLS_CA={{ hermes_mtls_cert_dir }}/fleet-ca.crt

View File

@@ -348,7 +348,7 @@ compression:
# Other providers pick a sensible default automatically.
#
# auxiliary:
# # Image analysis: vision_analyze tool
# # Image analysis: vision_analyze tool + browser screenshots
# vision:
# provider: "auto"
# model: "" # e.g. "google/gemini-2.5-flash", "openai/gpt-4o"
@@ -356,15 +356,6 @@ compression:
# download_timeout: 30 # Image HTTP download timeout (seconds)
# # Increase for slow connections or self-hosted image servers
#
# # Browser screenshot analysis (browser_vision tool)
# # Defaults to Gemma 4 27B — natively multimodal, same model family as the main
# # text model, which avoids model-switching overhead and improves context continuity.
# # Override with any vision-capable model. Set to "" to fall back to auto-detection.
# # Can also be overridden per-session with BROWSER_VISION_MODEL env var.
# browser_vision:
# model: "google/gemma-4-27b-it" # default; override e.g. "google/gemini-2.5-flash"
# timeout: 120 # API call timeout in seconds (default 120s)
#
# # Web page scraping / summarization + browser page text extraction
# web_extract:
# provider: "auto"

View File

@@ -441,12 +441,6 @@ DEFAULT_CONFIG = {
"timeout": 120, # seconds — LLM API call timeout; vision payloads need generous timeout
"download_timeout": 30, # seconds — image HTTP download timeout; increase for slow connections
},
# browser_vision: model for browser screenshot analysis (browser_tool.browser_vision).
# Defaults to google/gemma-4-27b-it (Gemma 4 native multimodal) when unset.
# BROWSER_VISION_MODEL env var takes precedence over this setting.
"browser_vision": {
"model": "", # e.g. "google/gemma-4-27b-it", "openai/gpt-4o"
},
"web_extract": {
"provider": "auto",
"model": "",

View File

@@ -130,7 +130,6 @@ _PROVIDER_MODELS: dict[str, list[str]] = {
"gemini-2.5-flash",
"gemini-2.5-flash-lite",
# Gemma open models (also served via AI Studio)
"gemma-4-27b-it", # default browser vision model (multimodal)
"gemma-4-31b-it",
"gemma-4-26b-it",
],

View File

@@ -46,6 +46,7 @@ from hermes_cli.config import (
)
from gateway.status import get_running_pid, read_runtime_status
from agent.agent_card import get_agent_card_json
from agent.mtls import is_mtls_configured, MTLSMiddleware, build_server_ssl_context
try:
from fastapi import FastAPI, HTTPException, Request
@@ -87,6 +88,10 @@ app.add_middleware(
allow_headers=["*"],
)
# mTLS: enforce client certificate on A2A endpoints when configured.
# Activated by setting HERMES_MTLS_CERT, HERMES_MTLS_KEY, HERMES_MTLS_CA.
app.add_middleware(MTLSMiddleware)
# ---------------------------------------------------------------------------
# Endpoints that do NOT require the session token. Everything else under
# /api/ is gated by the auth middleware below. Keep this list minimal —
@@ -2105,6 +2110,20 @@ def start_server(
"authentication. Only use on trusted networks.", host,
)
# mTLS: when configured, pass SSL context to uvicorn so all connections
# are TLS with mandatory client certificate verification.
ssl_context = None
scheme = "http"
if is_mtls_configured():
try:
ssl_context = build_server_ssl_context()
scheme = "https"
_log.info(
"mTLS enabled — server requires client certificates (A2A auth)"
)
except Exception as exc:
_log.error("Failed to build mTLS SSL context: %s — starting without TLS", exc)
if open_browser:
import threading
import webbrowser
@@ -2112,9 +2131,11 @@ def start_server(
def _open():
import time as _t
_t.sleep(1.0)
webbrowser.open(f"http://{host}:{port}")
webbrowser.open(f"{scheme}://{host}:{port}")
threading.Thread(target=_open, daemon=True).start()
print(f" Hermes Web UI → http://{host}:{port}")
uvicorn.run(app, host=host, port=port, log_level="warning")
print(f" Hermes Web UI → {scheme}://{host}:{port}")
if ssl_context is not None:
print(" mTLS enabled — client certificate required for A2A endpoints")
uvicorn.run(app, host=host, port=port, log_level="warning", ssl=ssl_context)

129
scripts/gen_agent_cert.sh Normal file
View File

@@ -0,0 +1,129 @@
#!/usr/bin/env bash
# gen_agent_cert.sh — Generate a TLS certificate for a fleet agent.
#
# Usage:
# ./scripts/gen_agent_cert.sh --agent <name> [--ca-dir <dir>] [--out-dir <dir>]
#
# Known agents: timmy, allegro, ezra (case-insensitive; any name is accepted)
#
# Outputs (default: ~/.hermes/pki/agents/<name>/):
# <name>.key — agent private key (chmod 600, stays on the agent host)
# <name>.crt — agent certificate (signed by the fleet CA)
#
# Run gen_fleet_ca.sh first if you haven't already.
# Refs #806
set -euo pipefail
CERT_DAYS=365 # 1 year; rotate annually
KEY_BITS=2048
# ---------------------------------------------------------------------------
# Parse args
# ---------------------------------------------------------------------------
AGENT_NAME=""
CA_DIR="${HOME}/.hermes/pki/ca"
OUT_DIR=""
while [[ $# -gt 0 ]]; do
case "$1" in
--agent) AGENT_NAME="${2,,}"; shift 2 ;; # lower-case
--ca-dir) CA_DIR="$2"; shift 2 ;;
--out-dir) OUT_DIR="$2"; shift 2 ;;
-h|--help)
echo "Usage: $0 --agent <name> [--ca-dir <dir>] [--out-dir <dir>]"
echo " Known agents: timmy, allegro, ezra"
exit 0
;;
*)
echo "Unknown option: $1" >&2
exit 1
;;
esac
done
if [[ -z "$AGENT_NAME" ]]; then
echo "ERROR: --agent <name> is required." >&2
exit 1
fi
OUT_DIR="${OUT_DIR:-${HOME}/.hermes/pki/agents/${AGENT_NAME}}"
# ---------------------------------------------------------------------------
# Prereq check
# ---------------------------------------------------------------------------
if ! command -v openssl &>/dev/null; then
echo "ERROR: openssl not found." >&2
exit 1
fi
CA_KEY="$CA_DIR/fleet-ca.key"
CA_CRT="$CA_DIR/fleet-ca.crt"
if [[ ! -f "$CA_KEY" || ! -f "$CA_CRT" ]]; then
echo "ERROR: Fleet CA not found in $CA_DIR" >&2
echo " Run scripts/gen_fleet_ca.sh first." >&2
exit 1
fi
mkdir -p "$OUT_DIR"
chmod 700 "$OUT_DIR"
AGENT_KEY="$OUT_DIR/${AGENT_NAME}.key"
AGENT_CRT="$OUT_DIR/${AGENT_NAME}.crt"
AGENT_CSR="$OUT_DIR/${AGENT_NAME}.csr"
if [[ -f "$AGENT_KEY" || -f "$AGENT_CRT" ]]; then
echo "Cert for agent '$AGENT_NAME' already exists in $OUT_DIR"
echo " $AGENT_KEY"
echo " $AGENT_CRT"
echo "Delete them manually if you want to regenerate."
exit 0
fi
echo "Generating cert for agent '$AGENT_NAME' ..."
SUBJECT="/CN=${AGENT_NAME}.fleet.hermes/O=Hermes/OU=Fleet Agent"
# Agent private key
openssl genrsa -out "$AGENT_KEY" "$KEY_BITS" 2>/dev/null
chmod 600 "$AGENT_KEY"
# Certificate Signing Request
openssl req -new \
-key "$AGENT_KEY" \
-out "$AGENT_CSR" \
-subj "$SUBJECT" 2>/dev/null
# Sign with fleet CA — include SAN so modern TLS stacks accept it
EXT_CONF=$(mktemp)
trap 'rm -f "$EXT_CONF" "$AGENT_CSR"' EXIT
cat > "$EXT_CONF" <<EOF
[v3_agent]
basicConstraints = CA:FALSE
keyUsage = critical, digitalSignature, keyEncipherment
extendedKeyUsage = clientAuth, serverAuth
subjectKeyIdentifier = hash
authorityKeyIdentifier = keyid,issuer
subjectAltName = DNS:${AGENT_NAME}.fleet.hermes, DNS:${AGENT_NAME}
EOF
openssl x509 -req \
-in "$AGENT_CSR" \
-CA "$CA_CRT" \
-CAkey "$CA_KEY" \
-CAcreateserial \
-out "$AGENT_CRT" \
-days "$CERT_DAYS" \
-extfile "$EXT_CONF" \
-extensions v3_agent 2>/dev/null
chmod 644 "$AGENT_CRT"
echo ""
echo "Agent cert generated:"
echo " Private key : $AGENT_KEY"
echo " Certificate : $AGENT_CRT"
echo ""
openssl x509 -in "$AGENT_CRT" -noout -subject -issuer -dates

83
scripts/gen_fleet_ca.sh Normal file
View File

@@ -0,0 +1,83 @@
#!/usr/bin/env bash
# gen_fleet_ca.sh — Generate the Hermes fleet Certificate Authority.
#
# Usage:
# ./scripts/gen_fleet_ca.sh [--out-dir <dir>]
#
# Outputs (default: ~/.hermes/pki/ca/):
# fleet-ca.key — CA private key (chmod 600, keep secret)
# fleet-ca.crt — CA certificate (distribute to all fleet nodes)
#
# The CA is valid for 10 years. Regenerate + redistribute when it expires.
# Refs #806
set -euo pipefail
CA_SUBJECT="/CN=Hermes Fleet CA/O=Hermes/OU=Fleet"
CA_DAYS=3650 # 10 years
# ---------------------------------------------------------------------------
# Parse args
# ---------------------------------------------------------------------------
OUT_DIR="${HOME}/.hermes/pki/ca"
while [[ $# -gt 0 ]]; do
case "$1" in
--out-dir) OUT_DIR="$2"; shift 2 ;;
-h|--help)
echo "Usage: $0 [--out-dir <dir>]"
exit 0
;;
*)
echo "Unknown option: $1" >&2
exit 1
;;
esac
done
# ---------------------------------------------------------------------------
# Prereq check
# ---------------------------------------------------------------------------
if ! command -v openssl &>/dev/null; then
echo "ERROR: openssl not found. Install OpenSSL and re-run." >&2
exit 1
fi
mkdir -p "$OUT_DIR"
chmod 700 "$OUT_DIR"
CA_KEY="$OUT_DIR/fleet-ca.key"
CA_CRT="$OUT_DIR/fleet-ca.crt"
if [[ -f "$CA_KEY" || -f "$CA_CRT" ]]; then
echo "Fleet CA already exists in $OUT_DIR"
echo " $CA_KEY"
echo " $CA_CRT"
echo "Delete them manually if you want to regenerate."
exit 0
fi
echo "Generating fleet CA in $OUT_DIR ..."
# Generate 4096-bit RSA key for the CA
openssl genrsa -out "$CA_KEY" 4096 2>/dev/null
chmod 600 "$CA_KEY"
# Self-sign the CA certificate
openssl req -new -x509 \
-key "$CA_KEY" \
-out "$CA_CRT" \
-days "$CA_DAYS" \
-subj "$CA_SUBJECT" \
-addext "basicConstraints=critical,CA:TRUE,pathlen:0" \
-addext "keyUsage=critical,keyCertSign,cRLSign" \
-addext "subjectKeyIdentifier=hash" 2>/dev/null
chmod 644 "$CA_CRT"
echo ""
echo "Fleet CA generated successfully:"
echo " Private key : $CA_KEY (keep secret)"
echo " Certificate : $CA_CRT (distribute to all fleet nodes)"
echo ""
openssl x509 -in "$CA_CRT" -noout -subject -dates

View File

@@ -0,0 +1,574 @@
"""
Tests for A2A mutual-TLS authentication.
Scenarios covered:
- authorized agent (valid fleet-CA-signed cert) is accepted
- unauthorized agent (self-signed cert) is rejected with SSLError
- missing client cert is rejected
- build_server_ssl_context raises FileNotFoundError for missing paths
- build_client_ssl_context raises FileNotFoundError for missing paths
- A2AServer.start() / stop() lifecycle (no network I/O)
All TLS I/O is done in-process against a loopback server so no ports need
to be opened on a CI runner.
Refs #806
"""
from __future__ import annotations
import datetime
import ipaddress
import ssl
import threading
import time
import urllib.request
import urllib.error
from pathlib import Path
from typing import Tuple
import pytest
# ---------------------------------------------------------------------------
# Helpers — generate self-signed certs in-memory with Python's ``cryptography``
# library (dev extra). If cryptography is unavailable we skip the network
# tests gracefully.
# ---------------------------------------------------------------------------
try:
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID
import cryptography.hazmat.backends as _backends
_CRYPTO_AVAILABLE = True
except ImportError:
_CRYPTO_AVAILABLE = False
_requires_crypto = pytest.mark.skipif(
not _CRYPTO_AVAILABLE,
reason="cryptography package not installed",
)
# ---------------------------------------------------------------------------
# Fixture helpers
# ---------------------------------------------------------------------------
def _make_ca_keypair(tmp_path: Path) -> Tuple[Path, Path]:
"""Generate a self-signed CA cert+key and write to *tmp_path*."""
key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
name = x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, "Test Fleet CA"),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "TestOrg"),
])
now = datetime.datetime.now(datetime.timezone.utc)
cert = (
x509.CertificateBuilder()
.subject_name(name)
.issuer_name(name)
.public_key(key.public_key())
.serial_number(x509.random_serial_number())
.not_valid_before(now)
.not_valid_after(now + datetime.timedelta(days=3650))
.add_extension(x509.BasicConstraints(ca=True, path_length=0), critical=True)
.add_extension(
x509.KeyUsage(
digital_signature=False, key_cert_sign=True, crl_sign=True,
content_commitment=False, key_encipherment=False,
data_encipherment=False, key_agreement=False,
encipher_only=False, decipher_only=False,
),
critical=True,
)
.sign(key, hashes.SHA256())
)
key_path = tmp_path / "ca.key"
cert_path = tmp_path / "ca.crt"
key_path.write_bytes(key.private_bytes(
serialization.Encoding.PEM,
serialization.PrivateFormat.TraditionalOpenSSL,
serialization.NoEncryption(),
))
cert_path.write_bytes(cert.public_bytes(serialization.Encoding.PEM))
return cert_path, key_path
def _make_agent_keypair(
tmp_path: Path,
name: str,
ca_cert_path: Path,
ca_key_path: Path,
) -> Tuple[Path, Path]:
"""Generate an agent cert signed by the test CA."""
ca_cert = x509.load_pem_x509_certificate(ca_cert_path.read_bytes())
ca_key = serialization.load_pem_private_key(
ca_key_path.read_bytes(), password=None
)
key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
subject = x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, f"{name}.fleet.hermes"),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "TestOrg"),
])
now = datetime.datetime.now(datetime.timezone.utc)
cert = (
x509.CertificateBuilder()
.subject_name(subject)
.issuer_name(ca_cert.subject)
.public_key(key.public_key())
.serial_number(x509.random_serial_number())
.not_valid_before(now)
.not_valid_after(now + datetime.timedelta(days=365))
.add_extension(x509.BasicConstraints(ca=False, path_length=None), critical=True)
.add_extension(
x509.SubjectAlternativeName([
x509.DNSName(f"{name}.fleet.hermes"),
x509.DNSName(name),
x509.IPAddress(ipaddress.IPv4Address("127.0.0.1")),
]),
critical=False,
)
.add_extension(
x509.ExtendedKeyUsage([
x509.ExtendedKeyUsageOID.CLIENT_AUTH,
x509.ExtendedKeyUsageOID.SERVER_AUTH,
]),
critical=False,
)
.sign(ca_key, hashes.SHA256())
)
key_path = tmp_path / f"{name}.key"
cert_path = tmp_path / f"{name}.crt"
key_path.write_bytes(key.private_bytes(
serialization.Encoding.PEM,
serialization.PrivateFormat.TraditionalOpenSSL,
serialization.NoEncryption(),
))
cert_path.write_bytes(cert.public_bytes(serialization.Encoding.PEM))
return cert_path, key_path
def _make_self_signed_keypair(tmp_path: Path, name: str) -> Tuple[Path, Path]:
"""Generate a self-signed cert NOT signed by the test CA (unauthorized)."""
key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
subject = x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, f"{name}.rogue"),
])
now = datetime.datetime.now(datetime.timezone.utc)
cert = (
x509.CertificateBuilder()
.subject_name(subject)
.issuer_name(subject)
.public_key(key.public_key())
.serial_number(x509.random_serial_number())
.not_valid_before(now)
.not_valid_after(now + datetime.timedelta(days=365))
.add_extension(x509.BasicConstraints(ca=False, path_length=None), critical=True)
.add_extension(
x509.SubjectAlternativeName([x509.IPAddress(ipaddress.IPv4Address("127.0.0.1"))]),
critical=False,
)
.sign(key, hashes.SHA256())
)
key_path = tmp_path / f"{name}_rogue.key"
cert_path = tmp_path / f"{name}_rogue.crt"
key_path.write_bytes(key.private_bytes(
serialization.Encoding.PEM,
serialization.PrivateFormat.TraditionalOpenSSL,
serialization.NoEncryption(),
))
cert_path.write_bytes(cert.public_bytes(serialization.Encoding.PEM))
return cert_path, key_path
# ---------------------------------------------------------------------------
# Unit tests — no network I/O
# ---------------------------------------------------------------------------
class TestBuildSslContextErrors:
def test_server_context_missing_cert(self, tmp_path):
from agent.a2a_mtls import build_server_ssl_context
with pytest.raises(FileNotFoundError, match="mTLS"):
build_server_ssl_context(
cert=tmp_path / "nope.crt",
key=tmp_path / "nope.key",
ca=tmp_path / "nope.crt",
)
def test_client_context_missing_cert(self, tmp_path):
from agent.a2a_mtls import build_client_ssl_context
with pytest.raises(FileNotFoundError, match="mTLS client"):
build_client_ssl_context(
cert=tmp_path / "nope.crt",
key=tmp_path / "nope.key",
ca=tmp_path / "nope.crt",
)
@_requires_crypto
def test_server_context_builds_with_valid_certs(self, tmp_path):
from agent.a2a_mtls import build_server_ssl_context
ca_dir = tmp_path / "ca"
ca_dir.mkdir()
ca_crt, ca_key = _make_ca_keypair(ca_dir)
srv_crt, srv_key = _make_agent_keypair(
tmp_path, "srv", ca_crt, ca_key
)
ctx = build_server_ssl_context(cert=srv_crt, key=srv_key, ca=ca_crt)
assert isinstance(ctx, ssl.SSLContext)
assert ctx.verify_mode == ssl.CERT_REQUIRED
@_requires_crypto
def test_client_context_builds_with_valid_certs(self, tmp_path):
from agent.a2a_mtls import build_client_ssl_context
ca_dir = tmp_path / "ca"
ca_dir.mkdir()
ca_crt, ca_key = _make_ca_keypair(ca_dir)
cli_crt, cli_key = _make_agent_keypair(
tmp_path, "cli", ca_crt, ca_key
)
ctx = build_client_ssl_context(cert=cli_crt, key=cli_key, ca=ca_crt)
assert isinstance(ctx, ssl.SSLContext)
assert ctx.verify_mode == ssl.CERT_REQUIRED
# ---------------------------------------------------------------------------
# Integration tests — loopback mTLS server
# ---------------------------------------------------------------------------
def _find_free_port() -> int:
import socket
with socket.socket() as s:
s.bind(("127.0.0.1", 0))
return s.getsockname()[1]
def _https_get(url: str, ssl_ctx: ssl.SSLContext) -> int:
"""Return the HTTP status code for a GET request, or raise SSLError."""
req = urllib.request.urlopen(url, context=ssl_ctx, timeout=5)
return req.status
@_requires_crypto
class TestMutualTLSAuth:
"""End-to-end mTLS auth over a loopback connection."""
@pytest.fixture(autouse=True)
def _pki(self, tmp_path):
"""Set up a fleet CA and agent certs for timmy (server) and allegro (authorized client)."""
ca_dir = tmp_path / "ca"
ca_dir.mkdir()
self.ca_crt, self.ca_key = _make_ca_keypair(ca_dir)
agent_dir = tmp_path / "agents"
agent_dir.mkdir()
# Server agent: timmy
self.srv_crt, self.srv_key = _make_agent_keypair(
agent_dir, "timmy", self.ca_crt, self.ca_key
)
# Authorized client agent: allegro
self.cli_crt, self.cli_key = _make_agent_keypair(
agent_dir, "allegro", self.ca_crt, self.ca_key
)
# Unauthorized (self-signed) client: rogue
self.rogue_crt, self.rogue_key = _make_self_signed_keypair(agent_dir, "rogue")
@pytest.fixture()
def running_server(self):
"""Start an A2AServer on a free loopback port, yield the URL, stop after test."""
from agent.a2a_mtls import A2AServer
port = _find_free_port()
server = A2AServer(
cert=self.srv_crt,
key=self.srv_key,
ca=self.ca_crt,
host="127.0.0.1",
port=port,
)
server.start(daemon=True)
time.sleep(0.15) # let the thread bind
yield f"https://127.0.0.1:{port}"
server.stop()
def _authorized_ctx(self) -> ssl.SSLContext:
from agent.a2a_mtls import build_client_ssl_context
ctx = build_client_ssl_context(
cert=self.cli_crt, key=self.cli_key, ca=self.ca_crt
)
ctx.check_hostname = False # loopback IP doesn't match DNS SAN
return ctx
def _unauthorized_ctx(self) -> ssl.SSLContext:
"""Client context with a self-signed cert not trusted by the server CA."""
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.minimum_version = ssl.TLSVersion.TLSv1_2
ctx.load_cert_chain(certfile=str(self.rogue_crt), keyfile=str(self.rogue_key))
# Load the real fleet CA so server cert is accepted — but our client
# cert is self-signed and will be rejected by the server.
ctx.load_verify_locations(cafile=str(self.ca_crt))
ctx.check_hostname = False
return ctx
def _no_client_cert_ctx(self) -> ssl.SSLContext:
"""Client context with no client certificate at all."""
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.minimum_version = ssl.TLSVersion.TLSv1_2
ctx.load_verify_locations(cafile=str(self.ca_crt))
ctx.check_hostname = False
return ctx
# ------------------------------------------------------------------
# Authorized agent accepted
# ------------------------------------------------------------------
def test_authorized_agent_accepted(self, running_server):
"""An agent with a fleet-CA-signed cert gets a 200-range response."""
status = _https_get(
running_server + "/.well-known/agent-card.json",
self._authorized_ctx(),
)
assert status == 200
def test_authorized_agent_task_endpoint(self, running_server):
"""POST /a2a/task returns 202 for an authorized agent."""
import urllib.request
req = urllib.request.Request(
running_server + "/a2a/task",
data=b'{"hello":"world"}',
method="POST",
)
req.add_header("Content-Type", "application/json")
resp = urllib.request.urlopen(req, context=self._authorized_ctx(), timeout=5)
assert resp.status == 202
# ------------------------------------------------------------------
# Unauthorized agent rejected
# ------------------------------------------------------------------
def test_unauthorized_agent_rejected(self, running_server):
"""A self-signed cert not signed by the fleet CA is rejected at TLS handshake."""
with pytest.raises((ssl.SSLError, OSError)):
_https_get(running_server + "/", self._unauthorized_ctx())
def test_no_client_cert_rejected(self, running_server):
"""A client with no cert at all is rejected at TLS handshake."""
with pytest.raises((ssl.SSLError, OSError)):
_https_get(running_server + "/", self._no_client_cert_ctx())
# ------------------------------------------------------------------
# Server lifecycle
# ------------------------------------------------------------------
def test_server_stop_is_idempotent(self):
"""Calling stop() twice does not raise."""
from agent.a2a_mtls import A2AServer
port = _find_free_port()
server = A2AServer(
cert=self.srv_crt, key=self.srv_key, ca=self.ca_crt,
host="127.0.0.1", port=port,
)
server.start(daemon=True)
time.sleep(0.1)
server.stop()
server.stop() # second call must not raise
# ---------------------------------------------------------------------------
# server_from_env() — environment variable wiring
# ---------------------------------------------------------------------------
class TestServerFromEnv:
def test_reads_env_vars(self, tmp_path, monkeypatch):
# Create dummy files so FileNotFoundError isn't triggered
cert = tmp_path / "a.crt"
key = tmp_path / "a.key"
ca = tmp_path / "ca.crt"
for f in (cert, key, ca):
f.write_text("PLACEHOLDER")
monkeypatch.setenv("HERMES_A2A_CERT", str(cert))
monkeypatch.setenv("HERMES_A2A_KEY", str(key))
monkeypatch.setenv("HERMES_A2A_CA", str(ca))
monkeypatch.setenv("HERMES_A2A_HOST", "127.0.0.2")
monkeypatch.setenv("HERMES_A2A_PORT", "19443")
from agent.a2a_mtls import server_from_env
srv = server_from_env()
assert srv.cert == cert
assert srv.key == key
assert srv.ca == ca
assert srv.host == "127.0.0.2"
assert srv.port == 19443
def test_uses_agent_name_for_defaults(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
monkeypatch.setenv("HERMES_AGENT_NAME", "ezra")
# Unset explicit cert overrides
monkeypatch.delenv("HERMES_A2A_CERT", raising=False)
monkeypatch.delenv("HERMES_A2A_KEY", raising=False)
monkeypatch.delenv("HERMES_A2A_CA", raising=False)
from agent.a2a_mtls import server_from_env
srv = server_from_env()
assert "ezra" in str(srv.cert)
assert "ezra" in str(srv.key)
assert "fleet-ca" in str(srv.ca)
# ---------------------------------------------------------------------------
# A2AMTLSServer and A2AMTLSClient — routing server + client helper
# ---------------------------------------------------------------------------
@_requires_crypto
class TestA2AMTLSServerAndClient:
"""Tests for the routing-based A2AMTLSServer and A2AMTLSClient."""
@pytest.fixture(autouse=True)
def _pki(self, tmp_path):
ca_dir = tmp_path / "ca"
ca_dir.mkdir()
self.ca_crt, self.ca_key = _make_ca_keypair(ca_dir)
agent_dir = tmp_path / "agents"
agent_dir.mkdir()
self.srv_crt, self.srv_key = _make_agent_keypair(
agent_dir, "timmy", self.ca_crt, self.ca_key
)
self.cli_crt, self.cli_key = _make_agent_keypair(
agent_dir, "allegro", self.ca_crt, self.ca_key
)
self.rogue_crt, self.rogue_key = _make_self_signed_keypair(agent_dir, "rogue")
@pytest.fixture()
def routing_server(self):
from agent.a2a_mtls import A2AMTLSServer
port = _find_free_port()
server = A2AMTLSServer(
cert=self.srv_crt, key=self.srv_key, ca=self.ca_crt,
host="127.0.0.1", port=port,
)
server.add_route("/echo", lambda p, *, peer_cn=None: {"echo": p, "peer": peer_cn})
server.add_route("/tasks/send", lambda p, *, peer_cn=None: {"status": "ok", "echo": p})
with server:
time.sleep(0.1)
yield server, port
def _authorized_ctx(self) -> ssl.SSLContext:
from agent.a2a_mtls import build_client_ssl_context
ctx = build_client_ssl_context(
cert=self.cli_crt, key=self.cli_key, ca=self.ca_crt
)
ctx.check_hostname = False
return ctx
def test_routing_server_get(self, routing_server):
server, port = routing_server
ctx = self._authorized_ctx()
req = urllib.request.Request(f"https://127.0.0.1:{port}/echo")
with urllib.request.urlopen(req, context=ctx, timeout=5) as resp:
import json
data = json.loads(resp.read())
assert data["peer"] is not None # CN present
def test_routing_server_post_payload(self, routing_server):
server, port = routing_server
ctx = self._authorized_ctx()
import json
payload = {"task_id": "abc", "action": "delegate"}
req = urllib.request.Request(
f"https://127.0.0.1:{port}/tasks/send",
data=json.dumps(payload).encode(),
headers={"Content-Type": "application/json"},
method="POST",
)
with urllib.request.urlopen(req, context=ctx, timeout=5) as resp:
data = json.loads(resp.read())
assert data["status"] == "ok"
assert data["echo"]["task_id"] == "abc"
def test_routing_server_unknown_route_404(self, routing_server):
server, port = routing_server
ctx = self._authorized_ctx()
req = urllib.request.Request(f"https://127.0.0.1:{port}/nonexistent")
with pytest.raises(urllib.error.URLError) as exc_info:
urllib.request.urlopen(req, context=ctx, timeout=5)
assert "404" in str(exc_info.value)
def test_routing_server_context_manager_stops(self):
from agent.a2a_mtls import A2AMTLSServer
port = _find_free_port()
server = A2AMTLSServer(
cert=self.srv_crt, key=self.srv_key, ca=self.ca_crt,
host="127.0.0.1", port=port,
)
server.add_route("/ping", lambda p, *, peer_cn=None: {"pong": True})
with server:
time.sleep(0.05)
assert server._httpd is not None
assert server._httpd is None # stopped after __exit__
def test_routing_server_rogue_client_rejected(self, routing_server):
server, port = routing_server
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.load_verify_locations(cafile=str(self.ca_crt))
ctx.load_cert_chain(certfile=str(self.rogue_crt), keyfile=str(self.rogue_key))
ctx.check_hostname = False
req = urllib.request.Request(f"https://127.0.0.1:{port}/echo")
with pytest.raises((ssl.SSLError, OSError, urllib.error.URLError)):
urllib.request.urlopen(req, context=ctx, timeout=5)
def test_a2a_mtls_client_get(self, routing_server):
from agent.a2a_mtls import A2AMTLSClient
server, port = routing_server
client = A2AMTLSClient(
cert=self.cli_crt, key=self.cli_key, ca=self.ca_crt
)
result = client.get(f"https://127.0.0.1:{port}/echo")
assert result["peer"] is not None
def test_a2a_mtls_client_post(self, routing_server):
from agent.a2a_mtls import A2AMTLSClient
server, port = routing_server
client = A2AMTLSClient(
cert=self.cli_crt, key=self.cli_key, ca=self.ca_crt
)
result = client.post(f"https://127.0.0.1:{port}/tasks/send", json={"x": 1})
assert result["status"] == "ok"
assert result["echo"]["x"] == 1
def test_a2a_mtls_client_rogue_cert_raises(self, routing_server):
from agent.a2a_mtls import A2AMTLSClient
server, port = routing_server
client = A2AMTLSClient(
cert=self.rogue_crt, key=self.rogue_key, ca=self.ca_crt
)
with pytest.raises((ConnectionError, ssl.SSLError, OSError)):
client.get(f"https://127.0.0.1:{port}/echo")
def test_concurrent_fleet_agents(self, routing_server):
"""timmy (server) accepts concurrent connections from multiple authorized clients."""
from agent.a2a_mtls import build_client_ssl_context
server, port = routing_server
results: dict = {}
errors: dict = {}
def connect(name: str) -> None:
try:
ctx = build_client_ssl_context(
cert=self.cli_crt, key=self.cli_key, ca=self.ca_crt
)
ctx.check_hostname = False
req = urllib.request.Request(f"https://127.0.0.1:{port}/echo")
with urllib.request.urlopen(req, context=ctx, timeout=5) as resp:
import json
results[name] = json.loads(resp.read())
except Exception as exc:
errors[name] = exc
threads = [threading.Thread(target=connect, args=(n,)) for n in ("t1", "t2", "t3")]
for t in threads:
t.start()
for t in threads:
t.join(timeout=10)
assert not errors, f"Concurrent connection errors: {errors}"
assert len(results) == 3

389
tests/test_mtls.py Normal file
View File

@@ -0,0 +1,389 @@
"""
Tests for agent/mtls.py — mutual TLS between fleet agents.
Covers:
- is_mtls_configured() with various env combinations
- build_server_ssl_context() / build_client_ssl_context() with real certs
- MTLSMiddleware: authorized agent accepted, unauthorized agent rejected
"""
import ssl
import datetime
import ipaddress
import os
import pytest
from pathlib import Path
from unittest.mock import patch
# ---------------------------------------------------------------------------
# Helpers: generate real in-memory certs using the `cryptography` library
# ---------------------------------------------------------------------------
try:
from cryptography import x509
from cryptography.x509.oid import NameOID, ExtendedKeyUsageOID
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
_CRYPTO_AVAILABLE = True
except ImportError:
_CRYPTO_AVAILABLE = False
pytestmark = pytest.mark.skipif(
not _CRYPTO_AVAILABLE,
reason="cryptography package required for mTLS tests",
)
def _make_key():
return rsa.generate_private_key(public_exponent=65537, key_size=2048)
def _write_pem(path: Path, data: bytes) -> None:
path.write_bytes(data)
path.chmod(0o600)
def make_fleet_pki(tmp_path: Path):
"""
Create a minimal Fleet PKI in tmp_path:
- fleet-ca.key / fleet-ca.crt (self-signed CA)
- agent.key / agent.crt (signed by fleet CA, CN=test-agent)
- rogue.key / rogue.crt (self-signed, NOT signed by fleet CA)
Returns a dict of Path objects.
"""
now = datetime.datetime.now(datetime.timezone.utc)
# --- Fleet CA ---
ca_key = _make_key()
ca_name = x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, "Hermes Fleet CA"),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Hermes Fleet"),
])
ca_cert = (
x509.CertificateBuilder()
.subject_name(ca_name)
.issuer_name(ca_name)
.public_key(ca_key.public_key())
.serial_number(x509.random_serial_number())
.not_valid_before(now)
.not_valid_after(now + datetime.timedelta(days=3650))
.add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True)
.add_extension(
x509.KeyUsage(
digital_signature=False, content_commitment=False,
key_encipherment=False, data_encipherment=False,
key_agreement=False, key_cert_sign=True, crl_sign=True,
encipher_only=False, decipher_only=False,
),
critical=True,
)
.sign(ca_key, hashes.SHA256())
)
# --- Fleet agent cert ---
agent_key = _make_key()
agent_name = x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, "test-agent"),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Hermes Fleet"),
])
agent_cert = (
x509.CertificateBuilder()
.subject_name(agent_name)
.issuer_name(ca_name)
.public_key(agent_key.public_key())
.serial_number(x509.random_serial_number())
.not_valid_before(now)
.not_valid_after(now + datetime.timedelta(days=730))
.add_extension(x509.BasicConstraints(ca=False, path_length=None), critical=True)
.add_extension(
x509.SubjectAlternativeName([
x509.DNSName("test-agent"),
x509.DNSName("localhost"),
x509.IPAddress(ipaddress.IPv4Address("127.0.0.1")),
]),
critical=False,
)
.add_extension(
x509.ExtendedKeyUsage([
ExtendedKeyUsageOID.CLIENT_AUTH,
ExtendedKeyUsageOID.SERVER_AUTH,
]),
critical=False,
)
.sign(ca_key, hashes.SHA256())
)
# --- Rogue cert (self-signed, not from fleet CA) ---
rogue_key = _make_key()
rogue_name = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, "rogue-agent")])
rogue_cert = (
x509.CertificateBuilder()
.subject_name(rogue_name)
.issuer_name(rogue_name)
.public_key(rogue_key.public_key())
.serial_number(x509.random_serial_number())
.not_valid_before(now)
.not_valid_after(now + datetime.timedelta(days=365))
.add_extension(x509.BasicConstraints(ca=False, path_length=None), critical=True)
.sign(rogue_key, hashes.SHA256())
)
# Write to tmp_path
pem = serialization.Encoding.PEM
private_fmt = serialization.PrivateFormat.TraditionalOpenSSL
no_enc = serialization.NoEncryption()
paths = {}
paths["ca_key"] = tmp_path / "fleet-ca.key"
_write_pem(paths["ca_key"], ca_key.private_bytes(pem, private_fmt, no_enc))
paths["ca_cert"] = tmp_path / "fleet-ca.crt"
_write_pem(paths["ca_cert"], ca_cert.public_bytes(pem))
paths["agent_key"] = tmp_path / "agent.key"
_write_pem(paths["agent_key"], agent_key.private_bytes(pem, private_fmt, no_enc))
paths["agent_cert"] = tmp_path / "agent.crt"
_write_pem(paths["agent_cert"], agent_cert.public_bytes(pem))
paths["rogue_key"] = tmp_path / "rogue.key"
_write_pem(paths["rogue_key"], rogue_key.private_bytes(pem, private_fmt, no_enc))
paths["rogue_cert"] = tmp_path / "rogue.crt"
_write_pem(paths["rogue_cert"], rogue_cert.public_bytes(pem))
return paths
# ---------------------------------------------------------------------------
# Tests: is_mtls_configured
# ---------------------------------------------------------------------------
class TestIsMtlsConfigured:
def test_all_vars_missing(self):
from agent.mtls import is_mtls_configured
env = {k: "" for k in ("HERMES_MTLS_CERT", "HERMES_MTLS_KEY", "HERMES_MTLS_CA")}
with patch.dict(os.environ, env, clear=False):
assert not is_mtls_configured()
def test_partial_vars(self, tmp_path):
from agent.mtls import is_mtls_configured
f = tmp_path / "cert.pem"
f.write_text("x")
env = {"HERMES_MTLS_CERT": str(f), "HERMES_MTLS_KEY": "", "HERMES_MTLS_CA": ""}
with patch.dict(os.environ, env, clear=False):
assert not is_mtls_configured()
def test_all_vars_set_but_file_missing(self, tmp_path):
from agent.mtls import is_mtls_configured
env = {
"HERMES_MTLS_CERT": str(tmp_path / "no.crt"),
"HERMES_MTLS_KEY": str(tmp_path / "no.key"),
"HERMES_MTLS_CA": str(tmp_path / "no-ca.crt"),
}
with patch.dict(os.environ, env, clear=False):
assert not is_mtls_configured()
def test_all_vars_set_and_files_exist(self, tmp_path):
from agent.mtls import is_mtls_configured
for name in ("cert.pem", "key.pem", "ca.pem"):
(tmp_path / name).write_text("x")
env = {
"HERMES_MTLS_CERT": str(tmp_path / "cert.pem"),
"HERMES_MTLS_KEY": str(tmp_path / "key.pem"),
"HERMES_MTLS_CA": str(tmp_path / "ca.pem"),
}
with patch.dict(os.environ, env, clear=False):
assert is_mtls_configured()
# ---------------------------------------------------------------------------
# Tests: build_server_ssl_context / build_client_ssl_context
# ---------------------------------------------------------------------------
class TestBuildSslContexts:
def test_raises_when_not_configured(self):
from agent.mtls import build_server_ssl_context, build_client_ssl_context
env = {"HERMES_MTLS_CERT": "", "HERMES_MTLS_KEY": "", "HERMES_MTLS_CA": ""}
with patch.dict(os.environ, env, clear=False):
with pytest.raises(RuntimeError, match="not configured"):
build_server_ssl_context()
with pytest.raises(RuntimeError, match="not configured"):
build_client_ssl_context()
def test_server_context_requires_client_cert(self, tmp_path):
from agent.mtls import build_server_ssl_context
pki = make_fleet_pki(tmp_path)
env = {
"HERMES_MTLS_CERT": str(pki["agent_cert"]),
"HERMES_MTLS_KEY": str(pki["agent_key"]),
"HERMES_MTLS_CA": str(pki["ca_cert"]),
}
with patch.dict(os.environ, env, clear=False):
ctx = build_server_ssl_context()
assert isinstance(ctx, ssl.SSLContext)
assert ctx.verify_mode == ssl.CERT_REQUIRED
def test_client_context_has_cert_required(self, tmp_path):
from agent.mtls import build_client_ssl_context
pki = make_fleet_pki(tmp_path)
env = {
"HERMES_MTLS_CERT": str(pki["agent_cert"]),
"HERMES_MTLS_KEY": str(pki["agent_key"]),
"HERMES_MTLS_CA": str(pki["ca_cert"]),
}
with patch.dict(os.environ, env, clear=False):
ctx = build_client_ssl_context()
assert isinstance(ctx, ssl.SSLContext)
assert ctx.verify_mode == ssl.CERT_REQUIRED
# ---------------------------------------------------------------------------
# Tests: MTLSMiddleware
# ---------------------------------------------------------------------------
def _make_scope(path: str, peer_cert=None) -> dict:
"""Build a minimal ASGI HTTP scope, optionally with a fake TLS peer_cert."""
scope = {
"type": "http",
"path": path,
"extensions": {},
}
if peer_cert is not None:
scope["extensions"]["tls"] = {"peer_cert": peer_cert}
return scope
async def _collect_response(middleware, scope):
"""Drive the middleware and capture (status, body)."""
status = None
body = b""
async def receive():
return {"type": "http.request", "body": b""}
async def send(event):
nonlocal status, body
if event["type"] == "http.response.start":
status = event["status"]
elif event["type"] == "http.response.body":
body += event.get("body", b"")
await middleware(scope, receive, send)
return status, body
class TestMTLSMiddleware:
"""
Unit-test the MTLSMiddleware without spinning up a real server.
We inject mTLS configuration through env-var patching so the middleware
believes it is enabled, and use the ASGI scope's tls extension to simulate
whether a client cert was presented.
"""
def _make_middleware(self, tmp_path, app=None):
"""Return a configured MTLSMiddleware backed by real-looking cert files."""
from agent.mtls import MTLSMiddleware
for name in ("cert.pem", "key.pem", "ca.pem"):
(tmp_path / name).write_text("x")
env = {
"HERMES_MTLS_CERT": str(tmp_path / "cert.pem"),
"HERMES_MTLS_KEY": str(tmp_path / "key.pem"),
"HERMES_MTLS_CA": str(tmp_path / "ca.pem"),
}
async def passthrough(scope, receive, send):
await send({"type": "http.response.start", "status": 200, "headers": []})
await send({"type": "http.response.body", "body": b"ok"})
with patch.dict(os.environ, env, clear=False):
mw = MTLSMiddleware(app or passthrough)
return mw
@pytest.mark.asyncio
async def test_authorized_agent_accepted(self, tmp_path):
"""An A2A route with a valid client cert passes through (200)."""
mw = self._make_middleware(tmp_path)
scope = _make_scope("/.well-known/agent-card.json", peer_cert={"subject": (("commonName", "timmy"),)})
status, body = await _collect_response(mw, scope)
assert status == 200
@pytest.mark.asyncio
async def test_unauthorized_agent_rejected(self, tmp_path):
"""An A2A route with NO client cert is rejected (403)."""
mw = self._make_middleware(tmp_path)
scope = _make_scope("/.well-known/agent-card.json", peer_cert=None)
status, body = await _collect_response(mw, scope)
assert status == 403
assert b"certificate" in body.lower()
@pytest.mark.asyncio
async def test_non_a2a_route_not_gated(self, tmp_path):
"""Non-A2A routes (like /api/status) pass through even without a cert."""
mw = self._make_middleware(tmp_path)
scope = _make_scope("/api/status", peer_cert=None)
status, body = await _collect_response(mw, scope)
assert status == 200
@pytest.mark.asyncio
async def test_agent_card_api_route_gated(self, tmp_path):
"""The /api/agent-card route also requires a client cert."""
mw = self._make_middleware(tmp_path)
scope = _make_scope("/api/agent-card", peer_cert=None)
status, _ = await _collect_response(mw, scope)
assert status == 403
@pytest.mark.asyncio
async def test_middleware_disabled_when_not_configured(self):
"""When mTLS env vars are absent, the middleware is a no-op."""
from agent.mtls import MTLSMiddleware
async def passthrough(scope, receive, send):
await send({"type": "http.response.start", "status": 200, "headers": []})
await send({"type": "http.response.body", "body": b"ok"})
env = {"HERMES_MTLS_CERT": "", "HERMES_MTLS_KEY": "", "HERMES_MTLS_CA": ""}
with patch.dict(os.environ, env, clear=False):
mw = MTLSMiddleware(passthrough)
# Even an A2A route with no cert should pass through
scope = _make_scope("/.well-known/agent-card.json", peer_cert=None)
status, _ = await _collect_response(mw, scope)
assert status == 200
# ---------------------------------------------------------------------------
# Tests: get_peer_cn
# ---------------------------------------------------------------------------
class TestGetPeerCn:
def test_returns_cn_from_subject(self):
from agent.mtls import get_peer_cn
class FakeSSL:
def getpeercert(self):
return {"subject": ((("commonName", "timmy"),),)}
assert get_peer_cn(FakeSSL()) == "timmy"
def test_returns_none_when_no_cert(self):
from agent.mtls import get_peer_cn
class FakeSSL:
def getpeercert(self):
return None
assert get_peer_cn(FakeSSL()) is None
def test_returns_none_on_exception(self):
from agent.mtls import get_peer_cn
class BrokenSSL:
def getpeercert(self):
raise RuntimeError("no ssl")
assert get_peer_cn(BrokenSSL()) is None

View File

@@ -0,0 +1,39 @@
"""Tests for binary_extensions helpers."""
from tools.binary_extensions import has_binary_extension, has_image_extension
def test_has_image_extension_png():
assert has_image_extension("/tmp/test.png") is True
assert has_image_extension("/tmp/test.PNG") is True
def test_has_image_extension_jpg_variants():
assert has_image_extension("/tmp/test.jpg") is True
assert has_image_extension("/tmp/test.jpeg") is True
assert has_image_extension("/tmp/test.JPG") is True
def test_has_image_extension_webp():
assert has_image_extension("/tmp/test.webp") is True
def test_has_image_extension_gif():
assert has_image_extension("/tmp/test.gif") is True
def test_has_image_extension_no_ext():
assert has_image_extension("/tmp/test") is False
def test_has_image_extension_non_image():
assert has_image_extension("/tmp/test.txt") is False
assert has_image_extension("/tmp/test.exe") is False
assert has_image_extension("/tmp/test.pdf") is False
def test_has_binary_extension_includes_images():
"""All image extensions must also be in binary extensions."""
assert has_binary_extension("/tmp/test.png") is True
assert has_binary_extension("/tmp/test.jpg") is True
assert has_binary_extension("/tmp/test.webp") is True

View File

@@ -1,82 +0,0 @@
"""Tests for task routing and timeout config — browser_vision Gemma 4 (Issue #816).
Covers the additional wiring on top of the Gemma 4 default:
- browser_vision() uses task="browser_vision" so auxiliary.browser_vision.*
config is consulted for provider/model/timeout
- call_llm() routes "browser_vision" through vision provider resolution
(same path as "vision" task)
- Timeout is read from auxiliary.browser_vision.timeout before
auxiliary.vision.timeout
Model selection tests are in test_browser_vision_model.py.
"""
import inspect
# ── browser_vision() task routing ────────────────────────────────────────────
class TestBrowserVisionTaskRouting:
"""browser_vision() must use task='browser_vision' in call_llm()."""
def test_call_llm_receives_browser_vision_task(self):
"""browser_vision() source uses task='browser_vision', not 'vision'."""
src = inspect.getsource(
__import__("tools.browser_tool", fromlist=["browser_vision"]).browser_vision
)
assert '"browser_vision"' in src or "'browser_vision'" in src, (
"browser_vision() must pass task='browser_vision' to call_llm(), not 'vision'"
)
def test_call_llm_does_not_use_bare_vision_task(self):
"""The call_llm() invocation must not use task='vision' for browser screenshots."""
import re
src = inspect.getsource(
__import__("tools.browser_tool", fromlist=["browser_vision"]).browser_vision
)
call_llm_blocks = re.findall(r'call_llm\s*\([^)]+\)', src, re.DOTALL)
for block in call_llm_blocks:
assert '"vision"' not in block and "'vision'" not in block, (
f"call_llm() must use task='browser_vision', found 'vision' in: {block}"
)
# ── call_llm() vision routing ────────────────────────────────────────────────
class TestCallLlmBrowserVisionRouting:
"""call_llm(task='browser_vision') must route through vision provider path."""
def test_browser_vision_task_in_vision_branch(self):
"""call_llm() source handles 'browser_vision' in the same branch as 'vision'."""
from agent import auxiliary_client
src = inspect.getsource(auxiliary_client.call_llm)
assert 'task in ("vision", "browser_vision")' in src or \
"task in ('vision', 'browser_vision')" in src, (
"call_llm() should route 'browser_vision' through the vision provider path"
)
# ── timeout resolution ────────────────────────────────────────────────────────
class TestBrowserVisionTimeoutResolution:
"""browser_vision() reads auxiliary.browser_vision.timeout first."""
def test_browser_vision_timeout_checked_before_vision_timeout(self):
"""Source checks auxiliary.browser_vision.timeout before auxiliary.vision.timeout."""
src = inspect.getsource(
__import__("tools.browser_tool", fromlist=["browser_vision"]).browser_vision
)
# Locate the timeout resolution block (before call_kwargs dict)
timeout_block_start = src.find("vision_timeout")
call_kwargs_start = src.find('"task": "browser_vision"')
assert timeout_block_start != -1, "Could not find vision_timeout in browser_vision source"
assert call_kwargs_start != -1, "Could not find task='browser_vision' in browser_vision source"
# The timeout block should mention "browser_vision" before "vision"
block = src[timeout_block_start:call_kwargs_start]
bv_idx = block.find('"browser_vision"')
v_idx = block.find('"vision"')
if bv_idx != -1 and v_idx != -1:
assert bv_idx < v_idx, (
"auxiliary.browser_vision.timeout should be checked before auxiliary.vision.timeout"
)

View File

@@ -1,115 +0,0 @@
"""Tests for browser_tool._get_vision_model() — Gemma 4 default (Issue #816).
Covers acceptance criteria from issue #816:
- Browser screenshots use Gemma 4 by default.
- BROWSER_VISION_MODEL env var overrides the model for browser vision only.
- AUXILIARY_VISION_MODEL env var still works as a global override.
- auxiliary.browser_vision.model in config.yaml overrides the default.
- Priority: BROWSER_VISION_MODEL > config.yaml > AUXILIARY_VISION_MODEL > default.
"""
import os
import sys
from unittest.mock import patch, MagicMock
import pytest
class TestGetVisionModelDefault:
def test_default_is_gemma4(self, monkeypatch):
monkeypatch.delenv("BROWSER_VISION_MODEL", raising=False)
monkeypatch.delenv("AUXILIARY_VISION_MODEL", raising=False)
import tools.browser_tool as bt
with patch("hermes_cli.config.load_config", return_value={}):
model = bt._get_vision_model()
assert model == "google/gemma-4-27b-it"
def test_default_constant(self):
import tools.browser_tool as bt
assert bt._BROWSER_VISION_DEFAULT_MODEL == "google/gemma-4-27b-it"
class TestGetVisionModelEnvOverrides:
def test_browser_vision_model_env_takes_priority(self, monkeypatch):
monkeypatch.setenv("BROWSER_VISION_MODEL", "openai/gpt-4o")
monkeypatch.setenv("AUXILIARY_VISION_MODEL", "google/gemini-3-flash-preview")
import tools.browser_tool as bt
assert bt._get_vision_model() == "openai/gpt-4o"
def test_auxiliary_vision_model_fallback(self, monkeypatch):
monkeypatch.delenv("BROWSER_VISION_MODEL", raising=False)
monkeypatch.setenv("AUXILIARY_VISION_MODEL", "google/gemini-3-flash-preview")
import tools.browser_tool as bt
with patch("hermes_cli.config.load_config", return_value={}):
assert bt._get_vision_model() == "google/gemini-3-flash-preview"
def test_browser_vision_model_empty_falls_through(self, monkeypatch):
"""Empty BROWSER_VISION_MODEL should fall through to next step."""
monkeypatch.setenv("BROWSER_VISION_MODEL", "")
monkeypatch.delenv("AUXILIARY_VISION_MODEL", raising=False)
import tools.browser_tool as bt
with patch("hermes_cli.config.load_config", return_value={}):
# Should reach the default
assert bt._get_vision_model() == "google/gemma-4-27b-it"
def test_auxiliary_vision_model_empty_falls_through(self, monkeypatch):
monkeypatch.delenv("BROWSER_VISION_MODEL", raising=False)
monkeypatch.setenv("AUXILIARY_VISION_MODEL", "")
import tools.browser_tool as bt
with patch("hermes_cli.config.load_config", return_value={}):
assert bt._get_vision_model() == "google/gemma-4-27b-it"
class TestGetVisionModelConfig:
def test_config_overrides_default(self, monkeypatch):
monkeypatch.delenv("BROWSER_VISION_MODEL", raising=False)
monkeypatch.delenv("AUXILIARY_VISION_MODEL", raising=False)
cfg = {"auxiliary": {"browser_vision": {"model": "anthropic/claude-3-5-haiku"}}}
with patch("hermes_cli.config.load_config", return_value=cfg):
import tools.browser_tool as bt
assert bt._get_vision_model() == "anthropic/claude-3-5-haiku"
def test_config_empty_string_falls_through_to_default(self, monkeypatch):
monkeypatch.delenv("BROWSER_VISION_MODEL", raising=False)
monkeypatch.delenv("AUXILIARY_VISION_MODEL", raising=False)
cfg = {"auxiliary": {"browser_vision": {"model": ""}}}
with patch("hermes_cli.config.load_config", return_value=cfg):
import tools.browser_tool as bt
assert bt._get_vision_model() == "google/gemma-4-27b-it"
def test_config_load_error_falls_through_to_default(self, monkeypatch):
monkeypatch.delenv("BROWSER_VISION_MODEL", raising=False)
monkeypatch.delenv("AUXILIARY_VISION_MODEL", raising=False)
with patch("hermes_cli.config.load_config", side_effect=Exception("config error")):
import tools.browser_tool as bt
assert bt._get_vision_model() == "google/gemma-4-27b-it"
def test_env_beats_config(self, monkeypatch):
monkeypatch.setenv("BROWSER_VISION_MODEL", "openai/gpt-4o")
cfg = {"auxiliary": {"browser_vision": {"model": "anthropic/claude-3-5-haiku"}}}
with patch("hermes_cli.config.load_config", return_value=cfg):
import tools.browser_tool as bt
assert bt._get_vision_model() == "openai/gpt-4o"
def test_config_beats_auxiliary_vision_model(self, monkeypatch):
"""Config should override AUXILIARY_VISION_MODEL when BROWSER_VISION_MODEL unset."""
monkeypatch.delenv("BROWSER_VISION_MODEL", raising=False)
monkeypatch.setenv("AUXILIARY_VISION_MODEL", "global-override")
cfg = {"auxiliary": {"browser_vision": {"model": "config-model"}}}
with patch("hermes_cli.config.load_config", return_value=cfg):
import tools.browser_tool as bt
assert bt._get_vision_model() == "config-model"
class TestBackwardCompatibility:
"""AUXILIARY_VISION_MODEL must still work for users who already have it configured."""
def test_existing_auxiliary_vision_model_not_broken(self, monkeypatch):
"""Users who set AUXILIARY_VISION_MODEL must not be broken by this change."""
monkeypatch.delenv("BROWSER_VISION_MODEL", raising=False)
monkeypatch.setenv("AUXILIARY_VISION_MODEL", "openai/gpt-4o")
import tools.browser_tool as bt
with patch("hermes_cli.config.load_config", return_value={}):
model = bt._get_vision_model()
assert model == "openai/gpt-4o"
assert model != "google/gemma-4-27b-it"

View File

@@ -294,3 +294,67 @@ class TestSearchHints:
class TestReadFileImageRouting:
"""Tests that image files are routed through vision analysis."""
@patch("tools.file_tools._analyze_image_with_vision")
def test_image_png_routes_to_vision(self, mock_analyze, tmp_path):
mock_analyze.return_value = json.dumps({"analysis": "test image"})
img = tmp_path / "test.png"
img.write_bytes(b"fake png data")
from tools.file_tools import read_file_tool
result = read_file_tool(str(img))
mock_analyze.assert_called_once()
assert json.loads(result)["analysis"] == "test image"
@patch("tools.file_tools._analyze_image_with_vision")
def test_image_jpeg_routes_to_vision(self, mock_analyze, tmp_path):
mock_analyze.return_value = json.dumps({"analysis": "test image"})
img = tmp_path / "test.jpeg"
img.write_bytes(b"fake jpeg data")
from tools.file_tools import read_file_tool
result = read_file_tool(str(img))
mock_analyze.assert_called_once()
assert json.loads(result)["analysis"] == "test image"
@patch("tools.file_tools._analyze_image_with_vision")
def test_image_webp_routes_to_vision(self, mock_analyze, tmp_path):
mock_analyze.return_value = json.dumps({"analysis": "test image"})
img = tmp_path / "test.webp"
img.write_bytes(b"fake webp data")
from tools.file_tools import read_file_tool
result = read_file_tool(str(img))
mock_analyze.assert_called_once()
assert json.loads(result)["analysis"] == "test image"
def test_non_image_binary_blocked(self, tmp_path):
from tools.file_tools import read_file_tool
exe = tmp_path / "test.exe"
exe.write_bytes(b"fake exe data")
result = json.loads(read_file_tool(str(exe)))
assert "error" in result
assert "Cannot read binary" in result["error"]
class TestAnalyzeImageWithVision:
"""Tests for the _analyze_image_with_vision helper."""
def test_import_error_fallback(self):
with patch.dict("sys.modules", {"tools.vision_tools": None}):
from tools.file_tools import _analyze_image_with_vision
result = json.loads(_analyze_image_with_vision("/tmp/test.png"))
assert "error" in result
assert "vision_analyze tool is not available" in result["error"]

View File

@@ -34,9 +34,22 @@ BINARY_EXTENSIONS = frozenset({
})
IMAGE_EXTENSIONS = frozenset({
".png", ".jpg", ".jpeg", ".gif", ".bmp", ".ico", ".webp", ".tiff", ".tif",
})
def has_binary_extension(path: str) -> bool:
"""Check if a file path has a binary extension. Pure string check, no I/O."""
dot = path.rfind(".")
if dot == -1:
return False
return path[dot:].lower() in BINARY_EXTENSIONS
def has_image_extension(path: str) -> bool:
"""Check if a file path has an image extension. Pure string check, no I/O."""
dot = path.rfind(".")
if dot == -1:
return False
return path[dot:].lower() in IMAGE_EXTENSIONS

View File

@@ -200,50 +200,9 @@ def _get_command_timeout() -> int:
return result
# Default vision model for browser screenshot analysis.
# Gemma 4 is natively multimodal so it can analyze screenshots using the same
# model already loaded for text tasks, reducing cold-start latency.
_BROWSER_VISION_DEFAULT_MODEL = "google/gemma-4-27b-it"
def _get_vision_model() -> str:
"""Model for browser_vision (screenshot analysis — multimodal).
Resolution order (first non-empty value wins):
1. ``BROWSER_VISION_MODEL`` env var — browser-specific override
2. ``auxiliary.browser_vision.model`` in config.yaml
3. ``AUXILIARY_VISION_MODEL`` env var — shared vision override
4. ``_BROWSER_VISION_DEFAULT_MODEL`` — Gemma 4 27B (default)
Set ``BROWSER_VISION_MODEL`` or ``auxiliary.browser_vision.model`` to an
empty string to force the auxiliary router's auto-detection (no default).
"""
# 1. Browser-specific env var
env_browser = os.getenv("BROWSER_VISION_MODEL", "").strip()
if env_browser:
return env_browser
# 2. Config file: auxiliary.browser_vision.model
try:
from hermes_cli.config import load_config
_cfg = load_config()
cfg_model = (
_cfg.get("auxiliary", {})
.get("browser_vision", {})
.get("model", "")
)
if cfg_model and str(cfg_model).strip():
return str(cfg_model).strip()
except Exception:
pass
# 3. Shared vision env var (backward-compat)
env_shared = os.getenv("AUXILIARY_VISION_MODEL", "").strip()
if env_shared:
return env_shared
# 4. Default: Gemma 4 27B
return _BROWSER_VISION_DEFAULT_MODEL
def _get_vision_model() -> Optional[str]:
"""Model for browser_vision (screenshot analysis — multimodal)."""
return os.getenv("AUXILIARY_VISION_MODEL", "").strip() or None
def _get_extraction_model() -> Optional[str]:
@@ -806,7 +765,7 @@ BROWSER_TOOL_SCHEMAS = [
},
{
"name": "browser_vision",
"description": "Take a screenshot of the current page and analyze it with vision AI (default: Gemma 4 multimodal). Use this when you need to visually understand what's on the page - especially useful for CAPTCHAs, visual verification challenges, complex layouts, or when the text snapshot doesn't capture important visual information. Returns both the AI analysis and a screenshot_path that you can share with the user by including MEDIA:<screenshot_path> in your response. Requires browser_navigate to be called first. Vision model can be overridden via BROWSER_VISION_MODEL env var or auxiliary.browser_vision.model in config.yaml.",
"description": "Take a screenshot of the current page and analyze it with vision AI. Use this when you need to visually understand what's on the page - especially useful for CAPTCHAs, visual verification challenges, complex layouts, or when the text snapshot doesn't capture important visual information. Returns both the AI analysis and a screenshot_path that you can share with the user by including MEDIA:<screenshot_path> in your response. Requires browser_navigate to be called first.",
"parameters": {
"type": "object",
"properties": {
@@ -1935,22 +1894,21 @@ def browser_vision(question: str, annotate: bool = False, task_id: Optional[str]
"""
Take a screenshot of the current page and analyze it with vision AI.
Uses Gemma 4 27B by default (natively multimodal — same model family as the
main text model, lower cold-start latency than switching to a separate vision
model). Override via ``BROWSER_VISION_MODEL`` env var or
``auxiliary.browser_vision.model`` in config.yaml.
Useful for understanding visual content that the text-based snapshot may not
capture (CAPTCHAs, verification challenges, images, complex layouts, etc.).
This tool captures what's visually displayed in the browser and sends it
to the configured vision model for analysis. When the active model is
natively multimodal (e.g. Gemma 4) it is used directly; otherwise the
auxiliary vision backend is used. Useful for understanding visual content
that the text-based snapshot may not capture (CAPTCHAs, verification
challenges, images, complex layouts, etc.).
The screenshot is saved persistently and its file path is returned alongside
the analysis, so it can be shared with users via MEDIA:<path> in the response.
Args:
question: What you want to know about the page visually
annotate: If True, overlay numbered [N] labels on interactive elements
task_id: Task identifier for session isolation
Returns:
JSON string with vision analysis results and screenshot_path
"""
@@ -2033,25 +1991,21 @@ def browser_vision(question: str, annotate: bool = False, task_id: Optional[str]
logger.debug("browser_vision: analysing screenshot (%d bytes)",
len(_screenshot_bytes))
# Read vision timeout from config (auxiliary.browser_vision.timeout, then
# auxiliary.vision.timeout), default 120s. Local vision models can take
# well over 30s for screenshot analysis, so the default must be generous.
# Read vision timeout from config (auxiliary.vision.timeout), default 120s.
# Local vision models (llama.cpp, ollama) can take well over 30s for
# screenshot analysis, so the default must be generous.
vision_timeout = 120.0
try:
from hermes_cli.config import load_config
_cfg = load_config()
_aux = _cfg.get("auxiliary", {}) if isinstance(_cfg, dict) else {}
_vt = (
(_aux.get("browser_vision") or {}).get("timeout")
or (_aux.get("vision") or {}).get("timeout")
)
_vt = _cfg.get("auxiliary", {}).get("vision", {}).get("timeout")
if _vt is not None:
vision_timeout = float(_vt)
except Exception:
pass
call_kwargs = {
"task": "browser_vision",
"task": "vision",
"messages": [
{
"role": "user",
@@ -2065,9 +2019,8 @@ def browser_vision(question: str, annotate: bool = False, task_id: Optional[str]
"temperature": 0.1,
"timeout": vision_timeout,
}
# _get_vision_model() always returns a non-empty string (Gemma 4 or override).
call_kwargs["model"] = vision_model
logger.debug("browser_vision: using model %s", vision_model)
if vision_model:
call_kwargs["model"] = vision_model
# Try full-size screenshot; on size-related rejection, downscale and retry.
try:
response = call_llm(**call_kwargs)

View File

@@ -7,7 +7,7 @@ import logging
import os
import threading
from pathlib import Path
from tools.binary_extensions import has_binary_extension
from tools.binary_extensions import has_binary_extension, has_image_extension
from tools.file_operations import ShellFileOperations
from agent.redact import redact_sensitive_text
@@ -279,6 +279,52 @@ def clear_file_ops_cache(task_id: str = None):
_file_ops_cache.clear()
def _analyze_image_with_vision(image_path: str, task_id: str = "default") -> str:
"""Route an image file through the vision analysis pipeline.
Uses vision_analyze_tool with a default descriptive prompt. Falls back
to a manual error when no vision backend is available.
"""
import asyncio
try:
from tools.vision_tools import vision_analyze_tool
except ImportError:
return json.dumps({
"error": (
f"Image file '{image_path}' detected but vision_analyze tool "
"is not available. Use vision_analyze directly if configured."
),
})
prompt = (
"Describe this image in detail. If it contains text, transcribe "
"the text. If it is a diagram, chart, or UI screenshot, describe "
"the layout, colors, labels, and any visible data."
)
try:
result = asyncio.run(vision_analyze_tool(image_url=image_path, question=prompt))
except Exception as exc:
return json.dumps({
"error": (
f"Image file '{image_path}' detected but vision analysis failed: {exc}. "
"Use vision_analyze directly if configured."
),
})
try:
parsed = json.loads(result)
except json.JSONDecodeError:
parsed = {"content": result}
# Wrap the vision result so the caller knows it came from image analysis
return json.dumps({
"image_path": image_path,
"analysis": parsed.get("content") or parsed.get("analysis") or result,
"source": "vision_analyze",
}, ensure_ascii=False)
def read_file_tool(path: str, offset: int = 1, limit: int = 500, task_id: str = "default") -> str:
"""Read a file with pagination and line numbers."""
try:
@@ -295,10 +341,13 @@ def read_file_tool(path: str, offset: int = 1, limit: int = 500, task_id: str =
_resolved = Path(path).expanduser().resolve()
# ── Binary file guard ─────────────────────────────────────────
# Block binary files by extension (no I/O).
# ── Binary / image file guard ─────────────────────────────────
# Block binary files by extension (no I/O). Images are routed
# through the vision analysis pipeline when a backend is available.
if has_binary_extension(str(_resolved)):
_ext = _resolved.suffix.lower()
if has_image_extension(str(_resolved)):
return _analyze_image_with_vision(str(_resolved), task_id=task_id)
return json.dumps({
"error": (
f"Cannot read binary file '{path}' ({_ext}). "
@@ -729,7 +778,7 @@ def _check_file_reqs():
READ_FILE_SCHEMA = {
"name": "read_file",
"description": "Read a text file with line numbers and pagination. Use this instead of cat/head/tail in terminal. Output format: 'LINE_NUM|CONTENT'. Suggests similar filenames if not found. Use offset and limit for large files. Reads exceeding ~100K characters are rejected; use offset and limit to read specific sections of large files. NOTE: Cannot read images or binary files — use vision_analyze for images.",
"description": "Read a text file with line numbers and pagination. Use this instead of cat/head/tail in terminal. Output format: 'LINE_NUM|CONTENT'. Suggests similar filenames if not found. Use offset and limit for large files. Reads exceeding ~100K characters are rejected; use offset and limit to read specific sections of large files. NOTE: Image files (PNG, JPEG, WebP, GIF, etc.) are automatically analyzed via vision_analyze. Other binary files cannot be read as text.",
"parameters": {
"type": "object",
"properties": {

View File

@@ -0,0 +1,106 @@
#!/usr/bin/env python3
"""
Local Inference Bridge — Fast-path for low-entropy LLM tasks.
Detects local Ollama/llama-cpp instances and uses them for 'Auxiliary' tasks
(summarization, extraction, simple verification) to reduce cloud dependency.
"""
import json
import logging
import os
import requests
from typing import Dict, List, Optional, Any
from tools.registry import registry, tool_error, tool_result
logger = logging.getLogger(__name__)
LOCAL_INFERENCE_SCHEMA = {
"name": "local_inference",
"description": "Execute a task using a local inference engine (Ollama/llama-cpp) if available. Ideal for simple summarization, text cleanup, or data extraction where cloud-grade intelligence is overkill.",
"parameters": {
"type": "object",
"properties": {
"prompt": {"type": "string", "description": "The task prompt."},
"system": {"type": "string", "description": "Optional system instruction."},
"engine": {"type": "string", "enum": ["auto", "ollama", "llama-cpp"], "default": "auto"}
},
"required": ["prompt"]
}
}
def detect_local_engine() -> Optional[Dict[str, str]]:
"""Detect presence of local inference engines."""
# 1. Check Ollama (default port 11434)
try:
res = requests.get("http://localhost:11434/api/tags", timeout=1)
if res.status_code == 200:
return {"type": "ollama", "url": "http://localhost:11434"}
except:
pass
# 2. Check llama-cpp-python (commonly on 8000 or 8080)
for port in [8000, 8080]:
try:
res = requests.get(f"http://localhost:{port}/v1/models", timeout=1)
if res.status_code == 200:
return {"type": "llama-cpp", "url": f"http://localhost:{port}"}
except:
pass
return None
def run_local_task(prompt: str, system: str = None, engine: str = "auto"):
"""Execute inference on a detected local engine."""
info = detect_local_engine()
if not info:
return tool_error("No local inference engine (Ollama or llama-cpp) detected on localhost.")
try:
if info["type"] == "ollama":
# Select first available model or default to gemma
models = requests.get(f"{info['url']}/api/tags").json().get("models", [])
model_name = models[0]["name"] if models else "gemma"
payload = {
"model": model_name,
"prompt": prompt,
"stream": False
}
if system: payload["system"] = system
res = requests.post(f"{info['url']}/api/generate", json=payload, timeout=60)
result = res.json().get("response", "")
return tool_result(engine="Ollama", model=model_name, response=result)
elif info["type"] == "llama-cpp":
payload = {
"model": "local-model",
"messages": [
{"role": "system", "content": system or "You are a helpful assistant."},
{"role": "user", "content": prompt}
]
}
res = requests.post(f"{info['url']}/v1/chat/completions", json=payload, timeout=60)
result = res.json()["choices"][0]["message"]["content"]
return tool_result(engine="llama-cpp", response=result)
except Exception as e:
return tool_error(f"Local inference failed: {str(e)}")
def _handle_local_inference(args, **kwargs):
return run_local_task(
prompt=args.get("prompt"),
system=args.get("system"),
engine=args.get("engine", "auto")
)
registry.register(
name="local_inference",
toolset="inference",
schema=LOCAL_INFERENCE_SCHEMA,
handler=_handle_local_inference,
emoji="🏠"
)

View File

@@ -0,0 +1,86 @@
#!/usr/bin/env python3
"""
Sovereign Scavenger — Autonomous Backlog Grooming.
Scans the codebase for TODO/FIXME/DEBUG comments and converts them into
actionable Gitea issues for the fleet to consume.
"""
import os
import re
import logging
from typing import List, Dict, Any
from tools.registry import registry, tool_error, tool_result
logger = logging.getLogger(__name__)
SCAVENGER_SCHEMA = {
"name": "sovereign_scavenger",
"description": "Scans the current directory for TODO, FIXME, or DEBUG comments. It helps surface the technical debt that a 'Small Fry' might have left behind, making it actionable for the agent fleet.",
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "Path to scan (defaults to current directory).", "default": "."},
"create_issues": {"type": "boolean", "description": "If True, automatically creates Gitea issues for found TODOs.", "default": False}
}
}
}
def find_todos(root_path: str):
"""Scan files for TODO patterns."""
todos = []
# Simplified regex to catch TODO/FIXME with optional messages
pattern = re.compile(r'#.*(TODO|FIXME|DEBUG|XXX)[:s]*(.*)', re.IGNORECASE)
for root, dirs, files in os.walk(root_path):
# Skip hidden and annoying dirs
dirs[:] = [d for d in dirs if not d.startswith('.') and d not in ['node_modules', 'dist', '__pycache__']]
for file in files:
if not file.endswith(('.py', '.ts', '.js', '.md', '.txt')):
continue
filepath = os.path.join(root, file)
try:
with open(filepath, 'r', encoding='utf-8') as f:
for i, line in enumerate(f, 1):
match = pattern.search(line)
if match:
todos.append({
"type": match.group(1).upper(),
"message": match.group(2).strip() or "No description provided.",
"file": filepath,
"line": i
})
except Exception as e:
logger.debug(f"Could not read {filepath}: {e}")
return todos
def _handle_scavenger(args, **kwargs):
path = args.get("path", ".")
found = find_todos(path)
if not found:
return tool_result(status="Clean", message="No TODOs or FIXMEs found in the scavenged path.")
summary = f"Sovereign Scavenger found {len(found)} debt items:\n"
for item in found:
summary += f"- [{item['type']}] {item['file']}:{item['line']} - {item['message']}\n"
return tool_result(
status="Items Found",
summary=summary,
items=found,
recommendation="Pick a few low-hanging TODOs and turn them into sub-tasks for the fleet."
)
registry.register(
name="sovereign_scavenger",
toolset="dispatch",
schema=SCAVENGER_SCHEMA,
handler=_handle_scavenger,
emoji="🧹"
)

109
tools/static_analyzer.py Normal file
View File

@@ -0,0 +1,109 @@
#!/usr/bin/env python3
"""
GOFAI Static Analyzer — Deterministic risk assessment for autonomous code.
Detects high-risk patterns like infinite loops, resource exhaustion,
and circular dependencies using AST analysis.
"""
import ast
import logging
import os
from typing import List, Dict, Any
from tools.registry import registry, tool_error, tool_result
logger = logging.getLogger(__name__)
STATIC_ANALYZE_SCHEMA = {
"name": "static_analyze",
"description": "Perform an advanced GOFAI static analysis of code. Detects infinite loops, potential memory leaks (unbounded collections), and circular dependency risks without using an LLM. Use this to ensure your code is 'Fleet-Safe'.",
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "Path to the file to analyze."}
},
"required": ["path"]
}
}
class RiskAnalyzer(ast.NodeVisitor):
def __init__(self):
self.risks = []
self.current_function = None
def visit_FunctionDef(self, node):
old_func = self.current_function
self.current_function = node.name
self.generic_visit(node)
self.current_function = old_func
def visit_While(self, node):
# Check for 'while True' or 'while 1'
if isinstance(node.test, ast.Constant) and node.test.value is True:
# Look for 'break' or 'return' inside the loop
has_exit = any(isinstance(child, (ast.Break, ast.Return)) for child in ast.walk(node))
if not has_exit:
self.risks.append({
"type": "Infinite Loop Risk",
"location": f"{self.current_function or 'module'} (line {node.lineno})",
"severity": "HIGH",
"message": "Potential infinite loop: 'while True' found without clear break/return path."
})
self.generic_visit(node)
def visit_For(self, node):
# Basic check for modifying the sequence being iterated (common error)
if isinstance(node.target, ast.Name):
for child in ast.walk(node.body):
if isinstance(child, ast.Call) and isinstance(child.func, ast.Attribute):
if child.func.attr in ['append', 'extend', 'pop', 'remove']:
if isinstance(child.func.value, ast.Name) and child.func.value.id == node.target.id:
self.risks.append({
"type": "Mutation Risk",
"location": f"{self.current_function or 'module'} (line {node.lineno})",
"severity": "MEDIUM",
"message": f"Loop modifies iterator variable '{node.target.id}'."
})
self.generic_visit(node)
def run_analysis(path: str):
"""Run the static analysis pipeline."""
try:
source = open(path, "r").read()
tree = ast.parse(source)
analyzer = RiskAnalyzer()
analyzer.visit(tree)
if not analyzer.risks:
return tool_result(
status="Verified Safe",
message="No high-risk GOFAI patterns detected. Code appears compliant with Fleet execution safety standards."
)
summary = "GOFAI RISK ASSESSMENT REPORT:\n"
for risk in analyzer.risks:
summary += f"- [{risk['severity']}] {risk['type']} in {risk['location']}: {risk['message']}\n"
return tool_result(
status="Risk Detected",
summary=summary,
risks=analyzer.risks,
recommendation="Address the identified risks before deploying this code to the fleet."
)
except Exception as e:
return tool_error(f"Static analysis failed: {str(e)}")
def _handle_static_analyze(args, **kwargs):
return run_analysis(args.get("path"))
registry.register(
name="static_analyze",
toolset="qa",
schema=STATIC_ANALYZE_SCHEMA,
handler=_handle_static_analyze,
emoji="🛡️"
)

167
tools/symbolic_verify.py Normal file
View File

@@ -0,0 +1,167 @@
#!/usr/bin/env python3
"""
Symbolic Verify (GOFAI) Tool
Leverages Python's Abstract Syntax Tree (AST) to perform deterministic
code audits without LLM inference. Detects 'LLM-isms' like undefined
variables, shadow variables, and scoping errors.
"""
import ast
import json
import logging
import os
from typing import Dict, List, Set, Any
from tools.registry import registry, tool_error, tool_result
logger = logging.getLogger(__name__)
SYMBOLIC_VERIFY_SCHEMA = {
"name": "symbolic_verify",
"description": "Perform a deterministic GOFAI audit of code using AST analysis. Identifies undefined variables, unused imports, and scoping issues without using an LLM. Use this to verify your changes are syntactically and semantically sound before submission.",
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "Path to the Python file to audit."},
"check_level": {
"type": "string",
"enum": ["syntax", "scope", "all"],
"default": "all",
"description": "Level of analysis to perform."
}
},
"required": ["path"]
}
}
class ScopeAnalyzer(ast.NodeVisitor):
def __init__(self):
self.defined_vars = set()
self.used_vars = set()
self.undefined_references = []
self.scopes = [{}] # Stack of symbol tables
self.builtins = set(dir(__builtins__))
def visit_Import(self, node):
for alias in node.names:
name = alias.asname or alias.name
self.scopes[-1][name] = "import"
self.generic_visit(node)
def visit_ImportFrom(self, node):
for alias in node.names:
name = alias.asname or alias.name
self.scopes[-1][name] = "import"
self.generic_visit(node)
def visit_Name(self, node):
if isinstance(node.ctx, ast.Store):
self.scopes[-1][node.id] = "defined"
elif isinstance(node.ctx, ast.Load):
# Check if defined in any scope level or builtins
is_defined = any(node.id in scope for scope in self.scopes) or node.id in self.builtins
if not is_defined:
# Store potential undefined
self.undefined_references.append({
"name": node.id,
"lineno": node.lineno,
"col": node.col_offset
})
self.generic_visit(node)
def visit_FunctionDef(self, node):
self.scopes[-1][node.name] = "function"
# New scope for arguments and body
new_scope = {}
for arg in node.args.args:
new_scope[arg.arg] = "parameter"
self.scopes.append(new_scope)
self.generic_visit(node)
self.scopes.pop()
def visit_ClassDef(self, node):
self.scopes[-1][node.name] = "class"
self.scopes.append({})
self.generic_visit(node)
self.scopes.pop()
def audit_file(path: str, check_level: str = "all"):
"""Audit a Python file for common semantic errors."""
if not path.endswith(".py"):
return tool_error("Symbolic verification only supports Python (.py) files.")
try:
if not os.path.exists(path):
return tool_error(f"File not found: {path}")
source = open(path, "r").read()
# 1. Syntax Check
try:
tree = ast.parse(source)
except SyntaxError as e:
return tool_result(
status="Critical Failure",
errors=[{
"type": "SyntaxError",
"message": e.msg,
"lineno": e.lineno,
"offset": e.offset
}],
recommendation="Fix the syntax error immediately. The file cannot be executed."
)
if check_level == "syntax":
return tool_result(status="Clean", message="Syntax is valid.")
# 2. Scope & Reference Search
analyzer = ScopeAnalyzer()
analyzer.visit(tree)
# Filter out common false positives (e.g. late imports or dynamic names)
# For a truly robust GOFAI we'd do more, but this is 'secret sauce' level
undefined = []
seen = set()
for ref in analyzer.undefined_references:
key = (ref["name"], ref["lineno"])
if key not in seen:
undefined.append(ref)
seen.add(key)
if not undefined:
return tool_result(
status="Healthy",
message="Deterministic check passed. No undefined variables detected in analyzed scopes.",
file_stats={
"chars": len(source),
"nodes": len(list(ast.walk(tree)))
}
)
report = "GOFAI AUDIT DETECTED SEMANTIC ISSUES:\n"
for u in undefined:
report += f"- Undefined Variable: '{u['name']}' at line {u['lineno']}\n"
return tool_result(
status="Warning",
summary=report,
undefined_variables=undefined,
recommendation="Review the undefined variables. Ensure they are imported or defined before use."
)
except Exception as e:
return tool_error(f"Symbolic audit failed: {str(e)}")
def _handle_symbolic_verify(args, **kwargs):
return audit_file(args.get("path"), args.get("check_level", "all"))
registry.register(
name="symbolic_verify",
toolset="qa",
schema=SYMBOLIC_VERIFY_SCHEMA,
handler=_handle_symbolic_verify,
emoji="🔬"
)