Compare commits

...

1 Commits

Author SHA1 Message Date
Allegro
a058d6a5a9 feat(acp): Redis transport layer for Agent Communication Protocol
Replaces tmux send-keys with Redis-backed message passing.

Implements:
- RedisTransport class: send/receive/ack/broadcast via Redis queues + Pub/Sub
- Agent registry stored in Redis Hash with TTL auto-expiry (heartbeat pattern)
- TransportRouter: Redis primary, tmux fallback
- Full CLI: acp send/receive/register/unregister/agents/broadcast/health/queue
- TmuxTransport fallback for backward compatibility

Queue schema:
  acp:inbox:{agent_id}  - LPUSH/BRPOP for FIFO message delivery
  acp:ack:{agent_id}    - SADD for message acknowledgement tracking
  acp:registry          - HSET for agent registry
  acp:broadcast          - Pub/Sub channel + per-agent inbox push
  acp:dlq               - Dead-letter queue + audit trail

Tests: 41 tests covering send/receive/ack/broadcast/registry/queue/FIFO ordering/edge cases.

Refs: Epic #373 (Architecture overhaul)
2026-04-16 23:12:48 +00:00
2 changed files with 1430 additions and 0 deletions

View File

@@ -0,0 +1,877 @@
#!/usr/bin/env python3
"""
ACP Redis Transport Layer
Agent Communication Protocol transport using Redis queues and Pub/Sub.
Replaces tmux send-keys with Redis-backed message passing.
Provides reliable, persistent, and scalable inter-agent communication.
Queue schema:
acp:inbox:{agent_id} — agent inbox (list, BRPOP)
acp:ack:{agent_id} — acknowledged message IDs (set, for dedup)
acp:registry — agent registry (hash: agent_id → json)
acp:broadcast — Pub/Sub channel for broadcasts
acp:dlq — dead-letter queue for failed messages
Usage:
transport = RedisTransport(redis_url="redis://localhost:6379/0")
transport.register_agent("allegro-primary", queue="forge:inbox:allegro-primary")
transport.send({"to": "ezra-primary", "from": "allegro-primary", "type": "request", "payload": {"task": "review"}})
msg = transport.receive("ezra-primary", timeout=5)
transport.ack("ezra-primary", msg["id"])
"""
import json
import os
import time
import uuid
from datetime import datetime, timezone
from typing import Any, Optional
import redis
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Prefix of each agent's inbox list key; the agent id is appended to it.
QUEUE_PREFIX = "acp:inbox:"
# Prefix of each agent's set of acknowledged message ids.
ACK_PREFIX = "acp:ack:"
# Hash holding one JSON-encoded registry entry per agent id.
REGISTRY_KEY = "acp:registry"
# Pub/Sub channel that broadcast messages are published on.
BROADCAST_CHANNEL = "acp:broadcast"
# List receiving expired messages; "<DLQ_KEY>:audit" holds the audit trail.
DLQ_KEY = "acp:dlq"
# Default message lifetime in seconds (1 hour); also the ack-set expiry.
DEFAULT_TTL = 3600 # 1 hour
# Used when neither a URL argument nor the ACP_REDIS_URL env var is provided.
DEFAULT_REDIS_URL = "redis://localhost:6379/0"
# Default blocking time, in seconds, for RedisTransport.receive (BRPOP).
DEFAULT_TIMEOUT = 5 # seconds for BRPOP
# ---------------------------------------------------------------------------
# ACP Message Schema
# ---------------------------------------------------------------------------
# Reference schema for an ACP message: field name -> expected Python type.
# NOTE(review): nothing in the code visible here reads this mapping
# (validate_message checks fields by hand) — it appears to be documentation
# only; confirm before relying on it for enforcement.
ACP_MESSAGE_FIELDS = {
    "id": str, # auto-generated if missing
    "from": str, # sender agent_id
    "to": str, # target agent_id or "*" for broadcast
    "type": str, # request | response | broadcast | alert | ack
    "payload": dict, # message body
    "reply_to": str, # optional: original message id
    "timestamp": str, # ISO-8601 timestamp
    "via": str, # transport name ("redis")
    "ttl": int, # seconds to live
    "signature": str, # optional: HMAC signature
}
# ---------------------------------------------------------------------------
# RedisTransport
# ---------------------------------------------------------------------------
class RedisTransport:
    """Redis-backed transport for ACP agent communication.

    Messages are JSON-encoded and LPUSHed onto ``{QUEUE_PREFIX}{agent_id}``
    lists; consumers pop from the opposite end, so delivery is FIFO.
    Registered agents live in the ``REGISTRY_KEY`` hash, with liveness
    tracked by a per-agent ``acp:heartbeat:{agent_id}`` key that expires.
    """

    def __init__(self, redis_url: str = None, redis_client: "redis.Redis|None" = None):
        """
        Initialize the Redis transport.

        Args:
            redis_url: Redis connection URL (default: ACP_REDIS_URL env or
                DEFAULT_REDIS_URL).
            redis_client: Inject a pre-configured redis client (for testing).
                When given, ``redis_url`` is ignored.
        """
        if redis_client is not None:
            self._redis = redis_client
        else:
            url = redis_url or os.getenv("ACP_REDIS_URL", DEFAULT_REDIS_URL)
            self._redis = redis.Redis.from_url(url, decode_responses=True)
        self._pubsub = None

    @property
    def client(self) -> "redis.Redis":
        """Access the underlying Redis client."""
        return self._redis

    # -----------------------------------------------------------------------
    # Internal helpers
    # -----------------------------------------------------------------------
    def _extend_expiry(self, key: str, seconds: int) -> None:
        """Make ``key`` expire in ``seconds`` unless it already outlives that.

        TTL reports -1 (no expiry) or -2 (missing key), both of which compare
        below any positive ``seconds``, so a fresh key always gets an expiry.
        An existing, longer expiry is left alone so that a short-lived write
        cannot cut short data stored earlier under the same key.
        """
        if self._redis.ttl(key) < seconds:
            self._redis.expire(key, seconds)

    def _audit(self, entry: dict) -> None:
        """Prepend ``entry`` to the audit list, capped at 10000 entries."""
        audit_key = f"{DLQ_KEY}:audit"
        self._redis.lpush(audit_key, json.dumps(entry))
        self._redis.ltrim(audit_key, 0, 9999)

    def _liveness(self, agent_id: str) -> str:
        """Return 'active' while the agent's heartbeat key exists, else 'stale'."""
        if self._redis.exists(f"acp:heartbeat:{agent_id}"):
            return "active"
        return "stale"

    @staticmethod
    def _is_expired(message: dict) -> bool:
        """Tell whether ``message`` is older than its own ``ttl``.

        A message without a timestamp is never considered expired.
        """
        ts = message.get("timestamp")
        if not ts:
            return False
        ttl = message.get("ttl", DEFAULT_TTL)
        sent_at = datetime.fromisoformat(ts).timestamp()
        return time.time() - sent_at > ttl

    # -----------------------------------------------------------------------
    # Message Construction
    # -----------------------------------------------------------------------
    @staticmethod
    def build_message(
        to: str,
        payload: dict,
        from_agent: str = "system",
        msg_type: str = "request",
        reply_to: str = None,
        ttl: int = DEFAULT_TTL,
        signature: str = None,
    ) -> dict:
        """Build a normalized ACP message.

        Generates a 16-hex-character id and a UTC ISO-8601 timestamp, and
        stamps ``via`` as "redis". A missing signature becomes "".
        """
        msg_id = uuid.uuid4().hex[:16]
        now = datetime.now(timezone.utc).isoformat()
        return {
            "id": msg_id,
            "from": from_agent,
            "to": to,
            "type": msg_type,
            "payload": payload,
            "reply_to": reply_to,
            "timestamp": now,
            "via": "redis",
            "ttl": ttl,
            "signature": signature or "",
        }

    @staticmethod
    def validate_message(msg: dict) -> list[str]:
        """
        Validate an ACP message structure.

        Only the presence of 'to', 'from' and 'payload' and the value of an
        optional 'type' are checked.

        Returns:
            List of validation errors (empty = valid).
        """
        errors = []
        if not isinstance(msg, dict):
            return ["message must be a dict"]
        for field in ("to", "from", "payload"):
            if field not in msg:
                errors.append(f"missing required field: {field}")
        if "type" in msg and msg["type"] not in (
            "request", "response", "broadcast", "alert", "ack"
        ):
            errors.append(f"invalid type: {msg.get('type')}")
        return errors

    # -----------------------------------------------------------------------
    # Core Transport Operations
    # -----------------------------------------------------------------------
    def send(self, message: dict) -> dict:
        """
        Send an ACP message to the target agent's inbox queue.

        The caller's dict is not modified; missing 'id', 'timestamp', 'via'
        and 'ttl' are filled in on a copy.

        Args:
            message: ACP message dict (must have 'to', 'from' and 'payload')

        Returns:
            Dict with 'status', 'message_id', 'queue', 'timestamp'

        Raises:
            ValueError: If message is invalid
        """
        errors = self.validate_message(message)
        if errors:
            raise ValueError(f"Invalid message: {'; '.join(errors)}")

        # Auto-fill missing fields on a copy.
        msg = dict(message)
        if "id" not in msg:
            msg["id"] = uuid.uuid4().hex[:16]
        if "timestamp" not in msg:
            msg["timestamp"] = datetime.now(timezone.utc).isoformat()
        if "via" not in msg:
            msg["via"] = "redis"
        if "ttl" not in msg:
            msg["ttl"] = DEFAULT_TTL

        target = msg["to"]
        queue = f"{QUEUE_PREFIX}{target}"

        # Push to target's inbox queue (LPUSH + BRPOP = FIFO)
        self._redis.lpush(queue, json.dumps(msg))

        # Keep the queue key alive slightly longer than the message, but never
        # shorten an expiry set by an earlier, longer-lived message.
        if target != "*":
            self._extend_expiry(queue, msg["ttl"] + 60)

        self._audit({
            "action": "send",
            "message_id": msg["id"],
            "from": msg["from"],
            "to": target,
            "type": msg.get("type", "request"),
            "timestamp": msg["timestamp"],
            "queue": queue,
        })
        return {
            "status": "sent",
            "message_id": msg["id"],
            "queue": queue,
            "timestamp": msg["timestamp"],
        }

    def receive(self, agent_id: str, timeout: int = DEFAULT_TIMEOUT) -> Optional[dict]:
        """
        Blocking receive from an agent's inbox queue.

        Expired messages are moved to the dead-letter list (flagged with
        ``_expired``) and skipped. Skipping them never extends the wait:
        ``timeout`` is one overall budget, after which the queue is only
        drained without blocking. Because BRPOP is called with whole seconds,
        the total wait may overshoot the budget by under a second.

        Args:
            agent_id: Agent to receive messages for
            timeout: Seconds to block (0 = infinite)

        Returns:
            ACP message dict or None on timeout
        """
        queue = f"{QUEUE_PREFIX}{agent_id}"
        # A falsy timeout means "block forever", so no deadline applies.
        deadline = time.monotonic() + timeout if timeout else None
        wait = timeout
        drain_only = False

        while True:
            if drain_only:
                raw = self._redis.rpop(queue)
                if raw is None:
                    return None
            else:
                result = self._redis.brpop(queue, timeout=wait)
                if result is None:
                    return None
                _, raw = result

            message = json.loads(raw)
            if not self._is_expired(message):
                # Update last_seen for the agent
                self._update_heartbeat(agent_id)
                return message

            # Expired — send to DLQ and look at the next message.
            message["_expired"] = True
            self._redis.lpush(DLQ_KEY, json.dumps(message))
            if deadline is not None:
                remaining = deadline - time.monotonic()
                if remaining > 0:
                    # Never pass 0 here: BRPOP treats 0 as "wait forever".
                    wait = max(1, round(remaining))
                else:
                    drain_only = True

    def receive_nowait(self, agent_id: str) -> Optional[dict]:
        """
        Non-blocking receive. Returns None if no messages waiting.

        Pops from the same end as ``receive`` so ordering stays FIFO. Unlike
        ``receive`` it neither checks message expiry nor refreshes the
        agent's heartbeat.
        """
        queue = f"{QUEUE_PREFIX}{agent_id}"
        raw = self._redis.rpop(queue)
        if raw is None:
            return None
        return json.loads(raw)

    def ack(self, agent_id: str, message_id: str) -> dict:
        """
        Acknowledge receipt of a message.

        Args:
            agent_id: Agent acknowledging the message
            message_id: ID of the message being acknowledged

        Returns:
            Dict with 'status' and 'ack_id'
        """
        ack_key = f"{ACK_PREFIX}{agent_id}"
        acked_at = datetime.now(timezone.utc).isoformat()

        # Store in ack set (with TTL)
        self._redis.sadd(ack_key, message_id)
        self._redis.expire(ack_key, DEFAULT_TTL)

        self._audit({
            "action": "ack",
            "message_id": message_id,
            "acked_by": agent_id,
            "timestamp": acked_at,
        })
        return {"status": "acked", "ack_id": message_id}

    def is_acked(self, agent_id: str, message_id: str) -> bool:
        """Check if a message has been acknowledged."""
        # Coerce: depending on client version SISMEMBER may come back as 0/1.
        return bool(self._redis.sismember(f"{ACK_PREFIX}{agent_id}", message_id))

    def broadcast(self, message: dict) -> dict:
        """
        Broadcast a message to all registered agents via Pub/Sub.

        The message is published on ``BROADCAST_CHANNEL`` and also pushed to
        the inbox of every agent currently listed as active. The message is
        not run through ``validate_message``.

        Args:
            message: ACP message (to field will be set to '*')

        Returns:
            Dict with 'status', 'message_id', 'channel', 'recipients'
        """
        msg = dict(message)
        msg["to"] = "*"
        msg["type"] = msg.get("type", "broadcast")
        if "id" not in msg:
            msg["id"] = uuid.uuid4().hex[:16]
        if "timestamp" not in msg:
            msg["timestamp"] = datetime.now(timezone.utc).isoformat()

        encoded = json.dumps(msg)
        # Publish to broadcast channel
        self._redis.publish(BROADCAST_CHANNEL, encoded)

        # Also push to each registered agent's inbox
        agents = self.list_agents(status="active")
        queue_ttl = msg.get("ttl", DEFAULT_TTL) + 60
        for agent in agents:
            queue = f"{QUEUE_PREFIX}{agent['agent_id']}"
            self._redis.lpush(queue, encoded)
            self._extend_expiry(queue, queue_ttl)
        return {
            "status": "broadcast",
            "message_id": msg["id"],
            "channel": BROADCAST_CHANNEL,
            "recipients": len(agents),
        }

    # -----------------------------------------------------------------------
    # Agent Registry
    # -----------------------------------------------------------------------
    def register_agent(
        self,
        agent_id: str,
        queue: str = None,
        metadata: dict = None,
        ttl: int = 3600,
    ) -> dict:
        """
        Register an agent in the registry with TTL auto-expiry.

        The ``queue`` value is recorded in the registry entry only; ``send``
        and ``broadcast`` always address ``{QUEUE_PREFIX}{agent_id}``.

        Args:
            agent_id: Unique agent identifier
            queue: Queue path (default: acp:inbox:{agent_id})
            metadata: Additional metadata dict (stored as a JSON string)
            ttl: TTL in seconds (default: 3600)

        Returns:
            Dict with registration confirmation
        """
        queue = queue or f"{QUEUE_PREFIX}{agent_id}"
        now = datetime.now(timezone.utc).isoformat()
        entry = {
            "agent_id": agent_id,
            "status": "active",
            "queue": queue,
            "registered_at": now,
            "last_seen": now,
            "metadata": json.dumps(metadata or {}),
            "ttl": ttl,
        }
        self._redis.hset(REGISTRY_KEY, agent_id, json.dumps(entry))
        # The hash is shared by all agents: only ever lengthen its expiry so a
        # short-TTL registration cannot evict longer-lived registrations.
        self._extend_expiry(REGISTRY_KEY, ttl * 2)

        # Also set a per-agent TTL key for heartbeat tracking
        self._redis.setex(f"acp:heartbeat:{agent_id}", ttl, now)
        return {"status": "registered", "agent_id": agent_id, "queue": queue, "ttl": ttl}

    def unregister_agent(self, agent_id: str) -> dict:
        """Remove an agent's registry entry, heartbeat key and ack set.

        Returns:
            Dict whose 'status' is 'unregistered', or 'not_found' when the
            agent had no registry entry.
        """
        removed = self._redis.hdel(REGISTRY_KEY, agent_id)
        self._redis.delete(f"acp:heartbeat:{agent_id}")
        self._redis.delete(f"{ACK_PREFIX}{agent_id}")
        return {"status": "unregistered" if removed else "not_found", "agent_id": agent_id}

    def get_agent(self, agent_id: str) -> Optional[dict]:
        """Get agent info from registry, or None when not registered.

        The returned 'status' reflects the heartbeat key, not the stored value.
        """
        raw = self._redis.hget(REGISTRY_KEY, agent_id)
        if raw is None:
            return None
        entry = json.loads(raw)
        entry["status"] = self._liveness(agent_id)
        return entry

    def list_agents(self, status: str = None) -> list[dict]:
        """
        List all registered agents, ordered by registration time.

        Args:
            status: Filter by status ('active', 'stale', or None for all)

        Returns:
            List of agent info dicts
        """
        agents = []
        for agent_id, raw in self._redis.hgetall(REGISTRY_KEY).items():
            entry = json.loads(raw)
            # Refresh status from heartbeat
            entry["status"] = self._liveness(agent_id)
            if status is None or entry["status"] == status:
                agents.append(entry)
        return sorted(agents, key=lambda a: a.get("registered_at", ""))

    def _update_heartbeat(self, agent_id: str) -> None:
        """Update agent's last_seen timestamp and refresh heartbeat TTL.

        Does nothing for agents without a registry entry.
        """
        raw = self._redis.hget(REGISTRY_KEY, agent_id)
        if not raw:
            return
        now = datetime.now(timezone.utc).isoformat()
        entry = json.loads(raw)
        ttl = entry.get("ttl", DEFAULT_TTL)
        entry["last_seen"] = now
        self._redis.hset(REGISTRY_KEY, agent_id, json.dumps(entry))
        self._redis.setex(f"acp:heartbeat:{agent_id}", ttl, now)

    # -----------------------------------------------------------------------
    # Queue Management
    # -----------------------------------------------------------------------
    def queue_length(self, agent_id: str) -> int:
        """Get the number of pending messages for an agent."""
        return self._redis.llen(f"{QUEUE_PREFIX}{agent_id}")

    def queue_peek(self, agent_id: str, count: int = 10) -> list[dict]:
        """Peek at up to ``count`` messages without consuming them.

        Messages are read from the push end of the list, i.e. most recently
        sent first. A non-positive ``count`` yields an empty list.
        """
        if count <= 0:
            # LRANGE 0 -1 would mean "the whole list", not "nothing".
            return []
        queue = f"{QUEUE_PREFIX}{agent_id}"
        raw_messages = self._redis.lrange(queue, 0, count - 1)
        return [json.loads(m) for m in raw_messages]

    def queue_purge(self, agent_id: str) -> int:
        """Purge all messages from an agent's inbox. Returns count purged."""
        queue = f"{QUEUE_PREFIX}{agent_id}"
        length = self._redis.llen(queue)
        self._redis.delete(queue)
        return length

    def dlq_contents(self, limit: int = 50) -> list[dict]:
        """Get up to ``limit`` dead-letter entries, newest first."""
        if limit <= 0:
            return []
        raw = self._redis.lrange(DLQ_KEY, 0, limit - 1)
        return [json.loads(m) for m in raw]

    def audit_log(self, limit: int = 100) -> list[dict]:
        """Get up to ``limit`` recent audit log entries, newest first."""
        if limit <= 0:
            return []
        raw = self._redis.lrange(f"{DLQ_KEY}:audit", 0, limit - 1)
        return [json.loads(m) for m in raw]

    # -----------------------------------------------------------------------
    # Pub/Sub Subscriber
    # -----------------------------------------------------------------------
    def subscribe_broadcast(self, agent_id: str) -> "BroadcastSubscriber":
        """
        Subscribe to broadcast messages for an agent.

        Returns a BroadcastSubscriber context manager; the subscription is
        only opened when the context is entered.
        """
        return BroadcastSubscriber(self._redis, agent_id)

    # -----------------------------------------------------------------------
    # Health / Diagnostics
    # -----------------------------------------------------------------------
    def health(self) -> dict:
        """Health check — verifies Redis connectivity and returns stats.

        Redis errors (connection failures, timeouts, ...) are reported as an
        'unhealthy' status instead of being raised.
        """
        try:
            pong = self._redis.ping()
            agents = self.list_agents()
            return {
                "status": "healthy" if pong else "unreachable",
                "redis_ping": pong,
                "agents_registered": len(agents),
                "agents_active": len([a for a in agents if a["status"] == "active"]),
                "timestamp": datetime.now(timezone.utc).isoformat(),
            }
        except redis.RedisError as e:
            return {
                "status": "unhealthy",
                "error": str(e),
                "timestamp": datetime.now(timezone.utc).isoformat(),
            }
