Compare commits

..

2 Commits

Author SHA1 Message Date
Alexander Whitestone
985488bcbe feat: add A2A task delegation over mTLS (#804)
All checks were successful
Lint / lint (pull_request) Successful in 11s
2026-04-22 11:14:26 -04:00
Alexander Whitestone
524868d4f4 test: add red coverage for A2A task delegation (#804) 2026-04-22 11:09:18 -04:00
9 changed files with 607 additions and 620 deletions

View File

@@ -29,6 +29,8 @@ import logging
import os
import ssl
import threading
import time
import uuid
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path
from typing import Any, Callable, Dict, Optional
@@ -441,3 +443,244 @@ class A2AMTLSClient:
def post(self, url: str, json: Optional[Dict[str, Any]] = None, **kwargs: Any) -> Dict[str, Any]:
data = (__import__("json").dumps(json).encode() if json is not None else None)
return self._request("POST", url, data=data, **kwargs)
# ---------------------------------------------------------------------------
# Structured A2A task delegation over mTLS
# ---------------------------------------------------------------------------
_TERMINAL_TASK_STATES = {"completed", "failed", "canceled", "rejected"}
def _iso_now() -> str:
return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
def _task_status(state: str, message: str) -> Dict[str, Any]:
return {
"state": state,
"message": message,
"timestamp": _iso_now(),
}
def _coerce_artifact(result: Any) -> Dict[str, Any]:
if isinstance(result, dict):
if "text" in result:
return result
if "artifact" in result and isinstance(result["artifact"], dict):
return result["artifact"]
return {"text": str(result)}
def _build_task_record(task_id: str, task: str, requester: Optional[str], metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
return {
"taskId": task_id,
"task": task,
"requester": requester,
"metadata": metadata or {},
"artifacts": [],
"status": _task_status("submitted", "Task submitted"),
}
def _default_agent_card(host: str, port: int) -> Dict[str, Any]:
base_url = f"https://{host}:{port}"
try:
from agent.agent_card import build_agent_card
from dataclasses import asdict
card = asdict(build_agent_card())
except Exception as exc: # pragma: no cover - fallback only exercised when card build breaks
logger.warning("Falling back to minimal agent card: %s", exc)
card = {
"name": os.environ.get("HERMES_AGENT_NAME", "hermes"),
"description": "Hermes A2A task server",
"version": "unknown",
}
card["url"] = base_url
card["a2aTaskEndpoint"] = f"{base_url}/a2a/rpc"
return card
def _default_local_hermes_executor(task_payload: Dict[str, Any]) -> Dict[str, Any]:
task_text = str(task_payload.get("task", "")).strip()
if not task_text:
return {"text": ""}
from run_agent import AIAgent
agent = AIAgent(quiet_mode=True)
result = agent.chat(task_text)
return {
"text": result,
"metadata": {"executor": "local-hermes"},
}
class A2ATaskServer:
"""JSON-RPC A2A task server running over the routing mTLS server."""
def __init__(
self,
cert: str | Path,
key: str | Path,
ca: str | Path,
host: str = "127.0.0.1",
port: int = 9443,
executor: Optional[Callable[[Dict[str, Any]], Dict[str, Any]]] = None,
card_factory: Optional[Callable[[], Dict[str, Any]]] = None,
) -> None:
self.host = host
self.port = port
self._server = A2AMTLSServer(cert=cert, key=key, ca=ca, host=host, port=port)
self._executor = executor or _default_local_hermes_executor
self._card_factory = card_factory or (lambda: _default_agent_card(self.host, self.port))
self._tasks: Dict[str, Dict[str, Any]] = {}
self._lock = threading.Lock()
self._server.add_route("/.well-known/agent-card.json", self._handle_agent_card)
self._server.add_route("/agent-card.json", self._handle_agent_card)
self._server.add_route("/a2a/rpc", self._handle_rpc)
def __enter__(self) -> "A2ATaskServer":
self.start()
return self
def __exit__(self, *_: Any) -> None:
self.stop()
def start(self) -> None:
self._server.start()
def stop(self) -> None:
self._server.stop()
def _handle_agent_card(self, payload: Dict[str, Any], *, peer_cn: str | None = None) -> Dict[str, Any]:
return self._card_factory()
def _handle_rpc(self, payload: Dict[str, Any], *, peer_cn: str | None = None) -> Dict[str, Any]:
req_id = payload.get("id")
if payload.get("jsonrpc") != "2.0":
return {"jsonrpc": "2.0", "id": req_id, "error": {"code": -32600, "message": "invalid jsonrpc version"}}
method = payload.get("method")
params = payload.get("params") or {}
try:
if method == "tasks/send":
result = self._rpc_send_task(params, peer_cn=peer_cn)
elif method == "tasks/get":
result = self._rpc_get_task(params)
else:
return {"jsonrpc": "2.0", "id": req_id, "error": {"code": -32601, "message": f"unknown method: {method}"}}
except Exception as exc:
logger.exception("A2A task RPC failed: %s", exc)
return {"jsonrpc": "2.0", "id": req_id, "error": {"code": -32000, "message": str(exc)}}
return {"jsonrpc": "2.0", "id": req_id, "result": result}
def _rpc_send_task(self, params: Dict[str, Any], *, peer_cn: str | None = None) -> Dict[str, Any]:
task_text = str(params.get("task", "")).strip()
if not task_text:
raise ValueError("task is required")
task_id = params.get("taskId") or uuid.uuid4().hex
requester = params.get("requester") or peer_cn
metadata = dict(params.get("metadata") or {})
if peer_cn:
metadata.setdefault("peer_cn", peer_cn)
record = _build_task_record(task_id, task_text, requester, metadata)
with self._lock:
self._tasks[task_id] = record
worker = threading.Thread(target=self._run_task, args=(task_id,), daemon=True, name=f"a2a-task-{task_id[:8]}")
worker.start()
return self._copy_task(task_id)
def _rpc_get_task(self, params: Dict[str, Any]) -> Dict[str, Any]:
task_id = str(params.get("taskId", "")).strip()
if not task_id:
raise ValueError("taskId is required")
return self._copy_task(task_id)
def _copy_task(self, task_id: str) -> Dict[str, Any]:
with self._lock:
if task_id not in self._tasks:
raise KeyError(f"unknown taskId: {task_id}")
return json.loads(json.dumps(self._tasks[task_id]))
def _run_task(self, task_id: str) -> None:
with self._lock:
task = self._tasks[task_id]
task["status"] = _task_status("working", "Task is running")
task_payload = {
"taskId": task["taskId"],
"task": task["task"],
"requester": task.get("requester"),
"metadata": dict(task.get("metadata") or {}),
}
try:
result = self._executor(task_payload)
artifact = _coerce_artifact(result)
with self._lock:
task = self._tasks[task_id]
task["artifacts"] = [artifact]
task["status"] = _task_status("completed", "Task completed")
except Exception as exc:
with self._lock:
task = self._tasks[task_id]
task["status"] = _task_status("failed", f"Task failed: {exc}")
class A2ATaskClient(A2AMTLSClient):
"""Client helper for A2A JSON-RPC task send/get flows."""
def discover_card(self, base_url: str) -> Dict[str, Any]:
return self.get(f"{base_url.rstrip('/')}/.well-known/agent-card.json")
def _rpc_call(self, base_url: str, method: str, params: Dict[str, Any]) -> Dict[str, Any]:
payload = {
"jsonrpc": "2.0",
"id": uuid.uuid4().hex,
"method": method,
"params": params,
}
response = self.post(f"{base_url.rstrip('/')}/a2a/rpc", json=payload)
if "error" in response:
error = response["error"]
raise RuntimeError(error.get("message") or str(error))
return response.get("result", {})
def send_task(
self,
base_url: str,
*,
task: str,
requester: str | None = None,
metadata: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
return self._rpc_call(
base_url,
"tasks/send",
{
"task": task,
"requester": requester,
"metadata": metadata or {},
},
)
def get_task(self, base_url: str, task_id: str) -> Dict[str, Any]:
return self._rpc_call(base_url, "tasks/get", {"taskId": task_id})
def wait_for_task(
self,
base_url: str,
task_id: str,
*,
timeout: float = 30.0,
poll_interval: float = 0.5,
) -> Dict[str, Any]:
deadline = time.monotonic() + timeout
while True:
task = self.get_task(base_url, task_id)
state = str(((task.get("status") or {}).get("state") or "")).lower()
if state in _TERMINAL_TASK_STATES:
return task
if time.monotonic() >= deadline:
raise TimeoutError(f"Timed out waiting for task {task_id}")
time.sleep(poll_interval)

View File

@@ -1,66 +0,0 @@
# Morning Review Packet Status — #949
Generated: 2026-04-22T14:57:44.332419+00:00
Epic: [EPIC: Morning review packet — Hermes harness features landed 2026-04-21](https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/issues/949)
## Summary
- Child QA issues tracked: 13
- Open child issues: 11
- Closed child issues: 2
- Open child issues already backed by PRs: 7
- Open child issues still unowned on forge: 4
## Child QA Matrix
| Issue | State | Open PRs | Title |
|------:|-------|----------|-------|
| #950 | open | — | [QA] Verify AI Gateway provider UX + attribution headers |
| #951 | open | — | [QA] Verify transport abstraction + AnthropicTransport wiring |
| #952 | open | — | [QA] Verify CLI voice beep toggle |
| #953 | open | [#1020](https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/pulls/1020) | [QA] Verify bundled skill scripts run out of the box |
| #954 | open | [#1021](https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/pulls/1021) | [QA] Verify maps skill guest_house / camp_site / bakery expansion |
| #955 | open | — | [QA] Verify KittenTTS local provider end-to-end |
| #956 | open | [#1018](https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/pulls/1018) | [QA] Verify numbered keyboard shortcuts for approval + clarify prompts |
| #957 | open | [#1015](https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/pulls/1015) | [QA] Verify optional adversarial-ux-test skill catalog flow |
| #958 | open | [#1016](https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/pulls/1016) | [QA] Verify /usage account limits in CLI + gateway |
| #959 | open | [#1014](https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/pulls/1014) | [QA] Verify OpenCode-Go curated catalog additions |
| #960 | open | [#1017](https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/pulls/1017) | [QA] Verify patch 'did you mean?' suggestions |
| #961 | closed | — | [QA] Verify web dashboard update/restart action buttons |
| #962 | closed | — | [QA] Verify hardcoded-home path guard on burn/921 branch |
## Drift Signals
forge/main is still catching up to the upstream packet.
Active PR-backed child lanes:
- #953 -> #1020 ([QA] Verify bundled skill scripts run out of the box)
- #954 -> #1021 ([QA] Verify maps skill guest_house / camp_site / bakery expansion)
- #956 -> #1018 ([QA] Verify numbered keyboard shortcuts for approval + clarify prompts)
- #957 -> #1015 ([QA] Verify optional adversarial-ux-test skill catalog flow)
- #958 -> #1016 ([QA] Verify /usage account limits in CLI + gateway)
- #959 -> #1014 ([QA] Verify OpenCode-Go curated catalog additions)
- #960 -> #1017 ([QA] Verify patch 'did you mean?' suggestions)
## Unowned Open QA Issues
- #950 [QA] Verify AI Gateway provider UX + attribution headers
- #951 [QA] Verify transport abstraction + AnthropicTransport wiring
- #952 [QA] Verify CLI voice beep toggle
- #955 [QA] Verify KittenTTS local provider end-to-end
## Decomposition Follow-Ups
- #965 [open] [EPIC: Morning review packet — Hermes harness features landed 2026-04-21] Phase 1: Landscape Analysis & Scaffolding
- #966 [open] [EPIC: Morning review packet — Hermes harness features landed 2026-04-21] Phase 2: Core Logic Implementation
- #967 [closed] [EPIC: Morning review packet — Hermes harness features landed 2026-04-21] Phase 3: Poka-yoke Integration & Fleet Verification
## Conclusion
Refs #949 only. This epic remains open until every child QA issue has a truthful PASS/FAIL outcome, attached evidence, and any upstream/main versus forge/main drift is resolved or explicitly documented.
## Regeneration
```bash
python3 scripts/morning_review_packet_status.py --fetch-live --json-out docs/morning-review-packet-2026-04-21.snapshot.json --markdown-out docs/morning-review-packet-2026-04-21-status.md
```

View File

@@ -1,172 +0,0 @@
{
"generated_at": "2026-04-22T14:57:44.332419+00:00",
"repo": "Timmy_Foundation/hermes-agent",
"epic": {
"number": 949,
"title": "EPIC: Morning review packet \u2014 Hermes harness features landed 2026-04-21",
"state": "open",
"html_url": "https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/issues/949"
},
"children": [
{
"number": 950,
"title": "[QA] Verify AI Gateway provider UX + attribution headers",
"state": "open",
"html_url": "https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/issues/950",
"open_prs": []
},
{
"number": 951,
"title": "[QA] Verify transport abstraction + AnthropicTransport wiring",
"state": "open",
"html_url": "https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/issues/951",
"open_prs": []
},
{
"number": 952,
"title": "[QA] Verify CLI voice beep toggle",
"state": "open",
"html_url": "https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/issues/952",
"open_prs": []
},
{
"number": 953,
"title": "[QA] Verify bundled skill scripts run out of the box",
"state": "open",
"html_url": "https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/issues/953",
"open_prs": [
{
"number": 1020,
"title": "fix: ship bundled skill scripts executable",
"head": "fix/953",
"url": "https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/pulls/1020"
}
]
},
{
"number": 954,
"title": "[QA] Verify maps skill guest_house / camp_site / bakery expansion",
"state": "open",
"html_url": "https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/issues/954",
"open_prs": [
{
"number": 1021,
"title": "feat: sync maps skill and verify guest_house/camp_site/bakery (#954)",
"head": "fix/954",
"url": "https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/pulls/1021"
}
]
},
{
"number": 955,
"title": "[QA] Verify KittenTTS local provider end-to-end",
"state": "open",
"html_url": "https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/issues/955",
"open_prs": []
},
{
"number": 956,
"title": "[QA] Verify numbered keyboard shortcuts for approval + clarify prompts",
"state": "open",
"html_url": "https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/issues/956",
"open_prs": [
{
"number": 1018,
"title": "fix: add numbered approval and clarify shortcuts (#956)",
"head": "fix/956",
"url": "https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/pulls/1018"
}
]
},
{
"number": 957,
"title": "[QA] Verify optional adversarial-ux-test skill catalog flow",
"state": "open",
"html_url": "https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/issues/957",
"open_prs": [
{
"number": 1015,
"title": "feat(skills): backport adversarial-ux-test optional skill",
"head": "fix/957",
"url": "https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/pulls/1015"
}
]
},
{
"number": 958,
"title": "[QA] Verify /usage account limits in CLI + gateway",
"state": "open",
"html_url": "https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/issues/958",
"open_prs": [
{
"number": 1016,
"title": "fix: restore /usage account limits in CLI + gateway (#958)",
"head": "fix/958",
"url": "https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/pulls/1016"
}
]
},
{
"number": 959,
"title": "[QA] Verify OpenCode-Go curated catalog additions",
"state": "open",
"html_url": "https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/issues/959",
"open_prs": [
{
"number": 1014,
"title": "fix(opencode-go): restore curated catalog additions",
"head": "fix/959",
"url": "https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/pulls/1014"
}
]
},
{
"number": 960,
"title": "[QA] Verify patch 'did you mean?' suggestions",
"state": "open",
"html_url": "https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/issues/960",
"open_prs": [
{
"number": 1017,
"title": "fix(patch): port and verify did-you-mean suggestions (#960)",
"head": "fix/960",
"url": "https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/pulls/1017"
}
]
},
{
"number": 961,
"title": "[QA] Verify web dashboard update/restart action buttons",
"state": "closed",
"html_url": "https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/issues/961",
"open_prs": []
},
{
"number": 962,
"title": "[QA] Verify hardcoded-home path guard on burn/921 branch",
"state": "closed",
"html_url": "https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/issues/962",
"open_prs": []
}
],
"decomposition_issues": [
{
"number": 965,
"title": "[EPIC: Morning review packet \u2014 Hermes harness features landed 2026-04-21] Phase 1: Landscape Analysis & Scaffolding",
"state": "open",
"html_url": "https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/issues/965"
},
{
"number": 966,
"title": "[EPIC: Morning review packet \u2014 Hermes harness features landed 2026-04-21] Phase 2: Core Logic Implementation",
"state": "open",
"html_url": "https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/issues/966"
},
{
"number": 967,
"title": "[EPIC: Morning review packet \u2014 Hermes harness features landed 2026-04-21] Phase 3: Poka-yoke Integration & Fleet Verification",
"state": "closed",
"html_url": "https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/issues/967"
}
]
}

132
hermes_cli/a2a_cmd.py Normal file
View File

@@ -0,0 +1,132 @@
"""CLI helpers for A2A task delegation."""
from __future__ import annotations
import json
import os
import re
import sys
import time
from pathlib import Path
from typing import Any
from agent.a2a_mtls import A2ATaskClient, A2ATaskServer
from hermes_cli.config import get_hermes_home
def _registry_path() -> Path:
return get_hermes_home() / "a2a_agents.json"
def _default_identity_paths() -> tuple[str, str, str]:
hermes_home = get_hermes_home()
agent_name = os.environ.get("HERMES_AGENT_NAME", "hermes").lower()
cert = os.environ.get(
"HERMES_A2A_CERT",
str(hermes_home / "pki" / "agents" / agent_name / f"{agent_name}.crt"),
)
key = os.environ.get(
"HERMES_A2A_KEY",
str(hermes_home / "pki" / "agents" / agent_name / f"{agent_name}.key"),
)
ca = os.environ.get(
"HERMES_A2A_CA",
str(hermes_home / "pki" / "ca" / "fleet-ca.crt"),
)
return cert, key, ca
def load_agent_registry(path: Path | None = None) -> dict[str, Any]:
registry_path = path or _registry_path()
if not registry_path.exists():
return {}
return json.loads(registry_path.read_text(encoding="utf-8"))
def resolve_agent_url(agent: str, *, registry_path: Path | None = None) -> str:
key = re.sub(r"[^A-Za-z0-9]+", "_", agent).upper()
env_value = os.getenv(f"HERMES_A2A_{key}_URL")
if env_value:
return env_value
registry = load_agent_registry(registry_path)
entry = registry.get(agent)
if isinstance(entry, str) and entry:
return entry
if isinstance(entry, dict):
url = entry.get("url") or entry.get("base_url") or entry.get("card_url")
if url:
return str(url)
if agent.startswith("https://") or agent.startswith("http://"):
return agent
raise SystemExit(f"Unknown A2A agent '{agent}'. Set HERMES_A2A_{key}_URL or add it to {_registry_path()}.")
def _print(data: dict[str, Any]) -> None:
print(json.dumps(data, indent=2, ensure_ascii=False))
def cmd_send(args) -> None:
base_url = args.url or resolve_agent_url(args.agent)
cert, key, ca = args.cert, args.key, args.ca
if not (cert and key and ca):
cert, key, ca = _default_identity_paths()
client = A2ATaskClient(cert=cert, key=key, ca=ca)
card = client.discover_card(base_url)
task = client.send_task(
base_url,
task=args.task,
requester=args.requester,
metadata={"agent": args.agent},
)
if args.wait:
task = client.wait_for_task(
base_url,
task["taskId"],
timeout=args.timeout,
poll_interval=args.poll_interval,
)
_print({
"agent": args.agent,
"url": base_url,
"card": card,
"task": task,
})
def cmd_status(args) -> None:
base_url = args.url or resolve_agent_url(args.agent)
cert, key, ca = args.cert, args.key, args.ca
if not (cert and key and ca):
cert, key, ca = _default_identity_paths()
client = A2ATaskClient(cert=cert, key=key, ca=ca)
task = client.get_task(base_url, args.task_id)
_print({"agent": args.agent, "url": base_url, "task": task})
def cmd_serve(args) -> None:
cert, key, ca = args.cert, args.key, args.ca
if not (cert and key and ca):
cert, key, ca = _default_identity_paths()
server = A2ATaskServer(cert=cert, key=key, ca=ca, host=args.host, port=args.port)
server.start()
print(f"A2A task server listening on https://{args.host}:{args.port}")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
server.stop()
def cmd_a2a(args) -> None:
command = getattr(args, "a2a_command", None) or "send"
if command == "send":
cmd_send(args)
return
if command == "status":
cmd_status(args)
return
if command == "serve":
cmd_serve(args)
return
raise SystemExit(f"Unknown a2a command: {command}")

View File

@@ -173,6 +173,13 @@ from hermes_constants import OPENROUTER_BASE_URL
logger = logging.getLogger(__name__)
def cmd_a2a(args):
"""Dispatch A2A CLI subcommands lazily to avoid heavy imports at startup."""
from hermes_cli.a2a_cmd import cmd_a2a as _cmd_a2a
return _cmd_a2a(args)
def _relative_time(ts) -> str:
"""Format a timestamp as relative time (e.g., '2h ago', 'yesterday')."""
if not ts:
@@ -4781,6 +4788,45 @@ For more help on a command:
gateway_parser.set_defaults(func=cmd_gateway)
# =========================================================================
# a2a command
# =========================================================================
a2a_parser = subparsers.add_parser(
"a2a",
help="A2A task delegation over mutual TLS",
description="Send, inspect, and serve structured A2A tasks between Hermes agents",
)
a2a_subparsers = a2a_parser.add_subparsers(dest="a2a_command")
a2a_send = a2a_subparsers.add_parser("send", help="Send an A2A task to another agent")
a2a_send.add_argument("--agent", required=True, help="Agent alias or URL (for example: allegro)")
a2a_send.add_argument("--task", required=True, help="Task text to delegate")
a2a_send.add_argument("--url", help="Explicit base URL for the remote agent")
a2a_send.add_argument("--requester", default=None, help="Requester label included in task metadata")
a2a_send.add_argument("--wait", action="store_true", help="Poll until the task reaches a terminal state")
a2a_send.add_argument("--timeout", type=float, default=30.0, help="Wait timeout in seconds (default: 30)")
a2a_send.add_argument("--poll-interval", type=float, default=0.5, help="Polling interval in seconds while waiting (default: 0.5)")
a2a_send.add_argument("--cert", default=None, help="Client certificate path (defaults from HERMES_A2A_CERT)")
a2a_send.add_argument("--key", default=None, help="Client private key path (defaults from HERMES_A2A_KEY)")
a2a_send.add_argument("--ca", default=None, help="Fleet CA certificate path (defaults from HERMES_A2A_CA)")
a2a_status = a2a_subparsers.add_parser("status", help="Fetch the current status of an A2A task")
a2a_status.add_argument("--agent", required=True, help="Agent alias or URL (for example: allegro)")
a2a_status.add_argument("--task-id", required=True, help="Task identifier returned by a2a send")
a2a_status.add_argument("--url", help="Explicit base URL for the remote agent")
a2a_status.add_argument("--cert", default=None, help="Client certificate path (defaults from HERMES_A2A_CERT)")
a2a_status.add_argument("--key", default=None, help="Client private key path (defaults from HERMES_A2A_KEY)")
a2a_status.add_argument("--ca", default=None, help="Fleet CA certificate path (defaults from HERMES_A2A_CA)")
a2a_serve = a2a_subparsers.add_parser("serve", help="Run the local A2A task server")
a2a_serve.add_argument("--host", default=os.environ.get("HERMES_A2A_HOST", "127.0.0.1"), help="Bind host (default: HERMES_A2A_HOST or 127.0.0.1)")
a2a_serve.add_argument("--port", type=int, default=int(os.environ.get("HERMES_A2A_PORT", "9443")), help="Bind port (default: HERMES_A2A_PORT or 9443)")
a2a_serve.add_argument("--cert", default=None, help="Server certificate path (defaults from HERMES_A2A_CERT)")
a2a_serve.add_argument("--key", default=None, help="Server private key path (defaults from HERMES_A2A_KEY)")
a2a_serve.add_argument("--ca", default=None, help="Fleet CA certificate path (defaults from HERMES_A2A_CA)")
a2a_parser.set_defaults(func=cmd_a2a)
# =========================================================================
# setup command
# =========================================================================

View File

@@ -1,288 +0,0 @@
#!/usr/bin/env python3
"""Generate a grounded status report for hermes-agent morning review packet epic #949."""
from __future__ import annotations
import argparse
import base64
import json
import os
import re
import ssl
import urllib.request
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
BASE_API = "https://forge.alexanderwhitestone.com/api/v1"
REPO = "Timmy_Foundation/hermes-agent"
TOKEN_PATH = Path("~/.config/gitea/token").expanduser()
DEFAULT_JSON_OUT = Path("docs/morning-review-packet-2026-04-21.snapshot.json")
DEFAULT_MARKDOWN_OUT = Path("docs/morning-review-packet-2026-04-21-status.md")
def extract_issue_numbers(text: str) -> list[int]:
seen: set[int] = set()
numbers: list[int] = []
for match in re.finditer(r"#(\d+)", text or ""):
num = int(match.group(1))
if num not in seen:
seen.add(num)
numbers.append(num)
return numbers
def _auth_headers(token: str) -> list[dict[str, str]]:
basic = base64.b64encode(f"{token}:".encode()).decode()
return [
{"Authorization": f"token {token}", "Accept": "application/json"},
{"Authorization": f"Basic {basic}", "Accept": "application/json"},
]
def api_get(path: str, *, headers_options: list[dict[str, str]] | None = None) -> Any:
token = TOKEN_PATH.read_text(encoding="utf-8").strip()
headers_options = headers_options or _auth_headers(token)
ctx = ssl.create_default_context()
url = f"{BASE_API}{path}"
last_error: Exception | None = None
for headers in headers_options:
try:
req = urllib.request.Request(url, headers=headers)
with urllib.request.urlopen(req, context=ctx, timeout=30) as resp:
return json.loads(resp.read().decode())
except Exception as exc: # pragma: no cover - exercised via live CLI use
last_error = exc
raise RuntimeError(f"GET {url} failed: {last_error}")
def issue_pr_matches(pr: dict[str, Any], issue_num: int) -> bool:
title = pr.get("title") or ""
body = pr.get("body") or ""
head = (pr.get("head") or {}).get("ref") or ""
exact_ref = re.compile(rf"(?<!\d)#{issue_num}(?!\d)")
body_ref = re.compile(rf"(?i)(closes|close|fixes|fix|resolves|resolve|refs|ref)\s+#?{issue_num}(?!\d)")
branch_variants = {
f"fix/{issue_num}",
f"issue-{issue_num}",
f"burn/{issue_num}",
f"fix/issue-{issue_num}",
}
return bool(
exact_ref.search(title)
or exact_ref.search(body)
or body_ref.search(body)
or head in branch_variants
)
def fetch_open_prs(*, headers_options: list[dict[str, str]]) -> list[dict[str, Any]]:
prs: list[dict[str, Any]] = []
page = 1
while True:
batch = api_get(
f"/repos/{REPO}/pulls?state=open&limit=100&page={page}",
headers_options=headers_options,
)
if not batch:
break
prs.extend(batch)
if len(batch) < 100:
break
page += 1
return prs
def fetch_live_snapshot(epic_issue_num: int = 949) -> dict[str, Any]:
token = TOKEN_PATH.read_text(encoding="utf-8").strip()
headers_options = _auth_headers(token)
epic = api_get(f"/repos/{REPO}/issues/{epic_issue_num}", headers_options=headers_options)
comments = api_get(f"/repos/{REPO}/issues/{epic_issue_num}/comments", headers_options=headers_options)
child_numbers = [n for n in extract_issue_numbers(epic.get("body") or "") if n != epic_issue_num]
decomposition_numbers = [
n
for comment in comments
for n in extract_issue_numbers(comment.get("body") or "")
if n not in child_numbers and n != epic_issue_num
]
open_prs = fetch_open_prs(headers_options=headers_options)
children = []
for number in child_numbers:
issue = api_get(f"/repos/{REPO}/issues/{number}", headers_options=headers_options)
matching_prs = [
{
"number": pr["number"],
"title": pr["title"],
"head": pr.get("head", {}).get("ref", ""),
"url": pr["html_url"],
}
for pr in open_prs
if issue_pr_matches(pr, number)
]
children.append(
{
"number": issue["number"],
"title": issue["title"],
"state": issue["state"],
"html_url": issue["html_url"],
"open_prs": matching_prs,
}
)
decomposition_issues = []
for number in decomposition_numbers:
issue = api_get(f"/repos/{REPO}/issues/{number}", headers_options=headers_options)
decomposition_issues.append(
{
"number": issue["number"],
"title": issue["title"],
"state": issue["state"],
"html_url": issue["html_url"],
}
)
return {
"generated_at": datetime.now(timezone.utc).isoformat(),
"repo": REPO,
"epic": {
"number": epic["number"],
"title": epic["title"],
"state": epic["state"],
"html_url": epic["html_url"],
},
"children": children,
"decomposition_issues": decomposition_issues,
}
def summarize_snapshot(snapshot: dict[str, Any]) -> dict[str, int]:
children = snapshot.get("children", [])
open_children = [issue for issue in children if issue.get("state") == "open"]
closed_children = [issue for issue in children if issue.get("state") == "closed"]
open_with_pr = [issue for issue in open_children if issue.get("open_prs")]
open_without_pr = [issue for issue in open_children if not issue.get("open_prs")]
return {
"total_children": len(children),
"open_children": len(open_children),
"closed_children": len(closed_children),
"open_with_pr": len(open_with_pr),
"open_without_pr": len(open_without_pr),
}
def render_markdown(snapshot: dict[str, Any]) -> str:
epic = snapshot["epic"]
children = snapshot.get("children", [])
summary = summarize_snapshot(snapshot)
open_with_pr = [issue for issue in children if issue.get("state") == "open" and issue.get("open_prs")]
open_without_pr = [issue for issue in children if issue.get("state") == "open" and not issue.get("open_prs")]
decomposition = snapshot.get("decomposition_issues", [])
lines = [
f"# Morning Review Packet Status — #{epic['number']}",
"",
f"Generated: {snapshot.get('generated_at', '')}",
f"Epic: [{epic['title']}]({epic.get('html_url', '')})",
"",
"## Summary",
"",
f"- Child QA issues tracked: {summary['total_children']}",
f"- Open child issues: {summary['open_children']}",
f"- Closed child issues: {summary['closed_children']}",
f"- Open child issues already backed by PRs: {summary['open_with_pr']}",
f"- Open child issues still unowned on forge: {summary['open_without_pr']}",
"",
"## Child QA Matrix",
"",
"| Issue | State | Open PRs | Title |",
"|------:|-------|----------|-------|",
]
for issue in children:
rendered_prs = []
for pr in issue.get("open_prs", []):
pr_num = pr.get("number", "?")
pr_url = pr.get("url") or pr.get("html_url") or ""
rendered_prs.append(f"[#{pr_num}]({pr_url})" if pr_url else f"#{pr_num}")
pr_text = ", ".join(rendered_prs) or ""
lines.append(
f"| #{issue['number']} | {issue['state']} | {pr_text} | {issue['title']} |"
)
lines.extend([
"",
"## Drift Signals",
"",
"forge/main is still catching up to the upstream packet.",
])
if open_with_pr:
lines.append("")
lines.append("Active PR-backed child lanes:")
for issue in open_with_pr:
pr_numbers = ", ".join(f"#{pr['number']}" for pr in issue.get("open_prs", []))
lines.append(f"- #{issue['number']} -> {pr_numbers} ({issue['title']})")
if open_without_pr:
lines.extend([
"",
"## Unowned Open QA Issues",
"",
])
for issue in open_without_pr:
lines.append(f"- #{issue['number']} {issue['title']}")
if decomposition:
lines.extend([
"",
"## Decomposition Follow-Ups",
"",
])
for issue in decomposition:
lines.append(f"- #{issue['number']} [{issue['state']}] {issue['title']}")
lines.extend([
"",
"## Conclusion",
"",
"Refs #949 only. This epic remains open until every child QA issue has a truthful PASS/FAIL outcome, attached evidence, and any upstream/main versus forge/main drift is resolved or explicitly documented.",
"",
"## Regeneration",
"",
"```bash",
"python3 scripts/morning_review_packet_status.py --fetch-live --json-out docs/morning-review-packet-2026-04-21.snapshot.json --markdown-out docs/morning-review-packet-2026-04-21-status.md",
"```",
])
return "\n".join(lines) + "\n"
def write_json(path: Path, data: dict[str, Any]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(data, indent=2) + "\n", encoding="utf-8")
def main() -> None:
parser = argparse.ArgumentParser(description="Generate grounded status docs for epic #949")
parser.add_argument("--fetch-live", action="store_true", help="Fetch the current packet state from Forge")
parser.add_argument("--snapshot", type=Path, help="Read a local JSON snapshot instead of hitting the API")
parser.add_argument("--json-out", type=Path, default=DEFAULT_JSON_OUT, help="Path to write JSON snapshot")
parser.add_argument("--markdown-out", type=Path, default=DEFAULT_MARKDOWN_OUT, help="Path to write markdown report")
args = parser.parse_args()
if args.fetch_live or not args.snapshot:
snapshot = fetch_live_snapshot()
else:
snapshot = json.loads(args.snapshot.read_text(encoding="utf-8"))
write_json(args.json_out, snapshot)
args.markdown_out.parent.mkdir(parents=True, exist_ok=True)
args.markdown_out.write_text(render_markdown(snapshot), encoding="utf-8")
print(args.markdown_out)
if __name__ == "__main__":
main()

View File

@@ -572,3 +572,94 @@ class TestA2AMTLSServerAndClient:
assert not errors, f"Concurrent connection errors: {errors}"
assert len(results) == 3
@_requires_crypto
class TestA2ATaskServerAndClient:
"""Structured A2A task send/get flow over mTLS."""
@pytest.fixture(autouse=True)
def _pki(self, tmp_path):
ca_dir = tmp_path / "ca"
ca_dir.mkdir()
self.ca_crt, self.ca_key = _make_ca_keypair(ca_dir)
agent_dir = tmp_path / "agents"
agent_dir.mkdir()
self.srv_crt, self.srv_key = _make_agent_keypair(
agent_dir, "timmy", self.ca_crt, self.ca_key
)
self.cli_crt, self.cli_key = _make_agent_keypair(
agent_dir, "allegro", self.ca_crt, self.ca_key
)
@pytest.fixture()
def task_server(self):
from agent.a2a_mtls import A2ATaskServer
gate = threading.Event()
def analyze_executor(task: dict[str, object]) -> dict[str, object]:
gate.wait(timeout=2)
text = str(task.get("task", ""))
return {
"text": f"analysis:{text}",
"metadata": {"tool": "local-hermes-stub"},
}
port = _find_free_port()
server = A2ATaskServer(
cert=self.srv_crt,
key=self.srv_key,
ca=self.ca_crt,
host="127.0.0.1",
port=port,
executor=analyze_executor,
)
with server:
time.sleep(0.1)
yield server, port, gate
def test_task_send_get_and_completion_flow(self, task_server):
from agent.a2a_mtls import A2ATaskClient
server, port, gate = task_server
client = A2ATaskClient(cert=self.cli_crt, key=self.cli_key, ca=self.ca_crt)
base_url = f"https://127.0.0.1:{port}"
card = client.discover_card(base_url)
assert card["name"]
submitted = client.send_task(base_url, task="Analyze README.md", requester="timmy")
assert submitted["status"]["state"] in {"submitted", "working"}
in_flight = client.get_task(base_url, submitted["taskId"])
assert in_flight["status"]["state"] in {"submitted", "working"}
gate.set()
completed = client.wait_for_task(base_url, submitted["taskId"], timeout=5.0, poll_interval=0.05)
assert completed["status"]["state"] == "completed"
assert completed["artifacts"][0]["text"] == "analysis:Analyze README.md"
def test_failed_executor_marks_task_failed(self):
from agent.a2a_mtls import A2ATaskClient, A2ATaskServer
def failing_executor(task: dict[str, object]) -> dict[str, object]:
raise RuntimeError("boom")
port = _find_free_port()
server = A2ATaskServer(
cert=self.srv_crt,
key=self.srv_key,
ca=self.ca_crt,
host="127.0.0.1",
port=port,
executor=failing_executor,
)
with server:
time.sleep(0.1)
client = A2ATaskClient(cert=self.cli_crt, key=self.cli_key, ca=self.ca_crt)
base_url = f"https://127.0.0.1:{port}"
submitted = client.send_task(base_url, task="explode", requester="timmy")
failed = client.wait_for_task(base_url, submitted["taskId"], timeout=5.0, poll_interval=0.05)
assert failed["status"]["state"] == "failed"
assert "boom" in failed["status"]["message"]

View File

@@ -0,0 +1,95 @@
from __future__ import annotations
import argparse
import json
from pathlib import Path
from unittest.mock import patch
import pytest
def test_cmd_send_uses_registry_and_waits_for_terminal_task(tmp_path, monkeypatch, capsys):
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
(hermes_home / "a2a_agents.json").write_text(
json.dumps({"allegro": {"url": "https://127.0.0.1:9443"}}),
encoding="utf-8",
)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
from hermes_cli.a2a_cmd import cmd_a2a
class FakeClient:
def __init__(self, **kwargs):
self.kwargs = kwargs
def discover_card(self, base_url: str):
assert base_url == "https://127.0.0.1:9443"
return {"name": "allegro", "url": base_url}
def send_task(self, base_url: str, *, task: str, requester: str | None = None, metadata=None):
assert task == "analyze README"
return {"taskId": "task-123", "status": {"state": "submitted"}}
def wait_for_task(self, base_url: str, task_id: str, *, timeout: float, poll_interval: float):
assert task_id == "task-123"
return {
"taskId": task_id,
"status": {"state": "completed"},
"artifacts": [{"text": "README looks healthy"}],
}
args = argparse.Namespace(
a2a_command="send",
agent="allegro",
task="analyze README",
url=None,
wait=True,
timeout=5.0,
poll_interval=0.01,
requester="timmy",
cert="cert.pem",
key="key.pem",
ca="ca.pem",
)
with patch("hermes_cli.a2a_cmd.A2ATaskClient", FakeClient):
cmd_a2a(args)
result = json.loads(capsys.readouterr().out)
assert result["agent"] == "allegro"
assert result["card"]["name"] == "allegro"
assert result["task"]["status"]["state"] == "completed"
assert result["task"]["artifacts"][0]["text"] == "README looks healthy"
def test_resolve_agent_url_supports_env_override(monkeypatch):
monkeypatch.setenv("HERMES_A2A_ALLEGRO_URL", "https://fleet-allegro:9443")
from hermes_cli.a2a_cmd import resolve_agent_url
assert resolve_agent_url("allegro") == "https://fleet-allegro:9443"
def test_cmd_send_requires_known_agent(tmp_path, monkeypatch):
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
from hermes_cli.a2a_cmd import cmd_a2a
args = argparse.Namespace(
a2a_command="send",
agent="unknown",
task="do work",
url=None,
wait=False,
timeout=5.0,
poll_interval=0.05,
requester=None,
cert="cert.pem",
key="key.pem",
ca="ca.pem",
)
with pytest.raises(SystemExit):
cmd_a2a(args)

View File

@@ -1,94 +0,0 @@
"""Tests for the morning review packet status report generator."""
from __future__ import annotations
import importlib.util
from pathlib import Path
SCRIPT_PATH = Path(__file__).resolve().parents[1] / "scripts" / "morning_review_packet_status.py"
DOC_PATH = Path(__file__).resolve().parents[1] / "docs" / "morning-review-packet-2026-04-21-status.md"
def load_module():
assert SCRIPT_PATH.exists(), f"missing status script: {SCRIPT_PATH}"
spec = importlib.util.spec_from_file_location("morning_review_packet_status_test", SCRIPT_PATH)
module = importlib.util.module_from_spec(spec)
assert spec.loader is not None
spec.loader.exec_module(module)
return module
def sample_snapshot():
return {
"epic": {"number": 949, "title": "Morning review packet", "state": "open"},
"children": [
{
"number": 950,
"title": "Verify AI Gateway provider UX + attribution headers",
"state": "open",
"open_prs": [],
},
{
"number": 954,
"title": "Verify maps skill guest_house / camp_site / bakery expansion",
"state": "open",
"open_prs": [
{"number": 1021, "head": "fix/954", "title": "feat: sync maps skill and verify guest_house/camp_site/bakery (#954)"}
],
},
{
"number": 961,
"title": "Verify web dashboard update/restart action buttons",
"state": "closed",
"open_prs": [],
},
],
"decomposition_issues": [
{"number": 965, "title": "Phase 1: Landscape Analysis & Scaffolding", "state": "open"},
{"number": 967, "title": "Phase 3: Poka-yoke Integration & Fleet Verification", "state": "closed"},
],
}
def test_extract_child_issue_numbers_from_epic_body():
module = load_module()
body = """
- [ ] #950 one
- [ ] #951 two
- [ ] #962 three
"""
assert module.extract_issue_numbers(body) == [950, 951, 962]
def test_summarize_snapshot_counts_open_closed_and_pr_backing():
module = load_module()
summary = module.summarize_snapshot(sample_snapshot())
assert summary["total_children"] == 3
assert summary["open_children"] == 2
assert summary["closed_children"] == 1
assert summary["open_with_pr"] == 1
assert summary["open_without_pr"] == 1
def test_render_markdown_includes_issue_matrix_and_drift_sections():
module = load_module()
md = module.render_markdown(sample_snapshot())
assert "# Morning Review Packet Status — #949" in md
assert "## Child QA Matrix" in md
assert "#950" in md
assert "#954" in md
assert "#1021" in md
assert "## Unowned Open QA Issues" in md
assert "## Drift Signals" in md
assert "forge/main is still catching up to the upstream packet" in md
def test_committed_status_doc_exists_and_mentions_live_examples():
assert DOC_PATH.exists(), f"missing generated status doc: {DOC_PATH}"
text = DOC_PATH.read_text(encoding="utf-8")
assert "# Morning Review Packet Status — #949" in text
assert "#954" in text
assert "#1021" in text
assert "#950" in text