Files
the-nexus/bin/a2a_delegate.py
Alexander Whitestone bb9758c4d2
Some checks failed
CI / test (pull_request) Failing after 31s
Review Approval Gate / verify-review (pull_request) Failing after 4s
CI / validate (pull_request) Failing after 30s
feat: implement A2A protocol for fleet-wizard delegation (#1122)
Implements Google Agent2Agent Protocol v1.0 with full fleet integration:

## Phase 1 - Agent Card & Discovery
- Agent Card types with JSON serialization (camelCase, Part discrimination by key)
- Card generation from YAML config (~/.hermes/agent_card.yaml)
- Fleet registry with LocalFileRegistry + GiteaRegistry backends
- Discovery by skill ID or tag

## Phase 2 - Task Delegation
- Async A2A client with JSON-RPC SendMessage/GetTask/ListTasks/CancelTask
- FastAPI server with pluggable task handlers (skill-routed)
- CLI tool (bin/a2a_delegate.py) for fleet delegation
- Broadcast to multiple agents in parallel

## Phase 3 - Security & Reliability
- Bearer token + API key auth (configurable per agent)
- Retry logic (max 3 retries, 30s timeout)
- Audit logging for all inter-agent requests
- Error handling per A2A spec (-32001 to -32009 codes)

## Test Coverage
- 37 tests covering types, card building, registry, server integration
- Auth (required + success), handler routing, error handling

Files:
- nexus/a2a/ (types.py, card.py, client.py, server.py, registry.py)
- bin/a2a_delegate.py (CLI)
- config/ (agent_card.example.yaml, fleet_agents.json)
- docs/A2A_PROTOCOL.md
- tests/test_a2a.py (37 tests, all passing)
2026-04-13 18:31:05 -04:00

242 lines
7.2 KiB
Python

#!/usr/bin/env python3
"""
A2A Delegate — CLI tool for fleet task delegation.
Usage:
# List available fleet agents
python -m bin.a2a_delegate list
# Discover agents with a specific skill
python -m bin.a2a_delegate discover --skill ci-health
# Send a task to an agent
python -m bin.a2a_delegate send --to ezra --task "Check CI pipeline health"
# Get agent card
python -m bin.a2a_delegate card --agent ezra
"""
from __future__ import annotations
import argparse
import asyncio
import json
import logging
import sys
from pathlib import Path
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger("a2a-delegate")
def cmd_list(args):
"""List all registered fleet agents."""
from nexus.a2a.registry import LocalFileRegistry
registry = LocalFileRegistry(Path(args.registry))
agents = registry.list_agents()
if not agents:
print("No agents registered.")
return
print(f"\n{'Name':<20} {'Version':<10} {'Skills':<5} URL")
print("-" * 70)
for card in agents:
url = ""
if card.supported_interfaces:
url = card.supported_interfaces[0].url
print(
f"{card.name:<20} {card.version:<10} "
f"{len(card.skills):<5} {url}"
)
print()
def cmd_discover(args):
"""Discover agents by skill or tag."""
from nexus.a2a.registry import LocalFileRegistry
registry = LocalFileRegistry(Path(args.registry))
agents = registry.list_agents(skill=args.skill, tag=args.tag)
if not agents:
print("No matching agents found.")
return
for card in agents:
print(f"\n{card.name} (v{card.version})")
print(f" {card.description}")
if card.supported_interfaces:
print(f" Endpoint: {card.supported_interfaces[0].url}")
for skill in card.skills:
tags_str = ", ".join(skill.tags) if skill.tags else ""
print(f" [{skill.id}] {skill.name}{skill.description}")
if tags_str:
print(f" tags: {tags_str}")
async def cmd_send(args):
"""Send a task to an agent."""
from nexus.a2a.card import load_card_config
from nexus.a2a.client import A2AClient, A2AClientConfig
from nexus.a2a.registry import LocalFileRegistry
from nexus.a2a.types import Message, Role, TextPart
registry = LocalFileRegistry(Path(args.registry))
target = registry.get(args.to)
if not target:
print(f"Agent '{args.to}' not found in registry.")
sys.exit(1)
if not target.supported_interfaces:
print(f"Agent '{args.to}' has no endpoint configured.")
sys.exit(1)
endpoint = target.supported_interfaces[0].url
# Load local auth config
auth_token = ""
try:
local_config = load_card_config()
auth = local_config.get("auth", {})
import os
token_env = auth.get("token_env", "A2A_AUTH_TOKEN")
auth_token = os.environ.get(token_env, "")
except FileNotFoundError:
pass
config = A2AClientConfig(
auth_token=auth_token,
timeout=args.timeout,
max_retries=args.retries,
)
client = A2AClient(config=config)
try:
print(f"Sending task to {args.to} ({endpoint})...")
print(f"Task: {args.task}")
print()
message = Message(
role=Role.USER,
parts=[TextPart(text=args.task)],
metadata={"targetSkill": args.skill} if args.skill else {},
)
task = await client.send_message(endpoint, message)
print(f"Task ID: {task.id}")
print(f"State: {task.status.state.value}")
if args.wait:
print("Waiting for completion...")
task = await client.wait_for_completion(
endpoint, task.id,
poll_interval=args.poll_interval,
max_wait=args.timeout,
)
print(f"\nFinal state: {task.status.state.value}")
for artifact in task.artifacts:
for part in artifact.parts:
if isinstance(part, TextPart):
print(f"\n--- {artifact.name or 'result'} ---")
print(part.text)
# Audit log
if args.audit:
print("\n--- Audit Log ---")
for entry in client.get_audit_log():
print(json.dumps(entry, indent=2))
finally:
await client.close()
async def cmd_card(args):
"""Fetch and display a remote agent's card."""
from nexus.a2a.client import A2AClient, A2AClientConfig
from nexus.a2a.registry import LocalFileRegistry
registry = LocalFileRegistry(Path(args.registry))
target = registry.get(args.agent)
if not target:
print(f"Agent '{args.agent}' not found in registry.")
sys.exit(1)
if not target.supported_interfaces:
print(f"Agent '{args.agent}' has no endpoint.")
sys.exit(1)
base_url = target.supported_interfaces[0].url
# Strip /a2a/v1 suffix to get base
for suffix in ["/a2a/v1", "/rpc"]:
if base_url.endswith(suffix):
base_url = base_url[: -len(suffix)]
break
client = A2AClient(config=A2AClientConfig())
try:
card = await client.get_agent_card(base_url)
print(json.dumps(card.to_dict(), indent=2))
finally:
await client.close()
def main():
parser = argparse.ArgumentParser(
description="A2A Fleet Delegation Tool"
)
parser.add_argument(
"--registry",
default="config/fleet_agents.json",
help="Path to fleet registry JSON (default: config/fleet_agents.json)",
)
sub = parser.add_subparsers(dest="command")
# list
sub.add_parser("list", help="List registered agents")
# discover
p_discover = sub.add_parser("discover", help="Discover agents by skill/tag")
p_discover.add_argument("--skill", help="Filter by skill ID")
p_discover.add_argument("--tag", help="Filter by skill tag")
# send
p_send = sub.add_parser("send", help="Send a task to an agent")
p_send.add_argument("--to", required=True, help="Target agent name")
p_send.add_argument("--task", required=True, help="Task text")
p_send.add_argument("--skill", help="Target skill ID")
p_send.add_argument("--wait", action="store_true", help="Wait for completion")
p_send.add_argument("--timeout", type=float, default=30.0, help="Timeout in seconds")
p_send.add_argument("--retries", type=int, default=3, help="Max retries")
p_send.add_argument("--poll-interval", type=float, default=2.0, help="Poll interval")
p_send.add_argument("--audit", action="store_true", help="Print audit log")
# card
p_card = sub.add_parser("card", help="Fetch remote agent card")
p_card.add_argument("--agent", required=True, help="Agent name")
args = parser.parse_args()
if args.command == "list":
cmd_list(args)
elif args.command == "discover":
cmd_discover(args)
elif args.command == "send":
asyncio.run(cmd_send(args))
elif args.command == "card":
asyncio.run(cmd_card(args))
else:
parser.print_help()
if __name__ == "__main__":
main()