# ---------------------------------------------------------------------------
# Broadcast Subscriber (Pub/Sub context manager)
# ---------------------------------------------------------------------------
class BroadcastSubscriber:
    """Context manager for subscribing to ACP broadcast channel.

    The Pub/Sub connection exists only between ``__enter__`` and
    ``__exit__``; calling ``listen`` outside that window raises RuntimeError.
    """

    def __init__(self, redis_client: "redis.Redis", agent_id: str):
        """
        Args:
            redis_client: Redis client used to open the Pub/Sub connection.
            agent_id: Agent the subscription is opened for (stored only).
        """
        self._redis = redis_client
        self._agent_id = agent_id
        self._pubsub = None

    def __enter__(self):
        pubsub = self._redis.pubsub()
        try:
            pubsub.subscribe(BROADCAST_CHANNEL)
        except Exception:
            # Do not leak the connection when the subscription fails.
            pubsub.close()
            raise
        self._pubsub = pubsub
        return self

    def __exit__(self, *args):
        # Detach first so a later listen() fails loudly instead of touching a
        # closed connection.
        pubsub, self._pubsub = self._pubsub, None
        if pubsub is not None:
            try:
                pubsub.unsubscribe()
            finally:
                pubsub.close()

    def listen(self, timeout: float = 5.0) -> Optional[dict]:
        """
        Listen for the next broadcast message.

        Args:
            timeout: Seconds to wait (None = forever)

        Returns:
            ACP message dict or None on timeout

        Raises:
            RuntimeError: If called outside the ``with`` block.
        """
        if not self._pubsub:
            raise RuntimeError("Not in context manager")
        msg = self._pubsub.get_message(timeout=timeout, ignore_subscribe_messages=True)
        if msg and msg["type"] == "message":
            return json.loads(msg["data"])
        return None
