Implements Google Agent2Agent Protocol v1.0 with full fleet integration:

## Phase 1 - Agent Card & Discovery
- Agent Card types with JSON serialization (camelCase, Part discrimination by key)
- Card generation from YAML config (~/.hermes/agent_card.yaml)
- Fleet registry with LocalFileRegistry + GiteaRegistry backends
- Discovery by skill ID or tag

## Phase 2 - Task Delegation
- Async A2A client with JSON-RPC SendMessage/GetTask/ListTasks/CancelTask
- FastAPI server with pluggable task handlers (skill-routed)
- CLI tool (bin/a2a_delegate.py) for fleet delegation
- Broadcast to multiple agents in parallel

## Phase 3 - Security & Reliability
- Bearer token + API key auth (configurable per agent)
- Retry logic (max 3 retries, 30s timeout)
- Audit logging for all inter-agent requests
- Error handling per A2A spec (-32001 to -32009 codes)

## Test Coverage
- 37 tests covering types, card building, registry, server integration
- Auth (required + success), handler routing, error handling

Files:
- nexus/a2a/ (types.py, card.py, client.py, server.py, registry.py)
- bin/a2a_delegate.py (CLI)
- config/ (agent_card.example.yaml, fleet_agents.json)
- docs/A2A_PROTOCOL.md
- tests/test_a2a.py (37 tests, all passing)
242 lines
7.2 KiB
Python
242 lines
7.2 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
A2A Delegate — CLI tool for fleet task delegation.
|
|
|
|
Usage:
|
|
# List available fleet agents
|
|
python -m bin.a2a_delegate list
|
|
|
|
# Discover agents with a specific skill
|
|
python -m bin.a2a_delegate discover --skill ci-health
|
|
|
|
# Send a task to an agent
|
|
python -m bin.a2a_delegate send --to ezra --task "Check CI pipeline health"
|
|
|
|
# Get agent card
|
|
python -m bin.a2a_delegate card --agent ezra
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
# CLI-wide logging: INFO and above, one timestamped, level-tagged line per
# record. The named logger below is the one this module logs through.
_LOG_FORMAT = "%(asctime)s [%(levelname)s] %(message)s"
_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"

logging.basicConfig(
    level=logging.INFO,
    format=_LOG_FORMAT,
    datefmt=_LOG_DATE_FORMAT,
)
logger = logging.getLogger("a2a-delegate")
|
|
|
|
|
|
def cmd_list(args):
    """Print a table of every agent stored in the fleet registry file.

    Args:
        args: Parsed CLI namespace; only ``args.registry`` (path to the
            registry JSON) is read.

    Prints "No agents registered." and returns early when the registry
    yields no cards. Otherwise prints one row per card with its name,
    version, skill count and the URL of its first supported interface
    (blank when the card lists no interfaces).
    """
    from nexus.a2a.registry import LocalFileRegistry

    cards = LocalFileRegistry(Path(args.registry)).list_agents()
    if not cards:
        print("No agents registered.")
        return

    header = f"\n{'Name':<20} {'Version':<10} {'Skills':<5} URL"
    print(header)
    print("-" * 70)

    for entry in cards:
        interfaces = entry.supported_interfaces
        # Only the first advertised interface is shown in the table.
        endpoint = interfaces[0].url if interfaces else ""
        skill_count = len(entry.skills)
        print(f"{entry.name:<20} {entry.version:<10} {skill_count:<5} {endpoint}")

    print()
|
|
|
|
|
|
def cmd_discover(args):
    """Print the agents whose skills match the requested skill ID and/or tag.

    Args:
        args: Parsed CLI namespace; reads ``args.registry``, ``args.skill``
            and ``args.tag``. The skill and tag values are passed straight
            to the registry's ``list_agents`` filter.

    Prints "No matching agents found." and returns early when the registry
    returns nothing. Otherwise prints, per agent, its name and version,
    description, first endpoint (if any) and each skill with its tags.
    """
    from nexus.a2a.registry import LocalFileRegistry

    matches = LocalFileRegistry(Path(args.registry)).list_agents(
        skill=args.skill,
        tag=args.tag,
    )
    if not matches:
        print("No matching agents found.")
        return

    for agent in matches:
        print(f"\n{agent.name} (v{agent.version})")
        print(f" {agent.description}")

        interfaces = agent.supported_interfaces
        if interfaces:
            print(f" Endpoint: {interfaces[0].url}")

        for skill in agent.skills:
            print(f" [{skill.id}] {skill.name} — {skill.description}")
            # A skill without tags produces an empty string and no tag line.
            joined_tags = ", ".join(skill.tags or ())
            if joined_tags:
                print(f" tags: {joined_tags}")
