Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ab1b196160 |
2
agent/a2a/__init__.py
Normal file
2
agent/a2a/__init__.py
Normal file
@@ -0,0 +1,2 @@
|
||||
"""A2A (Agent-to-Agent) authentication and security."""
|
||||
from .mtls import FleetCA, AgentCert, verify_peer, generate_fleet_certs
|
||||
260
agent/a2a/mtls.py
Normal file
260
agent/a2a/mtls.py
Normal file
@@ -0,0 +1,260 @@
|
||||
"""
|
||||
mtls.py — Mutual TLS authentication for agent-to-agent communication.
|
||||
|
||||
Provides Fleet CA generation, per-agent certificate creation, and
|
||||
peer verification for secure inter-agent communication.
|
||||
|
||||
Usage:
|
||||
# Generate fleet CA + certs for all agents
|
||||
python3 -m agent.a2a.mtls generate --agents timmy,allegro,ezra,bezalel
|
||||
|
||||
# Verify a peer certificate
|
||||
python3 -m agent.a2a.mtls verify --cert /path/to/peer.pem --ca /path/to/ca.pem
|
||||
|
||||
# Check cert expiry
|
||||
python3 -m agent.a2a.mtls check --cert /path/to/cert.pem
|
||||
"""
|
||||
|
||||
import os
|
||||
import subprocess
|
||||
import json
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
from dataclasses import dataclass, field
|
||||
from typing import List, Optional, Dict
|
||||
|
||||
CERTS_DIR = Path.home() / ".hermes" / "a2a" / "certs"
|
||||
CA_DIR = Path.home() / ".hermes" / "a2a" / "ca"
|
||||
|
||||
|
||||
@dataclass
|
||||
class CertInfo:
|
||||
"""Certificate information."""
|
||||
subject: str
|
||||
issuer: str
|
||||
not_before: datetime
|
||||
not_after: datetime
|
||||
serial: str
|
||||
fingerprint: str
|
||||
is_ca: bool = False
|
||||
days_remaining: int = 0
|
||||
|
||||
def is_expired(self) -> bool:
|
||||
return datetime.now() > self.not_after
|
||||
|
||||
def is_expiring_soon(self, days: int = 30) -> bool:
|
||||
return self.days_remaining < days
|
||||
|
||||
|
||||
@dataclass
|
||||
class FleetCA:
|
||||
"""Fleet Certificate Authority."""
|
||||
ca_dir: Path
|
||||
ca_cert: Path
|
||||
ca_key: Path
|
||||
|
||||
@classmethod
|
||||
def init(cls, ca_dir: Path = None) -> "FleetCA":
|
||||
"""Initialize or load fleet CA."""
|
||||
ca_dir = ca_dir or CA_DIR
|
||||
ca_dir.mkdir(parents=True, exist_ok=True)
|
||||
ca_cert = ca_dir / "ca.pem"
|
||||
ca_key = ca_dir / "ca-key.pem"
|
||||
|
||||
if not ca_cert.exists():
|
||||
cls._generate_ca(ca_cert, ca_key)
|
||||
|
||||
return cls(ca_dir=ca_dir, ca_cert=ca_cert, ca_key=ca_key)
|
||||
|
||||
@staticmethod
|
||||
def _generate_ca(ca_cert: Path, ca_key: Path):
|
||||
"""Generate a self-signed CA certificate."""
|
||||
# Generate CA key
|
||||
subprocess.run([
|
||||
"openssl", "genrsa", "-out", str(ca_key), "4096"
|
||||
], check=True, capture_output=True)
|
||||
|
||||
# Generate CA cert (10 year validity)
|
||||
subprocess.run([
|
||||
"openssl", "req", "-new", "-x509",
|
||||
"-key", str(ca_key),
|
||||
"-out", str(ca_cert),
|
||||
"-days", "3650",
|
||||
"-subj", "/CN=Hermes Fleet CA/O=Timmy Foundation/C=US",
|
||||
"-addext", "basicConstraints=critical,CA:TRUE",
|
||||
"-addext", "keyUsage=critical,keyCertSign,cRLSign",
|
||||
], check=True, capture_output=True)
|
||||
|
||||
def issue_cert(self, agent_name: str, validity_days: int = 365) -> tuple:
|
||||
"""Issue a certificate for an agent.
|
||||
|
||||
Returns (cert_path, key_path).
|
||||
"""
|
||||
cert_dir = CERTS_DIR / agent_name
|
||||
cert_dir.mkdir(parents=True, exist_ok=True)
|
||||
cert_path = cert_dir / "cert.pem"
|
||||
key_path = cert_dir / "key.pem"
|
||||
csr_path = cert_dir / "csr.pem"
|
||||
|
||||
# Generate key
|
||||
subprocess.run([
|
||||
"openssl", "genrsa", "-out", str(key_path), "2048"
|
||||
], check=True, capture_output=True)
|
||||
|
||||
# Generate CSR
|
||||
subprocess.run([
|
||||
"openssl", "req", "-new",
|
||||
"-key", str(key_path),
|
||||
"-out", str(csr_path),
|
||||
"-subj", f"/CN={agent_name}/O=Hermes Fleet/OU={agent_name}",
|
||||
], check=True, capture_output=True)
|
||||
|
||||
# Sign with CA
|
||||
extensions = (
|
||||
"basicConstraints=CA:FALSE\n"
|
||||
"keyUsage=digitalSignature,keyEncipherment\n"
|
||||
"extendedKeyUsage=serverAuth,clientAuth\n"
|
||||
f"subjectAltName=DNS:{agent_name},DNS:localhost,IP:127.0.0.1"
|
||||
)
|
||||
ext_file = cert_dir / "ext.cnf"
|
||||
ext_file.write_text(extensions)
|
||||
|
||||
subprocess.run([
|
||||
"openssl", "x509", "-req",
|
||||
"-in", str(csr_path),
|
||||
"-CA", str(self.ca_cert),
|
||||
"-CAkey", str(self.ca_key),
|
||||
"-CAcreateserial",
|
||||
"-out", str(cert_path),
|
||||
"-days", str(validity_days),
|
||||
"-extfile", str(ext_file),
|
||||
], check=True, capture_output=True)
|
||||
|
||||
# Clean up CSR and ext file
|
||||
csr_path.unlink(missing_ok=True)
|
||||
ext_file.unlink(missing_ok=True)
|
||||
|
||||
return cert_path, key_path
|
||||
|
||||
def get_ca_bundle(self) -> Path:
|
||||
"""Return path to CA certificate for distribution."""
|
||||
return self.ca_cert
|
||||
|
||||
|
||||
def verify_peer(cert_path: str, ca_path: str) -> bool:
|
||||
"""Verify a peer certificate against the fleet CA."""
|
||||
try:
|
||||
result = subprocess.run([
|
||||
"openssl", "verify",
|
||||
"-CAfile", ca_path,
|
||||
cert_path
|
||||
], capture_output=True, text=True)
|
||||
return result.returncode == 0 and "OK" in result.stdout
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def get_cert_info(cert_path: str) -> Optional[CertInfo]:
|
||||
"""Extract certificate information."""
|
||||
try:
|
||||
result = subprocess.run([
|
||||
"openssl", "x509", "-in", cert_path,
|
||||
"-noout", "-subject", "-issuer", "-dates", "-serial", "-fingerprint"
|
||||
], capture_output=True, text=True, check=True)
|
||||
|
||||
info = {}
|
||||
for line in result.stdout.strip().split("\n"):
|
||||
if "=" in line:
|
||||
key, _, val = line.partition("=")
|
||||
info[key.strip().lower().replace(" ", "_")] = val.strip()
|
||||
|
||||
not_before = datetime.strptime(info.get("not_before", ""), "%b %d %H:%M:%S %Y %Z")
|
||||
not_after = datetime.strptime(info.get("not_after", ""), "%b %d %H:%M:%S %Y %Z")
|
||||
days_remaining = (not_after - datetime.now()).days
|
||||
|
||||
return CertInfo(
|
||||
subject=info.get("subject", ""),
|
||||
issuer=info.get("issuer", ""),
|
||||
not_before=not_before,
|
||||
not_after=not_after,
|
||||
serial=info.get("serial", ""),
|
||||
fingerprint=info.get("sha1_fingerprint", info.get("sha256_fingerprint", "")),
|
||||
days_remaining=days_remaining,
|
||||
)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def generate_fleet_certs(agents: List[str], ca_dir: Path = None, validity_days: int = 365) -> Dict[str, tuple]:
|
||||
"""Generate certificates for all fleet agents.
|
||||
|
||||
Returns dict of agent_name -> (cert_path, key_path).
|
||||
"""
|
||||
ca = FleetCA.init(ca_dir)
|
||||
results = {}
|
||||
|
||||
for agent in agents:
|
||||
cert_path, key_path = ca.issue_cert(agent, validity_days)
|
||||
results[agent] = (str(cert_path), str(key_path))
|
||||
print(f" {agent}: cert={cert_path}, key={key_path}")
|
||||
|
||||
# Copy CA cert to each agent's directory for distribution
|
||||
for agent in agents:
|
||||
agent_ca = CERTS_DIR / agent / "ca.pem"
|
||||
if not agent_ca.exists():
|
||||
import shutil
|
||||
shutil.copy2(ca.ca_cert, agent_ca)
|
||||
|
||||
return results
|
||||
|
||||
|
||||
def main():
|
||||
"""CLI entry point."""
|
||||
import argparse
|
||||
parser = argparse.ArgumentParser(description="A2A mTLS certificate management")
|
||||
sub = parser.add_subparsers(dest="command")
|
||||
|
||||
# Generate
|
||||
gen = sub.add_parser("generate", help="Generate fleet certificates")
|
||||
gen.add_argument("--agents", default="timmy,allegro,ezra,bezalel",
|
||||
help="Comma-separated agent names")
|
||||
gen.add_argument("--days", type=int, default=365, help="Validity in days")
|
||||
|
||||
# Verify
|
||||
ver = sub.add_parser("verify", help="Verify a peer certificate")
|
||||
ver.add_argument("--cert", required=True)
|
||||
ver.add_argument("--ca", required=True)
|
||||
|
||||
# Check
|
||||
chk = sub.add_parser("check", help="Check certificate info")
|
||||
chk.add_argument("--cert", required=True)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.command == "generate":
|
||||
agents = [a.strip() for a in args.agents.split(",")]
|
||||
print(f"Generating certs for: {', '.join(agents)}")
|
||||
results = generate_fleet_certs(agents, validity_days=args.days)
|
||||
print(f"\nGenerated {len(results)} certificates")
|
||||
|
||||
elif args.command == "verify":
|
||||
ok = verify_peer(args.cert, args.ca)
|
||||
print(f"Verification: {'PASS' if ok else 'FAIL'}")
|
||||
|
||||
elif args.command == "check":
|
||||
info = get_cert_info(args.cert)
|
||||
if info:
|
||||
print(f"Subject: {info.subject}")
|
||||
print(f"Issuer: {info.issuer}")
|
||||
print(f"Valid: {info.not_before} to {info.not_after}")
|
||||
print(f"Days remaining: {info.days_remaining}")
|
||||
print(f"Expired: {info.is_expired()}")
|
||||
else:
|
||||
print("Could not read certificate")
|
||||
|
||||
else:
|
||||
parser.print_help()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,288 +0,0 @@
|
||||
"""Gemma 4 tool calling hardening — parse, validate, benchmark.
|
||||
|
||||
Gemma 4 has native multimodal function calling but its output format
|
||||
may differ from OpenAI/Claude. This module provides:
|
||||
|
||||
1. Gemma4ToolParser — robust parsing for Gemma 4's tool call format
|
||||
2. Parallel tool call detection and splitting
|
||||
3. Tool call success rate tracking and benchmarking
|
||||
4. Fallback parsing strategies for malformed output
|
||||
|
||||
Usage:
|
||||
from agent.gemma4_tool_hardening import Gemma4ToolParser
|
||||
parser = Gemma4ToolParser()
|
||||
tool_calls = parser.parse(response_text)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import re
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
|
||||
@dataclass
|
||||
class ToolCallAttempt:
|
||||
"""Record of a single tool call parsing attempt."""
|
||||
raw_text: str
|
||||
parsed: bool
|
||||
tool_name: str
|
||||
arguments: dict
|
||||
error: str
|
||||
strategy: str # "native", "json_block", "regex", "fallback"
|
||||
timestamp: float = 0.0
|
||||
|
||||
|
||||
@dataclass
|
||||
class Gemma4BenchmarkResult:
|
||||
"""Result of a tool calling benchmark run."""
|
||||
total_calls: int = 0
|
||||
successful_parses: int = 0
|
||||
parallel_calls: int = 0
|
||||
strategies_used: Dict[str, int] = field(default_factory=dict)
|
||||
avg_parse_time_ms: float = 0.0
|
||||
success_rate: float = 0.0
|
||||
errors: List[str] = field(default_factory=list)
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return {
|
||||
"total_calls": self.total_calls,
|
||||
"successful_parses": self.successful_parses,
|
||||
"parallel_calls": self.parallel_calls,
|
||||
"success_rate": round(self.success_rate, 3),
|
||||
"strategies_used": self.strategies_used,
|
||||
"avg_parse_time_ms": round(self.avg_parse_time_ms, 2),
|
||||
"error_count": len(self.errors),
|
||||
"errors": self.errors[:10],
|
||||
}
|
||||
|
||||
|
||||
class Gemma4ToolParser:
|
||||
"""Robust tool call parser for Gemma 4 output format.
|
||||
|
||||
Tries multiple parsing strategies in order:
|
||||
1. Native OpenAI format (standard tool_calls)
|
||||
2. JSON code blocks (```json ... ```)
|
||||
3. Regex extraction (function_name + arguments patterns)
|
||||
4. Heuristic fallback (best-effort extraction)
|
||||
"""
|
||||
|
||||
# Patterns for Gemma 4 tool call formats
|
||||
_JSON_BLOCK_PATTERN = re.compile(
|
||||
r'```(?:json)?\s*\n?(.*?)\n?```',
|
||||
re.DOTALL | re.IGNORECASE,
|
||||
)
|
||||
_FUNCTION_CALL_PATTERN = re.compile(
|
||||
r'(?:function|tool|call)[:\s]*(\w+)\s*\(\s*({.*?})\s*\)',
|
||||
re.DOTALL | re.IGNORECASE,
|
||||
)
|
||||
_GEMMA_INLINE_PATTERN = re.compile(
|
||||
r'\[(?:tool_call|function_call)\]\s*(\w+)\s*:\s*({.*?})',
|
||||
re.DOTALL | re.IGNORECASE,
|
||||
)
|
||||
|
||||
def __init__(self):
|
||||
self._attempts: List[ToolCallAttempt] = []
|
||||
self._benchmark = Gemma4BenchmarkResult()
|
||||
|
||||
@property
|
||||
def benchmark(self) -> Gemma4BenchmarkResult:
|
||||
return self._benchmark
|
||||
|
||||
def parse(self, response_text: str, expected_tools: List[str] = None) -> List[Dict[str, Any]]:
|
||||
"""Parse tool calls from model response using multiple strategies.
|
||||
|
||||
Returns list of tool call dicts in OpenAI format:
|
||||
[{"id": "...", "type": "function", "function": {"name": "...", "arguments": "..."}}]
|
||||
"""
|
||||
t0 = time.monotonic()
|
||||
self._benchmark.total_calls += 1
|
||||
|
||||
# Strategy 1: Native OpenAI format
|
||||
result = self._try_native_parse(response_text)
|
||||
if result:
|
||||
self._record_attempt(response_text, True, result, "native")
|
||||
self._benchmark.successful_parses += 1
|
||||
if len(result) > 1:
|
||||
self._benchmark.parallel_calls += 1
|
||||
self._benchmark.strategies_used["native"] = self._benchmark.strategies_used.get("native", 0) + 1
|
||||
self._update_timing(t0)
|
||||
return result
|
||||
|
||||
# Strategy 2: JSON code blocks
|
||||
result = self._try_json_block_parse(response_text, expected_tools)
|
||||
if result:
|
||||
self._record_attempt(response_text, True, result, "json_block")
|
||||
self._benchmark.successful_parses += 1
|
||||
if len(result) > 1:
|
||||
self._benchmark.parallel_calls += 1
|
||||
self._benchmark.strategies_used["json_block"] = self._benchmark.strategies_used.get("json_block", 0) + 1
|
||||
self._update_timing(t0)
|
||||
return result
|
||||
|
||||
# Strategy 3: Regex extraction
|
||||
result = self._try_regex_parse(response_text)
|
||||
if result:
|
||||
self._record_attempt(response_text, True, result, "regex")
|
||||
self._benchmark.successful_parses += 1
|
||||
self._benchmark.strategies_used["regex"] = self._benchmark.strategies_used.get("regex", 0) + 1
|
||||
self._update_timing(t0)
|
||||
return result
|
||||
|
||||
# Strategy 4: Heuristic fallback
|
||||
result = self._try_heuristic_parse(response_text, expected_tools)
|
||||
if result:
|
||||
self._record_attempt(response_text, True, result, "fallback")
|
||||
self._benchmark.successful_parses += 1
|
||||
self._benchmark.strategies_used["fallback"] = self._benchmark.strategies_used.get("fallback", 0) + 1
|
||||
self._update_timing(t0)
|
||||
return result
|
||||
|
||||
# All strategies failed
|
||||
self._record_attempt(response_text, False, [], "none")
|
||||
self._benchmark.errors.append(f"Failed to parse: {response_text[:200]}")
|
||||
self._update_timing(t0)
|
||||
return []
|
||||
|
||||
def _try_native_parse(self, text: str) -> List[Dict[str, Any]]:
|
||||
"""Try parsing standard OpenAI tool_calls JSON."""
|
||||
try:
|
||||
data = json.loads(text)
|
||||
if isinstance(data, dict) and "tool_calls" in data:
|
||||
return data["tool_calls"]
|
||||
if isinstance(data, list):
|
||||
if all(isinstance(item, dict) and "function" in item for item in data):
|
||||
return data
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
return []
|
||||
|
||||
def _try_json_block_parse(self, text: str, expected_tools: List[str] = None) -> List[Dict[str, Any]]:
|
||||
"""Extract tool calls from JSON code blocks."""
|
||||
matches = self._JSON_BLOCK_PATTERN.findall(text)
|
||||
calls = []
|
||||
for match in matches:
|
||||
try:
|
||||
data = json.loads(match.strip())
|
||||
if isinstance(data, dict):
|
||||
if "name" in data and "arguments" in data:
|
||||
calls.append(self._to_openai_format(data["name"], data["arguments"]))
|
||||
elif "function" in data and "arguments" in data:
|
||||
calls.append(self._to_openai_format(data["function"], data["arguments"]))
|
||||
elif isinstance(data, list):
|
||||
for item in data:
|
||||
if isinstance(item, dict) and "name" in item:
|
||||
args = item.get("arguments", item.get("args", {}))
|
||||
calls.append(self._to_openai_format(item["name"], args))
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
return calls
|
||||
|
||||
def _try_regex_parse(self, text: str) -> List[Dict[str, Any]]:
|
||||
"""Extract tool calls using regex patterns."""
|
||||
calls = []
|
||||
|
||||
# Pattern: function_name({...})
|
||||
for match in self._FUNCTION_CALL_PATTERN.finditer(text):
|
||||
name = match.group(1)
|
||||
args_str = match.group(2)
|
||||
try:
|
||||
args = json.loads(args_str)
|
||||
calls.append(self._to_openai_format(name, args))
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
|
||||
# Pattern: [tool_call] name: {...}
|
||||
for match in self._GEMMA_INLINE_PATTERN.finditer(text):
|
||||
name = match.group(1)
|
||||
args_str = match.group(2)
|
||||
try:
|
||||
args = json.loads(args_str)
|
||||
calls.append(self._to_openai_format(name, args))
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
|
||||
return calls
|
||||
|
||||
def _try_heuristic_parse(self, text: str, expected_tools: List[str] = None) -> List[Dict[str, Any]]:
|
||||
"""Best-effort heuristic extraction."""
|
||||
if not expected_tools:
|
||||
return []
|
||||
|
||||
calls = []
|
||||
for tool_name in expected_tools:
|
||||
# Look for tool name near JSON-like content
|
||||
pattern = re.compile(
|
||||
rf'{re.escape(tool_name)}\s*[\(:]\s*({{[^}}]+}})',
|
||||
re.IGNORECASE,
|
||||
)
|
||||
match = pattern.search(text)
|
||||
if match:
|
||||
try:
|
||||
args = json.loads(match.group(1))
|
||||
calls.append(self._to_openai_format(tool_name, args))
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
|
||||
return calls
|
||||
|
||||
def _to_openai_format(self, name: str, arguments: Any) -> Dict[str, Any]:
|
||||
"""Convert to OpenAI tool call format."""
|
||||
import uuid
|
||||
args_str = json.dumps(arguments) if isinstance(arguments, dict) else str(arguments)
|
||||
return {
|
||||
"id": f"call_{uuid.uuid4().hex[:24]}",
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": name,
|
||||
"arguments": args_str,
|
||||
},
|
||||
}
|
||||
|
||||
def _record_attempt(self, text: str, success: bool, result: list, strategy: str):
|
||||
self._attempts.append(ToolCallAttempt(
|
||||
raw_text=text[:500],
|
||||
parsed=success,
|
||||
tool_name=result[0]["function"]["name"] if result else "",
|
||||
arguments={},
|
||||
error="" if success else "parse failed",
|
||||
strategy=strategy,
|
||||
timestamp=time.time(),
|
||||
))
|
||||
|
||||
def _update_timing(self, t0: float):
|
||||
elapsed = (time.monotonic() - t0) * 1000
|
||||
n = self._benchmark.total_calls
|
||||
self._benchmark.avg_parse_time_ms = (
|
||||
(self._benchmark.avg_parse_time_ms * (n - 1) + elapsed) / n
|
||||
)
|
||||
self._benchmark.success_rate = (
|
||||
self._benchmark.successful_parses / n if n > 0 else 0
|
||||
)
|
||||
|
||||
def format_report(self) -> str:
|
||||
"""Format benchmark report."""
|
||||
b = self._benchmark
|
||||
lines = [
|
||||
"Gemma 4 Tool Calling Benchmark",
|
||||
"=" * 40,
|
||||
f"Total attempts: {b.total_calls}",
|
||||
f"Successful parses: {b.successful_parses}",
|
||||
f"Success rate: {b.success_rate:.1%}",
|
||||
f"Parallel calls: {b.parallel_calls}",
|
||||
f"Avg parse time: {b.avg_parse_time_ms:.2f}ms",
|
||||
"",
|
||||
"Strategies used:",
|
||||
]
|
||||
for strategy, count in sorted(b.strategies_used.items(), key=lambda x: -x[1]):
|
||||
lines.append(f" {strategy}: {count}")
|
||||
|
||||
if b.errors:
|
||||
lines.append("")
|
||||
lines.append(f"Errors ({len(b.errors)}):")
|
||||
for err in b.errors[:5]:
|
||||
lines.append(f" {err[:100]}")
|
||||
|
||||
return "\n".join(lines)
|
||||
5
ansible/roles/a2a-certs/defaults/main.yml
Normal file
5
ansible/roles/a2a-certs/defaults/main.yml
Normal file
@@ -0,0 +1,5 @@
|
||||
---
|
||||
a2a_certs_dir: "~/.hermes/a2a/certs"
|
||||
a2a_ca_cert_local: "files/ca.pem"
|
||||
a2a_cert_local: "files/cert.pem"
|
||||
a2a_key_local: "files/key.pem"
|
||||
29
ansible/roles/a2a-certs/tasks/main.yml
Normal file
29
ansible/roles/a2a-certs/tasks/main.yml
Normal file
@@ -0,0 +1,29 @@
|
||||
---
|
||||
# Distribute A2A mTLS certificates to fleet nodes
|
||||
- name: Ensure certs directory exists
|
||||
file:
|
||||
path: "{{ a2a_certs_dir }}"
|
||||
state: directory
|
||||
mode: '0700'
|
||||
|
||||
- name: Copy CA certificate
|
||||
copy:
|
||||
src: "{{ a2a_ca_cert_local }}"
|
||||
dest: "{{ a2a_certs_dir }}/ca.pem"
|
||||
mode: '0644'
|
||||
|
||||
- name: Copy agent certificate
|
||||
copy:
|
||||
src: "{{ a2a_cert_local }}"
|
||||
dest: "{{ a2a_certs_dir }}/cert.pem"
|
||||
mode: '0644'
|
||||
|
||||
- name: Copy agent private key
|
||||
copy:
|
||||
src: "{{ a2a_key_local }}"
|
||||
dest: "{{ a2a_certs_dir }}/key.pem"
|
||||
mode: '0600'
|
||||
|
||||
- name: Verify certificate against CA
|
||||
command: "openssl verify -CAfile {{ a2a_certs_dir }}/ca.pem {{ a2a_certs_dir }}/cert.pem"
|
||||
changed_when: false
|
||||
92
tests/agent/a2a/test_mtls.py
Normal file
92
tests/agent/a2a/test_mtls.py
Normal file
@@ -0,0 +1,92 @@
|
||||
"""Tests for A2A mutual TLS authentication."""
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from agent.a2a.mtls import (
|
||||
FleetCA,
|
||||
verify_peer,
|
||||
get_cert_info,
|
||||
generate_fleet_certs,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def tmp_ca():
|
||||
"""Create a temporary CA for testing."""
|
||||
tmp = tempfile.mkdtemp()
|
||||
ca_dir = Path(tmp) / "ca"
|
||||
ca = FleetCA.init(ca_dir)
|
||||
yield ca
|
||||
shutil.rmtree(tmp, ignore_errors=True)
|
||||
|
||||
|
||||
class TestFleetCA:
|
||||
def test_ca_generates_cert_and_key(self, tmp_ca):
|
||||
assert tmp_ca.ca_cert.exists()
|
||||
assert tmp_ca.ca_key.exists()
|
||||
|
||||
def test_ca_cert_is_ca(self, tmp_ca):
|
||||
info = get_cert_info(str(tmp_ca.ca_cert))
|
||||
assert info is not None
|
||||
assert "CA" in info.subject or "Hermes" in info.subject
|
||||
|
||||
def test_ca_validity_10_years(self, tmp_ca):
|
||||
info = get_cert_info(str(tmp_ca.ca_cert))
|
||||
assert info is not None
|
||||
assert info.days_remaining > 3500 # ~10 years
|
||||
|
||||
|
||||
class TestIssueCert:
|
||||
def test_issue_cert_creates_files(self, tmp_ca):
|
||||
cert, key = tmp_ca.issue_cert("test-agent")
|
||||
assert cert.exists()
|
||||
assert key.exists()
|
||||
|
||||
def test_cert_verifies_against_ca(self, tmp_ca):
|
||||
cert, _ = tmp_ca.issue_cert("test-agent")
|
||||
assert verify_peer(str(cert), str(tmp_ca.ca_cert))
|
||||
|
||||
def test_cert_has_agent_name(self, tmp_ca):
|
||||
cert, _ = tmp_ca.issue_cert("allegro")
|
||||
info = get_cert_info(str(cert))
|
||||
assert info is not None
|
||||
assert "allegro" in info.subject.lower()
|
||||
|
||||
def test_cert_validity_1_year(self, tmp_ca):
|
||||
cert, _ = tmp_ca.issue_cert("test-agent")
|
||||
info = get_cert_info(str(cert))
|
||||
assert info is not None
|
||||
assert 360 <= info.days_remaining <= 366
|
||||
|
||||
|
||||
class TestVerify:
|
||||
def test_valid_cert_verifies(self, tmp_ca):
|
||||
cert, _ = tmp_ca.issue_cert("test-agent")
|
||||
assert verify_peer(str(cert), str(tmp_ca.ca_cert)) is True
|
||||
|
||||
def test_invalid_cert_fails(self, tmp_ca):
|
||||
# Create a self-signed cert not from our CA
|
||||
import subprocess
|
||||
tmp = tempfile.mktemp(suffix=".pem")
|
||||
subprocess.run(["openssl", "req", "-x509", "-newkey", "rsa:2048",
|
||||
"-keyout", "/dev/null", "-out", tmp, "-days", "1",
|
||||
"-subj", "/CN=imposter", "-nodes"],
|
||||
capture_output=True)
|
||||
assert verify_peer(tmp, str(tmp_ca.ca_cert)) is False
|
||||
os.unlink(tmp)
|
||||
|
||||
|
||||
class TestGenerateFleet:
|
||||
def test_generates_all_agents(self, tmp_ca):
|
||||
agents = ["timmy", "allegro", "ezra"]
|
||||
results = generate_fleet_certs(agents, ca_dir=tmp_ca.ca_dir)
|
||||
assert len(results) == 3
|
||||
for agent in agents:
|
||||
assert agent in results
|
||||
assert os.path.exists(results[agent][0])
|
||||
assert os.path.exists(results[agent][1])
|
||||
@@ -1,94 +0,0 @@
|
||||
"""Tests for Gemma 4 tool calling hardening."""
|
||||
|
||||
import json
|
||||
import pytest
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
||||
|
||||
from agent.gemma4_tool_hardening import Gemma4ToolParser, Gemma4BenchmarkResult
|
||||
|
||||
|
||||
class TestNativeParse:
|
||||
def test_standard_tool_calls(self):
|
||||
parser = Gemma4ToolParser()
|
||||
text = json.dumps({"tool_calls": [{"id": "call_1", "type": "function", "function": {"name": "read_file", "arguments": '{"path": "test.py"}'}}]})
|
||||
result = parser.parse(text)
|
||||
assert len(result) == 1
|
||||
assert result[0]["function"]["name"] == "read_file"
|
||||
|
||||
def test_list_format(self):
|
||||
parser = Gemma4ToolParser()
|
||||
text = json.dumps([{"id": "c1", "type": "function", "function": {"name": "terminal", "arguments": '{"command": "ls"}'}}])
|
||||
result = parser.parse(text)
|
||||
assert len(result) == 1
|
||||
|
||||
|
||||
class TestJsonBlockParse:
|
||||
def test_json_code_block(self):
|
||||
parser = Gemma4ToolParser()
|
||||
text = 'Here is the tool call:\n```json\n{"name": "read_file", "arguments": {"path": "test.py"}}\n```'
|
||||
result = parser.parse(text)
|
||||
assert len(result) == 1
|
||||
assert result[0]["function"]["name"] == "read_file"
|
||||
|
||||
def test_multiple_json_blocks(self):
|
||||
parser = Gemma4ToolParser()
|
||||
text = '```json\n{"name": "read_file", "arguments": {"path": "a.py"}}\n```\n```json\n{"name": "read_file", "arguments": {"path": "b.py"}}\n```'
|
||||
result = parser.parse(text)
|
||||
assert len(result) == 2
|
||||
|
||||
def test_list_in_json_block(self):
|
||||
parser = Gemma4ToolParser()
|
||||
text = '```json\n[{"name": "terminal", "arguments": {"command": "ls"}}]\n```'
|
||||
result = parser.parse(text)
|
||||
assert len(result) == 1
|
||||
|
||||
|
||||
class TestRegexParse:
|
||||
def test_function_call_pattern(self):
|
||||
parser = Gemma4ToolParser()
|
||||
text = 'I will call read_file({"path": "test.py"}) now.'
|
||||
result = parser.parse(text)
|
||||
assert len(result) == 1
|
||||
assert result[0]["function"]["name"] == "read_file"
|
||||
|
||||
def test_gemma_inline_pattern(self):
|
||||
parser = Gemma4ToolParser()
|
||||
text = '[tool_call] terminal: {"command": "pwd"}'
|
||||
result = parser.parse(text)
|
||||
assert len(result) == 1
|
||||
|
||||
|
||||
class TestHeuristicParse:
|
||||
def test_heuristic_with_expected_tools(self):
|
||||
parser = Gemma4ToolParser()
|
||||
text = 'Calling read_file({"path": "config.yaml"}) now'
|
||||
result = parser.parse(text, expected_tools=["read_file"])
|
||||
assert len(result) == 1
|
||||
|
||||
def test_heuristic_without_expected_tools(self):
|
||||
parser = Gemma4ToolParser()
|
||||
text = 'Some text with {"key": "value"} but no tool name'
|
||||
result = parser.parse(text)
|
||||
assert len(result) == 0
|
||||
|
||||
|
||||
class TestBenchmark:
|
||||
def test_benchmark_counts(self):
|
||||
parser = Gemma4ToolParser()
|
||||
parser.parse(json.dumps({"tool_calls": [{"id": "1", "type": "function", "function": {"name": "x", "arguments": "{}"}}]}))
|
||||
parser.parse('```json\n{"name": "y", "arguments": {}}\n```')
|
||||
parser.parse('no tool call here')
|
||||
b = parser.benchmark
|
||||
assert b.total_calls == 3
|
||||
assert b.successful_parses == 2
|
||||
assert abs(b.success_rate - 2/3) < 0.01
|
||||
|
||||
def test_report_format(self):
|
||||
parser = Gemma4ToolParser()
|
||||
parser.parse(json.dumps({"tool_calls": [{"id": "1", "type": "function", "function": {"name": "x", "arguments": "{}"}}]}))
|
||||
report = parser.format_report()
|
||||
assert "Gemma 4 Tool Calling Benchmark" in report
|
||||
assert "native" in report
|
||||
Reference in New Issue
Block a user