# ---------------------------------------------------------------------------
# Transport Interface (abstract base)
# ---------------------------------------------------------------------------
class TransportInterface:
    """
    Reference contract for ACP transports.

    Nothing is enforced: transports are duck-typed, and every method here
    simply raises NotImplementedError. The class documents the method names
    and signatures a transport is expected to offer.
    """

    def send(self, message: dict) -> dict:
        """Deliver ``message`` and return a delivery receipt."""
        raise NotImplementedError()

    def receive(self, agent_id: str, timeout: int = 5) -> Optional[dict]:
        """Return the next message for ``agent_id``, or None if there is none."""
        raise NotImplementedError()

    def ack(self, agent_id: str, message_id: str) -> dict:
        """Record that ``agent_id`` has received ``message_id``."""
        raise NotImplementedError()

    def broadcast(self, message: dict) -> dict:
        """Deliver ``message`` to all agents."""
        raise NotImplementedError()

    def register_agent(self, agent_id: str, **kwargs) -> dict:
        """Add ``agent_id`` to the transport's registry."""
        raise NotImplementedError()

    def unregister_agent(self, agent_id: str) -> dict:
        """Remove ``agent_id`` from the transport's registry."""
        raise NotImplementedError()

    def list_agents(self, **kwargs) -> list[dict]:
        """Return the registered agents."""
        raise NotImplementedError()