|
|
|
|
|
|
async def cmd_send(args):
    """Send a text task to a registered agent and print the outcome.

    The target is looked up by name in the registry file and the message is
    sent to the URL of the agent's first supported interface.

    Args:
        args: Parsed CLI namespace. Reads ``registry``, ``to``, ``task``,
            ``skill``, ``wait``, ``timeout``, ``retries``, ``poll_interval``
            and ``audit``.

    Behaviour:
        * The bearer token is read from the environment variable named by
          ``auth.token_env`` in the local card config (default
          ``A2A_AUTH_TOKEN``). When the config file does not exist, no token
          is sent.
        * With ``args.wait`` the task is polled via
          ``client.wait_for_completion`` and every text part of every
          artifact is printed. Note that ``args.timeout`` is used both as
          the client timeout and as the maximum wait.
        * With ``args.audit`` the client's audit log is dumped as JSON.

    Exits:
        Calls ``sys.exit(1)`` when the agent is not in the registry or has
        no endpoint configured.
    """
    import os

    from nexus.a2a.card import load_card_config
    from nexus.a2a.client import A2AClient, A2AClientConfig
    from nexus.a2a.registry import LocalFileRegistry
    from nexus.a2a.types import Message, Role, TextPart

    registry = LocalFileRegistry(Path(args.registry))
    target = registry.get(args.to)

    if not target:
        print(f"Agent '{args.to}' not found in registry.")
        sys.exit(1)

    if not target.supported_interfaces:
        print(f"Agent '{args.to}' has no endpoint configured.")
        sys.exit(1)

    endpoint = target.supported_interfaces[0].url

    # Load local auth config. Only the config load itself can raise
    # FileNotFoundError, so it is the only statement inside the try.
    try:
        local_config = load_card_config()
    except FileNotFoundError:
        # No local card config: send the request without a token.
        auth_token = ""
    else:
        # An empty config, an empty ``auth:`` key or a null ``token_env``
        # all deserialize to None; fall back to defaults instead of
        # crashing on ``None.get`` / ``os.environ.get(None)``.
        auth = (local_config or {}).get("auth") or {}
        token_env = auth.get("token_env") or "A2A_AUTH_TOKEN"
        auth_token = os.environ.get(token_env, "")

    config = A2AClientConfig(
        auth_token=auth_token,
        timeout=args.timeout,
        max_retries=args.retries,
    )
    client = A2AClient(config=config)

    try:
        print(f"Sending task to {args.to} ({endpoint})...")
        print(f"Task: {args.task}")
        print()

        message = Message(
            role=Role.USER,
            parts=[TextPart(text=args.task)],
            # Skill routing hint is only attached when --skill was given.
            metadata={"targetSkill": args.skill} if args.skill else {},
        )

        task = await client.send_message(endpoint, message)
        print(f"Task ID: {task.id}")
        print(f"State: {task.status.state.value}")

        if args.wait:
            print("Waiting for completion...")
            task = await client.wait_for_completion(
                endpoint,
                task.id,
                poll_interval=args.poll_interval,
                max_wait=args.timeout,
            )
            print(f"\nFinal state: {task.status.state.value}")
            # A task may carry no artifacts at all; treat None like empty.
            for artifact in task.artifacts or []:
                for part in artifact.parts:
                    # Only text parts are printable; other kinds are skipped.
                    if isinstance(part, TextPart):
                        print(f"\n--- {artifact.name or 'result'} ---")
                        print(part.text)

        # Audit log
        if args.audit:
            print("\n--- Audit Log ---")
            for entry in client.get_audit_log():
                print(json.dumps(entry, indent=2))

    finally:
        # Always release the client's resources, even on failure.
        await client.close()
