Implements Google Agent2Agent Protocol v1.0 with full fleet integration: ## Phase 1 - Agent Card & Discovery - Agent Card types with JSON serialization (camelCase, Part discrimination by key) - Card generation from YAML config (~/.hermes/agent_card.yaml) - Fleet registry with LocalFileRegistry + GiteaRegistry backends - Discovery by skill ID or tag ## Phase 2 - Task Delegation - Async A2A client with JSON-RPC SendMessage/GetTask/ListTasks/CancelTask - FastAPI server with pluggable task handlers (skill-routed) - CLI tool (bin/a2a_delegate.py) for fleet delegation - Broadcast to multiple agents in parallel ## Phase 3 - Security & Reliability - Bearer token + API key auth (configurable per agent) - Retry logic (max 3 retries, 30s timeout) - Audit logging for all inter-agent requests - Error handling per A2A spec (-32001 to -32009 codes) ## Test Coverage - 37 tests covering types, card building, registry, server integration - Auth (required + success), handler routing, error handling Files: - nexus/a2a/ (types.py, card.py, client.py, server.py, registry.py) - bin/a2a_delegate.py (CLI) - config/ (agent_card.example.yaml, fleet_agents.json) - docs/A2A_PROTOCOL.md - tests/test_a2a.py (37 tests, all passing)
265 lines
8.5 KiB
Python
265 lines
8.5 KiB
Python
"""
|
|
A2A Registry — fleet-wide agent discovery.
|
|
|
|
Provides two registry backends:
|
|
1. LocalFileRegistry: reads/writes agent cards to a JSON file
|
|
(default: config/fleet_agents.json)
|
|
2. GiteaRegistry: stores agent cards as a Gitea repo file
|
|
(for distributed fleet discovery)
|
|
|
|
Usage:
|
|
registry = LocalFileRegistry()
|
|
registry.register(my_card)
|
|
agents = registry.list_agents(skill="ci-health")
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
from nexus.a2a.types import AgentCard
|
|
|
|
logger = logging.getLogger("nexus.a2a.registry")
|
|
|
|
|
|
class LocalFileRegistry:
|
|
"""
|
|
File-based agent card registry.
|
|
|
|
Stores all fleet agent cards in a single JSON file.
|
|
Suitable for single-node or read-heavy workloads.
|
|
"""
|
|
|
|
def __init__(self, path: Path = Path("config/fleet_agents.json")):
|
|
self.path = path
|
|
self._cards: dict[str, AgentCard] = {}
|
|
self._load()
|
|
|
|
def _load(self):
|
|
"""Load registry from disk."""
|
|
if self.path.exists():
|
|
try:
|
|
with open(self.path) as f:
|
|
data = json.load(f)
|
|
for card_data in data.get("agents", []):
|
|
card = AgentCard.from_dict(card_data)
|
|
self._cards[card.name.lower()] = card
|
|
logger.info(
|
|
f"Loaded {len(self._cards)} agents from {self.path}"
|
|
)
|
|
except (json.JSONDecodeError, KeyError) as e:
|
|
logger.error(f"Failed to load registry from {self.path}: {e}")
|
|
|
|
def _save(self):
|
|
"""Persist registry to disk."""
|
|
self.path.parent.mkdir(parents=True, exist_ok=True)
|
|
data = {
|
|
"version": 1,
|
|
"agents": [card.to_dict() for card in self._cards.values()],
|
|
}
|
|
with open(self.path, "w") as f:
|
|
json.dump(data, f, indent=2)
|
|
logger.debug(f"Saved {len(self._cards)} agents to {self.path}")
|
|
|
|
def register(self, card: AgentCard) -> None:
|
|
"""Register or update an agent card."""
|
|
self._cards[card.name.lower()] = card
|
|
self._save()
|
|
logger.info(f"Registered agent: {card.name}")
|
|
|
|
def unregister(self, name: str) -> bool:
|
|
"""Remove an agent from the registry."""
|
|
key = name.lower()
|
|
if key in self._cards:
|
|
del self._cards[key]
|
|
self._save()
|
|
logger.info(f"Unregistered agent: {name}")
|
|
return True
|
|
return False
|
|
|
|
def get(self, name: str) -> Optional[AgentCard]:
|
|
"""Get an agent card by name."""
|
|
return self._cards.get(name.lower())
|
|
|
|
def list_agents(
|
|
self,
|
|
skill: Optional[str] = None,
|
|
tag: Optional[str] = None,
|
|
) -> list[AgentCard]:
|
|
"""
|
|
List all registered agents, optionally filtered by skill or tag.
|
|
|
|
Args:
|
|
skill: Filter to agents that have this skill ID
|
|
tag: Filter to agents that have this tag on any skill
|
|
"""
|
|
agents = list(self._cards.values())
|
|
|
|
if skill:
|
|
agents = [
|
|
a for a in agents
|
|
if any(s.id == skill for s in a.skills)
|
|
]
|
|
|
|
if tag:
|
|
agents = [
|
|
a for a in agents
|
|
if any(tag in s.tags for s in a.skills)
|
|
]
|
|
|
|
return agents
|
|
|
|
def get_endpoint(self, name: str) -> Optional[str]:
|
|
"""Get the first supported interface URL for an agent."""
|
|
card = self.get(name)
|
|
if card and card.supported_interfaces:
|
|
return card.supported_interfaces[0].url
|
|
return None
|
|
|
|
def dump(self) -> dict:
|
|
"""Dump full registry as a dict."""
|
|
return {
|
|
"version": 1,
|
|
"agents": [card.to_dict() for card in self._cards.values()],
|
|
}
|
|
|
|
|
|
class GiteaRegistry:
|
|
"""
|
|
Gitea-backed agent registry.
|
|
|
|
Stores fleet agent cards in a Gitea repository file for
|
|
distributed discovery across VPS nodes.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
gitea_url: str,
|
|
repo: str,
|
|
token: str,
|
|
file_path: str = "config/fleet_agents.json",
|
|
):
|
|
self.gitea_url = gitea_url.rstrip("/")
|
|
self.repo = repo
|
|
self.token = token
|
|
self.file_path = file_path
|
|
self._cards: dict[str, AgentCard] = {}
|
|
|
|
def _api_url(self, endpoint: str) -> str:
|
|
return f"{self.gitea_url}/api/v1/repos/{self.repo}/{endpoint}"
|
|
|
|
def _headers(self) -> dict:
|
|
return {
|
|
"Authorization": f"token {self.token}",
|
|
"Content-Type": "application/json",
|
|
}
|
|
|
|
async def load(self) -> None:
|
|
"""Fetch agent cards from Gitea."""
|
|
try:
|
|
import aiohttp
|
|
url = self._api_url(f"contents/{self.file_path}")
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.get(url, headers=self._headers()) as resp:
|
|
if resp.status == 200:
|
|
data = await resp.json()
|
|
import base64
|
|
content = base64.b64decode(data["content"]).decode()
|
|
registry = json.loads(content)
|
|
for card_data in registry.get("agents", []):
|
|
card = AgentCard.from_dict(card_data)
|
|
self._cards[card.name.lower()] = card
|
|
logger.info(
|
|
f"Loaded {len(self._cards)} agents from Gitea"
|
|
)
|
|
elif resp.status == 404:
|
|
logger.info("No fleet registry file in Gitea yet")
|
|
else:
|
|
logger.error(
|
|
f"Gitea fetch failed: {resp.status}"
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Failed to load from Gitea: {e}")
|
|
|
|
async def save(self, message: str = "Update fleet registry") -> None:
|
|
"""Write agent cards to Gitea."""
|
|
try:
|
|
import aiohttp
|
|
content = json.dumps(
|
|
{"version": 1, "agents": [c.to_dict() for c in self._cards.values()]},
|
|
indent=2,
|
|
)
|
|
import base64
|
|
encoded = base64.b64encode(content.encode()).decode()
|
|
|
|
# Check if file exists (need SHA for update)
|
|
url = self._api_url(f"contents/{self.file_path}")
|
|
sha = None
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.get(url, headers=self._headers()) as resp:
|
|
if resp.status == 200:
|
|
existing = await resp.json()
|
|
sha = existing.get("sha")
|
|
|
|
payload = {
|
|
"message": message,
|
|
"content": encoded,
|
|
}
|
|
if sha:
|
|
payload["sha"] = sha
|
|
|
|
async with session.put(
|
|
url, headers=self._headers(), json=payload
|
|
) as resp:
|
|
if resp.status in (200, 201):
|
|
logger.info("Fleet registry saved to Gitea")
|
|
else:
|
|
body = await resp.text()
|
|
logger.error(
|
|
f"Gitea save failed: {resp.status} — {body}"
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Failed to save to Gitea: {e}")
|
|
|
|
def register(self, card: AgentCard) -> None:
|
|
"""Register an agent (local update; call save() to persist)."""
|
|
self._cards[card.name.lower()] = card
|
|
|
|
def unregister(self, name: str) -> bool:
|
|
key = name.lower()
|
|
if key in self._cards:
|
|
del self._cards[key]
|
|
return True
|
|
return False
|
|
|
|
def get(self, name: str) -> Optional[AgentCard]:
|
|
return self._cards.get(name.lower())
|
|
|
|
def list_agents(
|
|
self,
|
|
skill: Optional[str] = None,
|
|
tag: Optional[str] = None,
|
|
) -> list[AgentCard]:
|
|
agents = list(self._cards.values())
|
|
if skill:
|
|
agents = [a for a in agents if any(s.id == skill for s in a.skills)]
|
|
if tag:
|
|
agents = [a for a in agents if any(tag in s.tags for s in a.skills)]
|
|
return agents
|
|
|
|
|
|
# --- Convenience ---
|
|
|
|
def discover_agents(
|
|
path: Path = Path("config/fleet_agents.json"),
|
|
skill: Optional[str] = None,
|
|
tag: Optional[str] = None,
|
|
) -> list[AgentCard]:
|
|
"""One-shot discovery from local file."""
|
|
registry = LocalFileRegistry(path)
|
|
return registry.list_agents(skill=skill, tag=tag)
|