# ---------------------------------------------------------------------------
# Tmux Fallback Transport
# ---------------------------------------------------------------------------
class TmuxTransport:
    """
    Fallback transport using tmux send-keys.

    Preserved for backward compatibility when Redis is unavailable. Only
    ``send`` does real work; the registry is an in-memory dict local to this
    instance, and receive/broadcast are not supported.
    """

    def __init__(self):
        # agent_id -> info dict; never persisted or shared between instances.
        self._agents: dict[str, dict] = {}

    def send(self, message: dict) -> dict:
        """Send via tmux send-keys (fallback).

        The whole message is JSON-encoded and typed into the pane named by
        ``message['pane']`` (default: ``message['to']``), followed by Enter.

        Returns:
            A receipt with status 'sent', or status 'failed' plus 'error'
            when tmux is missing, exits non-zero, or does not finish within
            5 seconds.
        """
        import subprocess

        target = message.get("to", "")
        pane = message.get("pane", target)
        payload = json.dumps(message)
        cmd = ["tmux", "send-keys", "-t", pane, payload, "Enter"]
        try:
            subprocess.run(cmd, check=True, capture_output=True, timeout=5)
        except (
            subprocess.CalledProcessError,
            subprocess.TimeoutExpired,  # raised because timeout= is passed above
            FileNotFoundError,
        ) as e:
            return {
                "status": "failed",
                "error": str(e),
                "via": "tmux",
            }
        return {
            "status": "sent",
            "message_id": message.get("id", "tmux-fallback"),
            "via": "tmux",
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }

    def receive(self, agent_id: str, timeout: int = 5) -> Optional[dict]:
        """Tmux fallback cannot do blocking receive; always returns None."""
        return None

    def ack(self, agent_id: str, message_id: str) -> dict:
        """Return an 'acked' receipt; nothing is stored."""
        return {"status": "acked", "ack_id": message_id, "via": "tmux"}

    def broadcast(self, message: dict) -> dict:
        """Report that broadcast is unsupported in tmux mode."""
        return {"status": "unsupported", "via": "tmux", "error": "broadcast not supported in tmux mode"}

    def register_agent(self, agent_id: str, **kwargs) -> dict:
        """Record ``agent_id`` in the in-memory registry (kwargs are ignored)."""
        self._agents[agent_id] = {"status": "active", "via": "tmux"}
        return {"status": "registered", "agent_id": agent_id, "via": "tmux"}

    def unregister_agent(self, agent_id: str) -> dict:
        """Forget ``agent_id``; unknown ids are reported as unregistered too."""
        self._agents.pop(agent_id, None)
        return {"status": "unregistered", "agent_id": agent_id, "via": "tmux"}

    def list_agents(self, **kwargs) -> list[dict]:
        """Return every locally registered agent (kwargs are ignored)."""
        return [{"agent_id": k, **v} for k, v in self._agents.items()]