|
|
|
|
|
|
async def cmd_card(args):
    """Fetch a registered agent's card from its endpoint and print it as JSON.

    Args:
        args: Parsed CLI namespace; reads ``args.registry`` and
            ``args.agent``.

    The agent is looked up in the registry file. The URL of its first
    supported interface, minus a trailing ``/a2a/v1`` or ``/rpc`` (whichever
    matches first), is passed to ``client.get_agent_card``.

    Exits:
        Calls ``sys.exit(1)`` when the agent is not in the registry or has
        no endpoint.
    """
    from nexus.a2a.client import A2AClient, A2AClientConfig
    from nexus.a2a.registry import LocalFileRegistry

    agent = LocalFileRegistry(Path(args.registry)).get(args.agent)

    if not agent:
        print(f"Agent '{args.agent}' not found in registry.")
        sys.exit(1)

    if not agent.supported_interfaces:
        print(f"Agent '{args.agent}' has no endpoint.")
        sys.exit(1)

    endpoint = agent.supported_interfaces[0].url

    # Strip a known RPC suffix to get the base URL; at most one suffix is
    # removed, and the candidates are tried in this order.
    known_suffixes = ("/a2a/v1", "/rpc")
    matched = next((s for s in known_suffixes if endpoint.endswith(s)), None)
    base_url = endpoint[: -len(matched)] if matched is not None else endpoint

    client = A2AClient(config=A2AClientConfig())
    try:
        remote_card = await client.get_agent_card(base_url)
        print(json.dumps(remote_card.to_dict(), indent=2))
    finally:
        await client.close()
|
|
|
|
|
|
def main():
    """Parse the command line and run the selected sub-command.

    Sub-commands are ``list``, ``discover``, ``send`` and ``card``. The two
    network commands (``send``, ``card``) are coroutines and are driven with
    ``asyncio.run``. When no sub-command is given, the help text is printed.
    """
    parser = argparse.ArgumentParser(description="A2A Fleet Delegation Tool")
    parser.add_argument(
        "--registry",
        default="config/fleet_agents.json",
        help="Path to fleet registry JSON (default: config/fleet_agents.json)",
    )

    subparsers = parser.add_subparsers(dest="command")

    # list
    subparsers.add_parser("list", help="List registered agents")

    # discover
    discover_parser = subparsers.add_parser(
        "discover", help="Discover agents by skill/tag"
    )
    discover_parser.add_argument("--skill", help="Filter by skill ID")
    discover_parser.add_argument("--tag", help="Filter by skill tag")

    # send: options are declared as data and registered in this order.
    send_parser = subparsers.add_parser("send", help="Send a task to an agent")
    send_options = (
        ("--to", {"required": True, "help": "Target agent name"}),
        ("--task", {"required": True, "help": "Task text"}),
        ("--skill", {"help": "Target skill ID"}),
        ("--wait", {"action": "store_true", "help": "Wait for completion"}),
        ("--timeout", {"type": float, "default": 30.0, "help": "Timeout in seconds"}),
        ("--retries", {"type": int, "default": 3, "help": "Max retries"}),
        ("--poll-interval", {"type": float, "default": 2.0, "help": "Poll interval"}),
        ("--audit", {"action": "store_true", "help": "Print audit log"}),
    )
    for flag, options in send_options:
        send_parser.add_argument(flag, **options)

    # card
    card_parser = subparsers.add_parser("card", help="Fetch remote agent card")
    card_parser.add_argument("--agent", required=True, help="Agent name")

    args = parser.parse_args()

    # Lambdas keep handler lookup lazy: a handler name is only resolved when
    # its command is actually selected.
    runners = {
        "list": lambda: cmd_list(args),
        "discover": lambda: cmd_discover(args),
        "send": lambda: asyncio.run(cmd_send(args)),
        "card": lambda: asyncio.run(cmd_card(args)),
    }

    runner = runners.get(args.command)
    if runner is None:
        # No sub-command supplied.
        parser.print_help()
        return
    runner()
|
|
|
|
|
|
# Script entry point: run the CLI only when this file is executed directly
# (or via ``python -m``), never on import.
if __name__ == "__main__":
    main()
|