# ---------------------------------------------------------------------------
# Transport Router (Redis primary, tmux fallback)
# ---------------------------------------------------------------------------
class TransportRouter:
    """
    Routes ACP messages through Redis (primary) or tmux (fallback).

    Selection:
        - Explicit: message['via'] == 'redis' | 'tmux'
        - Auto: ping Redis first, fall back to tmux when it is unreachable
    """

    def __init__(self, redis_url: str = None):
        """
        Args:
            redis_url: Passed through to RedisTransport (None = its default).
        """
        self.redis_transport = RedisTransport(redis_url=redis_url)
        self.tmux_transport = TmuxTransport()
        # Result of the most recent ping; optimistic until the first check.
        self._redis_available = True

    def _check_redis(self) -> bool:
        """Ping Redis, remember the outcome, and return whether it answered.

        Connection failures and timeouts both count as "not reachable".
        """
        try:
            self.redis_transport.client.ping()
        except (redis.ConnectionError, redis.TimeoutError):
            self._redis_available = False
        else:
            self._redis_available = True
        return self._redis_available

    def route(self, message: dict) -> dict:
        """
        Route message through best available transport.

        Priority: redis (primary) → tmux (fallback). A message carrying an
        explicit 'via' of 'redis' or 'tmux' goes to that transport without a
        reachability check. In auto mode the chosen transport name is written
        into ``message['via']`` before sending.

        Returns:
            The receipt dict produced by the selected transport's ``send``.
        """
        via = message.get("via")
        if via == "redis":
            return self.redis_transport.send(message)
        if via == "tmux":
            return self.tmux_transport.send(message)

        # Auto-route: prefer Redis
        if self._check_redis():
            message["via"] = "redis"
            return self.redis_transport.send(message)
        message["via"] = "tmux"
        return self.tmux_transport.send(message)

    def get_transport(self, via: str = None):
        """Get a specific transport by name, or the best available one.

        Args:
            via: 'redis', 'tmux', or anything else for automatic selection.
        """
        if via == "redis":
            return self.redis_transport
        if via == "tmux":
            return self.tmux_transport
        # Auto
        return self.redis_transport if self._check_redis() else self.tmux_transport
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def _parse_payload(raw: str) -> dict:
    """Turn a CLI message argument into an ACP payload dict.

    A JSON object is used as-is. Anything else — plain text, or JSON that is
    not an object (``42``, ``null``, ``[1, 2]``) — is wrapped as
    ``{"text": raw}`` so the payload is always a dict.
    """
    try:
        payload = json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        return {"text": raw}
    return payload if isinstance(payload, dict) else {"text": raw}


def main():
    """CLI entry point for ACP Redis transport operations.

    Exit codes: 0 on success, 1 for usage/validation errors, an empty
    receive or an unhealthy health check, 2 when Redis cannot be reached,
    130 on Ctrl-C.
    """
    import argparse
    import sys

    parser = argparse.ArgumentParser(
        prog="acp",
        description="ACP Bridge — Agent Communication Protocol (Redis transport)",
    )
    subparsers = parser.add_subparsers(dest="command", help="Available commands")

    # --- send ---
    send_parser = subparsers.add_parser("send", help="Send an ACP message")
    send_parser.add_argument("target", help="Target agent (e.g., agent:pane)")
    send_parser.add_argument("message", help="Message payload (JSON string or plain text)")
    send_parser.add_argument("--via", choices=["redis", "tmux"], default="redis",
                             help="Transport to use (default: redis)")
    send_parser.add_argument("--type", default="request", choices=["request", "response", "alert", "broadcast"],
                             help="Message type (default: request)")
    send_parser.add_argument("--from", dest="sender", default="system",
                             help="Sender agent ID (default: system)")
    send_parser.add_argument("--ttl", type=int, default=3600,
                             help="TTL in seconds (default: 3600)")

    # --- register ---
    reg_parser = subparsers.add_parser("register", help="Register an agent")
    reg_parser.add_argument("agent_id", help="Agent identifier")
    reg_parser.add_argument("--queue", help="Queue path (default: acp:inbox:{agent_id})")
    reg_parser.add_argument("--ttl", type=int, default=3600, help="Registration TTL (default: 3600s)")
    reg_parser.add_argument("--metadata", help="JSON metadata string")

    # --- unregister ---
    unreg_parser = subparsers.add_parser("unregister", help="Unregister an agent")
    unreg_parser.add_argument("agent_id", help="Agent identifier")

    # --- agents ---
    agents_parser = subparsers.add_parser("agents", help="List registered agents")
    agents_parser.add_argument("--status", choices=["active", "stale"], help="Filter by status")

    # --- receive ---
    recv_parser = subparsers.add_parser("receive", help="Receive messages for an agent")
    recv_parser.add_argument("agent_id", help="Agent identifier")
    recv_parser.add_argument("--timeout", type=int, default=5, help="Block timeout in seconds")

    # --- broadcast ---
    bcast_parser = subparsers.add_parser("broadcast", help="Broadcast to all agents")
    bcast_parser.add_argument("message", help="Message payload")
    bcast_parser.add_argument("--from", dest="sender", default="system")
    bcast_parser.add_argument("--type", default="broadcast")

    # --- health ---
    subparsers.add_parser("health", help="Check transport health")

    # --- queue ---
    queue_parser = subparsers.add_parser("queue", help="Queue operations")
    queue_parser.add_argument("agent_id", help="Agent identifier")
    queue_parser.add_argument("action", choices=["length", "peek", "purge"],
                              help="Queue action")

    args = parser.parse_args()
    if not args.command:
        parser.print_help()
        sys.exit(1)

    transport = RedisTransport()
    try:
        if args.command == "send":
            msg = transport.build_message(
                to=args.target,
                payload=_parse_payload(args.message),
                from_agent=args.sender,
                msg_type=args.type,
                ttl=args.ttl,
            )
            if args.via == "redis":
                result = transport.send(msg)
            else:
                # build_message always stamps via="redis"; override it so the
                # router honours the requested transport instead of Redis.
                msg["via"] = args.via
                router = TransportRouter()
                result = router.route(msg)
            print(json.dumps(result, indent=2))

        elif args.command == "register":
            metadata = {}
            if args.metadata:
                try:
                    metadata = json.loads(args.metadata)
                except json.JSONDecodeError:
                    print("Error: invalid JSON metadata", file=sys.stderr)
                    sys.exit(1)
            result = transport.register_agent(
                agent_id=args.agent_id,
                queue=args.queue,
                metadata=metadata,
                ttl=args.ttl,
            )
            print(json.dumps(result, indent=2))

        elif args.command == "unregister":
            result = transport.unregister_agent(args.agent_id)
            print(json.dumps(result, indent=2))

        elif args.command == "agents":
            agents = transport.list_agents(status=args.status)
            if not agents:
                print("No agents registered.")
            else:
                print(f"{'AGENT ID':<30} {'STATUS':<10} {'QUEUE':<40} {'LAST SEEN'}")
                print("-" * 100)
                for a in agents:
                    queue = a.get("queue", "")
                    status = a.get("status", "unknown")
                    last = a.get("last_seen", "never")
                    print(f"{a['agent_id']:<30} {status:<10} {queue:<40} {last}")

        elif args.command == "receive":
            msg = transport.receive(args.agent_id, timeout=args.timeout)
            if msg:
                print(json.dumps(msg, indent=2))
            else:
                print("No messages (timeout).", file=sys.stderr)
                sys.exit(1)

        elif args.command == "broadcast":
            msg = transport.build_message(
                to="*",
                payload=_parse_payload(args.message),
                from_agent=args.sender,
                msg_type=args.type,
            )
            result = transport.broadcast(msg)
            print(json.dumps(result, indent=2))

        elif args.command == "health":
            result = transport.health()
            print(json.dumps(result, indent=2))
            sys.exit(0 if result.get("status") == "healthy" else 1)

        elif args.command == "queue":
            if args.action == "length":
                length = transport.queue_length(args.agent_id)
                print(json.dumps({"agent_id": args.agent_id, "queue_length": length}))
            elif args.action == "peek":
                msgs = transport.queue_peek(args.agent_id)
                print(json.dumps(msgs, indent=2))
            elif args.action == "purge":
                count = transport.queue_purge(args.agent_id)
                print(json.dumps({"agent_id": args.agent_id, "purged": count}))

    except redis.ConnectionError as e:
        print(f"Redis connection error: {e}", file=sys.stderr)
        sys.exit(2)
    except ValueError as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)
    except KeyboardInterrupt:
        sys.exit(130)


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,553 @@
#!/usr/bin/env python3
"""
Tests for ACP Redis Transport Layer.
Uses fakeredis for isolated testing without a real Redis server.
Run: python -m pytest tests/test_acp_redis_transport.py -v
"""
import json
import os
import sys
import time
from datetime import datetime, timezone
from unittest.mock import MagicMock, patch
import pytest
import fakeredis
# Add parent dir for imports
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "scripts"))
from acp_redis_transport import (
RedisTransport,
TmuxTransport,
TransportRouter,
BroadcastSubscriber,
QUEUE_PREFIX,
ACK_PREFIX,
REGISTRY_KEY,
BROADCAST_CHANNEL,
DLQ_KEY,
DEFAULT_TTL,
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def fake_redis():
    """Provide an in-memory fakeredis client that decodes responses to str."""
    return fakeredis.FakeRedis(decode_responses=True)
@pytest.fixture
def transport(fake_redis):
    """Provide a RedisTransport wired to the fake Redis client."""
    redis_backed = RedisTransport(redis_client=fake_redis)
    return redis_backed
@pytest.fixture
def registered_transport(transport):
    """Provide a transport with allegro-primary and ezra-primary registered."""
    for agent_id in ("allegro-primary", "ezra-primary"):
        transport.register_agent(agent_id, queue=f"forge:inbox:{agent_id}")
    return transport
# ===========================================================================
# 1. Message Construction & Validation (5 tests)
# ===========================================================================
class TestMessageConstruction:
    """build_message and validate_message."""

    def test_build_message_defaults(self, transport):
        """Every field gets its default when only to/payload are given."""
        built = transport.build_message(
            to="ezra-primary",
            payload={"task": "review PR #123"},
        )
        expected = {
            "to": "ezra-primary",
            "from": "system",
            "type": "request",
            "payload": {"task": "review PR #123"},
            "via": "redis",
            "ttl": DEFAULT_TTL,
        }
        for field, value in expected.items():
            assert built[field] == value
        assert len(built["id"]) == 16
        assert "T" in built["timestamp"]  # ISO format

    def test_build_message_custom(self, transport):
        """Explicit arguments override the defaults."""
        built = transport.build_message(
            to="allegro-primary",
            payload={"code": "diff"},
            from_agent="ezra-primary",
            msg_type="response",
            reply_to="abc123",
            ttl=60,
            signature="sig456",
        )
        expected = {
            "from": "ezra-primary",
            "type": "response",
            "reply_to": "abc123",
            "ttl": 60,
            "signature": "sig456",
        }
        for field, value in expected.items():
            assert built[field] == value

    def test_validate_valid_message(self, transport):
        """A well-formed message produces no errors."""
        well_formed = {"to": "agent", "from": "system", "payload": {}, "type": "request"}
        assert transport.validate_message(well_formed) == []

    def test_validate_missing_fields(self, transport):
        """Each absent required field is reported."""
        problems = transport.validate_message({"to": "x"})
        assert "missing required field: from" in problems
        assert "missing required field: payload" in problems

    def test_validate_invalid_type(self, transport):
        """An unknown message type is reported."""
        bad_type = {"to": "x", "from": "y", "payload": {}, "type": "explode"}
        problems = transport.validate_message(bad_type)
        assert any("invalid type" in p for p in problems)
# ===========================================================================
# 2. Send Operations (4 tests)
# ===========================================================================
class TestSendOperations:
    """RedisTransport.send."""

    def test_send_basic(self, registered_transport):
        """A sent message is receipted against the target's inbox queue."""
        outbound = registered_transport.build_message(
            to="ezra-primary",
            payload={"task": "review"},
            from_agent="allegro-primary",
        )
        receipt = registered_transport.send(outbound)
        assert receipt["status"] == "sent"
        assert "message_id" in receipt
        assert receipt["queue"] == f"{QUEUE_PREFIX}ezra-primary"

    def test_send_invalid_raises(self, transport):
        """An invalid message is rejected with ValueError."""
        incomplete = {"to": "x"}  # missing from, payload
        with pytest.raises(ValueError, match="Invalid message"):
            transport.send(incomplete)

    def test_send_fills_missing_fields(self, transport):
        """send adds id and timestamp when the caller omitted them."""
        transport.register_agent("target-agent")
        receipt = transport.send({"to": "target-agent", "from": "src", "payload": {"k": "v"}})
        assert receipt["status"] == "sent"
        # Verify in queue
        queued = transport.queue_peek("target-agent")
        assert len(queued) == 1
        stored = queued[0]
        assert "id" in stored
        assert "timestamp" in stored

    def test_send_increments_audit(self, transport):
        """Each send leaves a 'send' entry in the audit log."""
        transport.register_agent("audit-target")
        transport.send({"to": "audit-target", "from": "src", "payload": {}})
        recent = transport.audit_log(limit=5)
        sends = [entry for entry in recent if entry["action"] == "send"]
        assert len(sends) >= 1
# ===========================================================================
# 3. Receive Operations (4 tests)
# ===========================================================================
class TestReceiveOperations:
    """RedisTransport.receive and receive_nowait."""

    def test_receive_returns_sent_message(self, registered_transport):
        """A message that was sent comes back from receive."""
        outbound = registered_transport.build_message(
            to="ezra-primary", payload={"work": True}, from_agent="allegro"
        )
        registered_transport.send(outbound)
        inbound = registered_transport.receive("ezra-primary", timeout=1)
        assert inbound is not None
        assert inbound["payload"]["work"] is True
        assert inbound["to"] == "ezra-primary"

    def test_receive_timeout_returns_none(self, transport):
        """An empty inbox yields None (checked via the non-blocking receive)."""
        transport.register_agent("lonely-agent")
        # Use nowait which pops without blocking instead of brpop
        assert transport.receive_nowait("lonely-agent") is None

    def test_receive_with_short_timeout(self, transport):
        """receive returns None when BRPOP times out."""
        transport.register_agent("lonely-agent-2")
        # Mock brpop to return None (simulating timeout)
        with patch.object(transport._redis, 'brpop', return_value=None):
            outcome = transport.receive("lonely-agent-2", timeout=1)
            assert outcome is None

    def test_receive_nowait(self, transport):
        """Non-blocking receive returns None when empty, then the sent message."""
        transport.register_agent("nb-agent")
        assert transport.receive_nowait("nb-agent") is None
        transport.send({"to": "nb-agent", "from": "x", "payload": {"n": 1}})
        inbound = transport.receive_nowait("nb-agent")
        assert inbound is not None
        assert inbound["payload"]["n"] == 1
# ===========================================================================
# 4. Acknowledgement (3 tests)
# ===========================================================================
class TestAcknowledgement:
    """Tests for ``ack`` / ``is_acked`` and the audit entry written by ``ack``.

    Every test receives a message first. ``receive`` can return ``None`` when
    nothing arrives within the timeout, so each test asserts that a message
    was actually delivered before subscripting it; otherwise an empty inbox
    would surface as a ``TypeError`` instead of a readable assertion failure.
    """

    def test_ack_basic(self, registered_transport):
        """Acknowledging a received message reports ``acked`` and echoes its id."""
        t = registered_transport
        t.send({"to": "ezra-primary", "from": "allegro", "payload": {"id": 1}})
        msg = t.receive("ezra-primary", timeout=1)
        assert msg is not None, "no message delivered to ezra-primary"

        result = t.ack("ezra-primary", msg["id"])
        assert result["status"] == "acked"
        assert result["ack_id"] == msg["id"]

    def test_is_acked_true(self, registered_transport):
        """``is_acked`` is falsy before ``ack`` and truthy afterwards."""
        t = registered_transport
        t.send({"to": "ezra-primary", "from": "allegro", "payload": {}})
        msg = t.receive("ezra-primary", timeout=1)
        assert msg is not None, "no message delivered to ezra-primary"

        assert not t.is_acked("ezra-primary", msg["id"])
        t.ack("ezra-primary", msg["id"])
        assert t.is_acked("ezra-primary", msg["id"])

    def test_ack_logs_audit(self, transport):
        """After an ack, the audit log contains at least one ``ack`` entry."""
        transport.register_agent("ack-agent")
        transport.send({"to": "ack-agent", "from": "x", "payload": {}})
        msg = transport.receive("ack-agent", timeout=1)
        assert msg is not None, "no message delivered to ack-agent"

        transport.ack("ack-agent", msg["id"])
        audit = transport.audit_log(limit=10)
        ack_entries = [a for a in audit if a["action"] == "ack"]
        assert len(ack_entries) >= 1
# ===========================================================================
# 5. Agent Registry (5 tests)
# ===========================================================================
class TestAgentRegistry:
    """Tests for registering, looking up, listing and unregistering agents."""

    def test_register_agent(self, transport):
        """Registration echoes the status, the agent id and the custom queue."""
        outcome = transport.register_agent("allegro-v2", queue="custom:queue")
        expected = {
            "status": "registered",
            "agent_id": "allegro-v2",
            "queue": "custom:queue",
        }
        for field in ("status", "agent_id", "queue"):
            assert outcome[field] == expected[field]

    def test_get_agent(self, transport):
        """A registered agent can be fetched and is reported as ``active``."""
        transport.register_agent("get-test", metadata={"version": "2.0"})
        record = transport.get_agent("get-test")
        assert record is not None
        assert record["agent_id"] == "get-test"
        assert record["status"] == "active"

    def test_get_nonexistent_agent(self, transport):
        """Looking up an id that was never registered gives ``None``."""
        missing = transport.get_agent("ghost-agent")
        assert missing is None

    def test_list_agents(self, transport):
        """Every registered id shows up in ``list_agents``."""
        names = ("a1", "a2", "a3")
        for name in names:
            transport.register_agent(name)

        listed_ids = [entry["agent_id"] for entry in transport.list_agents()]
        for name in names:
            assert name in listed_ids

    def test_unregister_agent(self, transport):
        """After ``unregister_agent`` the agent can no longer be fetched."""
        agent = "temp-agent"
        transport.register_agent(agent)
        assert transport.get_agent(agent) is not None

        outcome = transport.unregister_agent(agent)
        assert outcome["status"] == "unregistered"
        assert transport.get_agent(agent) is None
# ===========================================================================
# 6. Broadcast (2 tests)
# ===========================================================================
class TestBroadcast:
    """Tests for ``broadcast`` fan-out into per-agent inboxes."""

    def test_broadcast_to_all_agents(self, transport):
        """A broadcast reaches the inbox of each of three registered agents."""
        recipients = ["b1", "b2", "b3"]
        for agent_id in recipients:
            transport.register_agent(agent_id)

        announcement = transport.build_message(
            to="*",
            payload={"announcement": "system update"},
            from_agent="admin",
            msg_type="broadcast",
        )
        outcome = transport.broadcast(announcement)
        assert outcome["status"] == "broadcast"
        assert outcome["recipients"] == 3

        # Drain one message per agent and check it is the broadcast.
        for agent_id in recipients:
            delivered = transport.receive_nowait(agent_id)
            assert delivered is not None
            assert delivered["to"] == "*"
            assert delivered["payload"]["announcement"] == "system update"

    def test_broadcast_to_empty_registry(self, transport):
        """With nobody registered, the broadcast reports zero recipients."""
        outcome = transport.broadcast(
            transport.build_message(to="*", payload={"info": "test"})
        )
        assert outcome["recipients"] == 0
# ===========================================================================
# 7. Queue Management (3 tests)
# ===========================================================================
class TestQueueManagement:
    """Tests for ``queue_length``, ``queue_peek`` and ``queue_purge``."""

    def test_queue_length(self, transport):
        """Length is 0 for a fresh agent and 5 after five sends."""
        agent = "ql-agent"
        transport.register_agent(agent)
        assert transport.queue_length(agent) == 0

        for seq in range(5):
            transport.send({"to": "ql-agent", "from": "x", "payload": {"i": seq}})
        assert transport.queue_length(agent) == 5

    def test_queue_peek(self, transport):
        """Peeking returns both pending messages and leaves them queued."""
        agent = "peek-agent"
        transport.register_agent(agent)
        for body in ({"a": 1}, {"b": 2}):
            transport.send({"to": "peek-agent", "from": "x", "payload": body})

        visible = transport.queue_peek(agent, count=10)
        assert len(visible) == 2
        # The length is unchanged, so peek did not consume anything.
        assert transport.queue_length(agent) == 2

    def test_queue_purge(self, transport):
        """Purging returns the number of removed messages and empties the queue."""
        agent = "purge-agent"
        transport.register_agent(agent)
        for seq in range(3):
            transport.send({"to": "purge-agent", "from": "x", "payload": {"i": seq}})

        removed = transport.queue_purge(agent)
        assert removed == 3
        assert transport.queue_length(agent) == 0
# ===========================================================================
# 8. Health & Diagnostics (2 tests)
# ===========================================================================
class TestHealth:
    """Tests for the ``health`` report."""

    def test_health_healthy(self, transport):
        """The report says ``healthy``, ping succeeded, and a timestamp is present."""
        report = transport.health()
        assert report["status"] == "healthy"
        assert report["redis_ping"] is True
        assert "timestamp" in report

    def test_health_with_agents(self, transport):
        """Registered and active agent counts are each at least two."""
        for agent_id in ("health-agent-1", "health-agent-2"):
            transport.register_agent(agent_id)

        report = transport.health()
        assert report["agents_registered"] >= 2
        assert report["agents_active"] >= 2
# ===========================================================================
# 9. TmuxTransport Fallback (3 tests)
# ===========================================================================
class TestTmuxTransport:
    """Tests for the ``TmuxTransport`` fallback."""

    def test_tmux_register_and_list(self):
        """Two registrations on a new ``TmuxTransport`` give two listed agents."""
        fallback = TmuxTransport()
        for agent_id in ("tmux-agent-1", "tmux-agent-2"):
            fallback.register_agent(agent_id)
        assert len(fallback.list_agents()) == 2

    def test_tmux_broadcast_unsupported(self):
        """``broadcast`` on the tmux transport reports ``unsupported``."""
        fallback = TmuxTransport()
        outcome = fallback.broadcast({"to": "*", "from": "x", "payload": {}})
        assert outcome["status"] == "unsupported"

    def test_tmux_ack(self):
        """``ack`` on the tmux transport reports ``acked`` via ``tmux``."""
        fallback = TmuxTransport()
        outcome = fallback.ack("agent", "msg-id")
        assert outcome["status"] == "acked"
        assert outcome["via"] == "tmux"
# ===========================================================================
# 10. TransportRouter (2 tests)
# ===========================================================================
class TestTransportRouter:
    """Tests built around a hand-wired ``TransportRouter``.

    NOTE(review): both tests assemble a router but then call the underlying
    transport directly, so the router's own routing logic is not exercised
    here. Consider sending through the router once its API is confirmed.
    """

    def test_router_auto_selects_redis(self, fake_redis):
        """A Redis-backed transport wired into a router can send a message."""
        redis_side = RedisTransport(redis_client=fake_redis)
        # NOTE(review): __new__ is used to build the instance, so patching
        # __init__ looks redundant here — confirm before removing.
        with patch.object(TransportRouter, "__init__", lambda self, redis_url=None: None):
            router = TransportRouter.__new__(TransportRouter)
            router.redis_transport = redis_side
            router.tmux_transport = TmuxTransport()
            router._redis_available = True

        envelope = redis_side.build_message(to="test-agent", payload={"k": "v"})
        envelope["to"] = "test-agent"
        redis_side.register_agent("test-agent")

        # The send goes straight to the Redis transport, not through the router.
        outcome = redis_side.send(envelope)
        assert outcome["status"] == "sent"

    def test_router_explicit_tmux(self):
        """A message marked ``via=tmux`` sent on the tmux transport ends ``sent`` or ``failed``."""
        with patch.object(TransportRouter, "__init__", lambda self, redis_url=None: None):
            router = TransportRouter.__new__(TransportRouter)
            router.redis_transport = MagicMock()
            router.tmux_transport = TmuxTransport()
            router._redis_available = True

        router.tmux_transport.register_agent("tmux-target")
        envelope = {"to": "tmux-target", "from": "x", "payload": {"k": 1}, "via": "tmux"}

        # The original author notes that tmux send-keys will not work under
        # test, which is why either outcome is accepted below.
        outcome = router.tmux_transport.send(envelope)
        assert outcome["status"] in ("sent", "failed")
# ===========================================================================
# 11. Dead Letter Queue (1 test)
# ===========================================================================
class TestDeadLetter:
    """Tests for the dead-letter queue accessor."""

    def test_dlq_contents_empty(self, transport):
        """``dlq_contents`` returns a list.

        Only the return type is checked; the list is not asserted to be empty.
        """
        assert isinstance(transport.dlq_contents(), list)
# ===========================================================================
# 12. Audit Log (2 tests)
# ===========================================================================
class TestAuditLog:
    """Tests for the entries returned by ``audit_log``."""

    def test_audit_log_captures_actions(self, transport):
        """After a send and an ack, both actions appear in the audit log.

        Only ``send`` and ``ack`` are asserted; registration is performed but
        its audit entry is not checked here.
        """
        transport.register_agent("audit-test")
        transport.send({"to": "audit-test", "from": "x", "payload": {}})
        msg = transport.receive("audit-test", timeout=1)
        # receive() can return None on an empty inbox; fail with a clear
        # assertion rather than a TypeError on msg["id"].
        assert msg is not None, "no message delivered to audit-test"
        transport.ack("audit-test", msg["id"])

        audit = transport.audit_log(limit=50)
        actions = [a["action"] for a in audit]
        assert "send" in actions
        assert "ack" in actions

    def test_audit_log_size_limit(self, transport):
        """``audit_log(limit=1)`` returns at least one entry after a send.

        NOTE(review): the original intent was to cover trimming of the log to
        10000 entries, which is not verified here — only that the log is
        non-empty when read with ``limit=1``.
        """
        transport.register_agent("trim-test")
        transport.send({"to": "trim-test", "from": "x", "payload": {}})
        audit = transport.audit_log(limit=1)
        assert len(audit) >= 1
# ===========================================================================
# 13. Multi-Message Ordering (1 test)
# ===========================================================================
class TestOrdering:
    """Tests for delivery order across several messages to one agent."""

    def test_fifo_order(self, transport):
        """Ten messages sent with ``seq`` 0..9 are received in that same order."""
        transport.register_agent("fifo-agent")
        for i in range(10):
            transport.send({"to": "fifo-agent", "from": "x", "payload": {"seq": i}})

        for i in range(10):
            msg = transport.receive("fifo-agent", timeout=1)
            # receive() can return None when the inbox is empty; report which
            # sequence number went missing instead of raising a TypeError.
            assert msg is not None, f"inbox empty while waiting for seq {i}"
            assert msg["payload"]["seq"] == i
# ===========================================================================
# 14. Edge Cases (3 tests)
# ===========================================================================
class TestEdgeCases:
    """Edge-case tests: self-addressed messages, empty payloads, unusual ids."""

    def test_send_to_self(self, transport):
        """A message whose sender and recipient are the same agent is delivered."""
        transport.register_agent("self-agent")
        msg = transport.build_message(
            to="self-agent",
            payload={"echo": True},
            from_agent="self-agent",
        )
        transport.send(msg)

        received = transport.receive("self-agent", timeout=1)
        # Guard against an empty inbox so the failure is an assertion, not a
        # TypeError from subscripting None.
        assert received is not None, "no message delivered to self-agent"
        assert received["from"] == "self-agent"
        assert received["to"] == "self-agent"

    def test_empty_payload(self, transport):
        """An empty-dict payload is accepted and comes back unchanged."""
        transport.register_agent("empty-target")
        result = transport.send({"to": "empty-target", "from": "x", "payload": {}})
        assert result["status"] == "sent"

        msg = transport.receive("empty-target", timeout=1)
        assert msg is not None, "no message delivered to empty-target"
        assert msg["payload"] == {}

    def test_special_chars_in_agent_id(self, transport):
        """An id containing ``:``, ``.``, ``@`` and ``-`` can be registered and fetched."""
        special_id = "agent:v2.1@forge-01"
        result = transport.register_agent(special_id)
        assert result["status"] == "registered"
        assert transport.get_agent(special_id) is not None
# ===========================================================================
# 15. Pub/Sub Subscriber (1 test)
# ===========================================================================
class TestPubSubSubscriber:
    """Tests for the object returned by ``subscribe_broadcast``."""

    def test_broadcast_subscriber_context_manager(self, transport):
        """``subscribe_broadcast`` returns an object exposing ``listen``.

        NOTE(review): despite the name, the subscriber is not entered with a
        ``with`` statement here; only its creation and ``listen`` attribute
        are checked.
        """
        subscriber = transport.subscribe_broadcast("test-agent")
        # Creation alone must succeed and produce a usable handle.
        assert subscriber is not None
        assert hasattr(subscriber, "listen")