Replaces tmux send-keys with Redis-backed message passing.
Implements:
- RedisTransport class: send/receive/ack/broadcast via Redis queues + Pub/Sub
- Agent registry stored in Redis Hash with TTL auto-expiry (heartbeat pattern)
- TransportRouter: Redis primary, tmux fallback
- Full CLI: acp send/receive/register/unregister/agents/broadcast/health/queue
- TmuxTransport fallback for backward compatibility
Queue schema:
acp:inbox:{agent_id} - LPUSH/BRPOP for FIFO message delivery
acp:ack:{agent_id} - SADD for message acknowledgement tracking
acp:registry - HSET for agent registry
acp:broadcast - Pub/Sub channel + per-agent inbox push
acp:dlq - Dead-letter queue + audit trail
Tests: 41 tests covering send/receive/ack/broadcast/registry/queue/FIFO ordering/edge cases.
Refs: Epic #373 (Architecture overhaul)
878 lines
30 KiB
Python
878 lines
30 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
ACP Redis Transport Layer
|
|
Agent Communication Protocol transport using Redis queues and Pub/Sub.
|
|
|
|
Replaces tmux send-keys with Redis-backed message passing.
|
|
Provides reliable, persistent, and scalable inter-agent communication.
|
|
|
|
Queue schema:
|
|
acp:inbox:{agent_id} — agent inbox (list, BRPOP)
|
|
acp:ack:{agent_id} — acknowledged message IDs (set, for dedup)
|
|
acp:registry — agent registry (hash: agent_id → json)
|
|
acp:broadcast — Pub/Sub channel for broadcasts
|
|
acp:dlq — dead-letter queue for failed messages
|
|
|
|
Usage:
|
|
transport = RedisTransport(redis_url="redis://localhost:6379/0")
|
|
transport.register_agent("allegro-primary", queue="acp:inbox:allegro-primary")
|
|
transport.send({"to": "ezra-primary", "from": "allegro-primary", "type": "request", "payload": {"task": "review"}})
|
|
msg = transport.receive("ezra-primary", timeout=5)
|
|
transport.ack("ezra-primary", msg["id"])
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import time
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
from typing import Any, Optional
|
|
|
|
import redis
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Constants
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Redis key namespace used by the ACP transport.
QUEUE_PREFIX = "acp:inbox:"  # + agent_id -> per-agent inbox list
ACK_PREFIX = "acp:ack:"  # + agent_id -> set of acknowledged message ids
REGISTRY_KEY = "acp:registry"  # hash: agent_id -> JSON registry entry
BROADCAST_CHANNEL = "acp:broadcast"  # Pub/Sub channel for broadcasts
DLQ_KEY = "acp:dlq"  # dead-letter list; "<DLQ_KEY>:audit" holds the audit trail

# Defaults.
DEFAULT_TTL = 60 * 60  # seconds (1 hour)
DEFAULT_REDIS_URL = "redis://localhost:6379/0"
DEFAULT_TIMEOUT = 5  # seconds to block in BRPOP
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# ACP Message Schema
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Field name -> expected Python type for an ACP message envelope.
# Built from pairs so the declaration order below is the documented order.
ACP_MESSAGE_FIELDS = dict(
    [
        ("id", str),  # auto-generated if missing
        ("from", str),  # sender agent_id
        ("to", str),  # target agent_id or "*" for broadcast
        ("type", str),  # request | response | broadcast | alert | ack
        ("payload", dict),  # message body
        ("reply_to", str),  # optional: original message id
        ("timestamp", str),  # ISO-8601 timestamp
        ("via", str),  # transport name ("redis")
        ("ttl", int),  # seconds to live
        ("signature", str),  # optional: HMAC signature
    ]
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# RedisTransport
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class RedisTransport:
    """Redis-backed transport for ACP agent communication.

    Messages are JSON-encoded and LPUSHed onto a per-agent inbox list
    (``acp:inbox:{agent_id}``); consumers pop from the opposite end of the
    list, which yields FIFO delivery. Agent liveness is tracked with
    per-agent heartbeat keys (``acp:heartbeat:{agent_id}``) that Redis
    expires automatically.
    """

    def __init__(self, redis_url: Optional[str] = None, redis_client: "redis.Redis|None" = None):
        """
        Initialize the Redis transport.

        Args:
            redis_url: Redis connection URL (default: ACP_REDIS_URL env or localhost:6379/0)
            redis_client: Inject a pre-configured redis client (for testing)
        """
        if redis_client is not None:
            self._redis = redis_client
        else:
            url = redis_url or os.getenv("ACP_REDIS_URL", DEFAULT_REDIS_URL)
            self._redis = redis.Redis.from_url(url, decode_responses=True)
        self._pubsub = None

    @property
    def client(self) -> redis.Redis:
        """Access the underlying Redis client."""
        return self._redis

    # -----------------------------------------------------------------------
    # Internal helpers
    # -----------------------------------------------------------------------

    @staticmethod
    def _heartbeat_key(agent_id: str) -> str:
        """Return the Redis key holding the liveness heartbeat of *agent_id*."""
        return f"acp:heartbeat:{agent_id}"

    @staticmethod
    def _is_expired(message: dict) -> bool:
        """Return True when the message's timestamp + ttl lies in the past.

        Messages without a timestamp never expire. A timestamp or ttl that
        cannot be interpreted is treated the same way, so an already-popped
        message is delivered rather than lost to an exception.
        """
        ts = message.get("timestamp")
        if not ts:
            return False
        ttl = message.get("ttl", DEFAULT_TTL)
        try:
            sent_at = datetime.fromisoformat(ts).timestamp()
            return time.time() - sent_at > ttl
        except (TypeError, ValueError):
            return False

    def _audit(self, entry: dict) -> None:
        """Prepend *entry* to the audit trail and cap it at 10,000 entries."""
        audit_key = f"{DLQ_KEY}:audit"
        self._redis.lpush(audit_key, json.dumps(entry))
        self._redis.ltrim(audit_key, 0, 9999)

    # -----------------------------------------------------------------------
    # Message Construction
    # -----------------------------------------------------------------------

    @staticmethod
    def build_message(
        to: str,
        payload: dict,
        from_agent: str = "system",
        msg_type: str = "request",
        reply_to: Optional[str] = None,
        ttl: int = DEFAULT_TTL,
        signature: Optional[str] = None,
    ) -> dict:
        """Build a normalized ACP message.

        A fresh 16-hex-character id and a UTC ISO-8601 timestamp are
        generated; ``via`` is always set to ``"redis"`` and a missing
        signature becomes the empty string.
        """
        return {
            "id": uuid.uuid4().hex[:16],
            "from": from_agent,
            "to": to,
            "type": msg_type,
            "payload": payload,
            "reply_to": reply_to,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "via": "redis",
            "ttl": ttl,
            "signature": signature or "",
        }

    @staticmethod
    def validate_message(msg: dict) -> list[str]:
        """
        Validate an ACP message structure.

        Checks that ``to``, ``from`` and ``payload`` are present and, when a
        ``type`` is given, that it is one of the known message types.

        Returns:
            List of validation errors (empty = valid).
        """
        if not isinstance(msg, dict):
            return ["message must be a dict"]

        errors = [
            f"missing required field: {field}"
            for field in ("to", "from", "payload")
            if field not in msg
        ]
        if "type" in msg and msg["type"] not in (
            "request", "response", "broadcast", "alert", "ack"
        ):
            errors.append(f"invalid type: {msg.get('type')}")
        return errors

    # -----------------------------------------------------------------------
    # Core Transport Operations
    # -----------------------------------------------------------------------

    def send(self, message: dict) -> dict:
        """
        Send an ACP message to the target agent's inbox queue.

        The caller's dict is not modified; missing ``id``, ``timestamp``,
        ``via`` and ``ttl`` fields are filled in on a copy.

        Args:
            message: ACP message dict (must have 'to', 'from' and 'payload')

        Returns:
            Dict with 'status', 'message_id', 'queue', 'timestamp'

        Raises:
            ValueError: If message is invalid
        """
        errors = self.validate_message(message)
        if errors:
            raise ValueError(f"Invalid message: {'; '.join(errors)}")

        # Auto-fill missing fields on a copy.
        msg = dict(message)
        msg.setdefault("id", uuid.uuid4().hex[:16])
        msg.setdefault("timestamp", datetime.now(timezone.utc).isoformat())
        msg.setdefault("via", "redis")
        msg.setdefault("ttl", DEFAULT_TTL)

        target = msg["to"]
        queue = f"{QUEUE_PREFIX}{target}"

        # LPUSH here + pop from the right on the consumer side = FIFO.
        self._redis.lpush(queue, json.dumps(msg))

        # Let the inbox key lapse shortly after the message's own ttl
        # (skipped for the "*" pseudo-target).
        if target != "*":
            self._redis.expire(queue, msg["ttl"] + 60)

        self._audit({
            "action": "send",
            "message_id": msg["id"],
            "from": msg["from"],
            "to": target,
            "type": msg.get("type", "request"),
            "timestamp": msg["timestamp"],
            "queue": queue,
        })

        return {
            "status": "sent",
            "message_id": msg["id"],
            "queue": queue,
            "timestamp": msg["timestamp"],
        }

    def receive(self, agent_id: str, timeout: int = DEFAULT_TIMEOUT) -> Optional[dict]:
        """
        Blocking receive from an agent's inbox queue.

        Expired messages are tagged with ``_expired`` and moved to the
        dead-letter queue, and the wait continues within the remaining time
        budget. A finite timeout is never turned into an infinite wait.

        Args:
            agent_id: Agent to receive messages for
            timeout: Seconds to block (0 = infinite)

        Returns:
            ACP message dict or None on timeout
        """
        queue = f"{QUEUE_PREFIX}{agent_id}"
        # A falsy timeout means "block forever", so there is no deadline.
        deadline = time.monotonic() + timeout if timeout else None
        wait = timeout

        while True:
            result = self._redis.brpop(queue, timeout=wait)
            if result is None:
                return None

            _, raw = result
            message = json.loads(raw)

            if not self._is_expired(message):
                # Receiving counts as a sign of life for the agent.
                self._update_heartbeat(agent_id)
                return message

            # Expired: park in the DLQ and keep waiting.
            message["_expired"] = True
            self._redis.lpush(DLQ_KEY, json.dumps(message))

            if deadline is not None:
                left = deadline - time.monotonic()
                if left <= 0:
                    return None
                # Never pass 0 to BRPOP here: 0 would block indefinitely.
                wait = max(1, int(left))

    def receive_nowait(self, agent_id: str) -> Optional[dict]:
        """
        Non-blocking receive. Returns None if no messages waiting.

        Pops from the right end of the inbox, the same end ``receive`` uses,
        so messages come out in the order they were sent.
        """
        raw = self._redis.rpop(f"{QUEUE_PREFIX}{agent_id}")
        if raw is None:
            return None
        return json.loads(raw)

    def ack(self, agent_id: str, message_id: str) -> dict:
        """
        Acknowledge receipt of a message.

        The id is added to the agent's ack set (whose TTL is refreshed to
        DEFAULT_TTL) and an audit entry is recorded.

        Args:
            agent_id: Agent acknowledging the message
            message_id: ID of the message being acknowledged

        Returns:
            Dict with 'status' and 'ack_id'
        """
        ack_key = f"{ACK_PREFIX}{agent_id}"
        self._redis.sadd(ack_key, message_id)
        self._redis.expire(ack_key, DEFAULT_TTL)

        self._audit({
            "action": "ack",
            "message_id": message_id,
            "acked_by": agent_id,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        })

        return {"status": "acked", "ack_id": message_id}

    def is_acked(self, agent_id: str, message_id: str) -> bool:
        """Check if a message has been acknowledged."""
        # Normalised to bool regardless of the form of the client's reply.
        return bool(self._redis.sismember(f"{ACK_PREFIX}{agent_id}", message_id))

    def broadcast(self, message: dict) -> dict:
        """
        Broadcast a message to all registered agents via Pub/Sub.

        The message is published on the broadcast channel and also pushed
        onto the inbox of every agent currently reported as active.

        Args:
            message: ACP message (to field will be set to '*')

        Returns:
            Dict with 'status', 'message_id', 'channel', 'recipients'
        """
        msg = dict(message)
        msg["to"] = "*"
        msg["type"] = msg.get("type", "broadcast")
        msg.setdefault("id", uuid.uuid4().hex[:16])
        msg.setdefault("timestamp", datetime.now(timezone.utc).isoformat())

        # Serialise once; the same bytes go to the channel and every inbox.
        encoded = json.dumps(msg)
        self._redis.publish(BROADCAST_CHANNEL, encoded)

        inbox_ttl = msg.get("ttl", DEFAULT_TTL) + 60
        agents = self.list_agents(status="active")
        for agent in agents:
            queue = f"{QUEUE_PREFIX}{agent['agent_id']}"
            self._redis.lpush(queue, encoded)
            self._redis.expire(queue, inbox_ttl)

        return {
            "status": "broadcast",
            "message_id": msg["id"],
            "channel": BROADCAST_CHANNEL,
            "recipients": len(agents),
        }

    # -----------------------------------------------------------------------
    # Agent Registry
    # -----------------------------------------------------------------------

    def register_agent(
        self,
        agent_id: str,
        queue: Optional[str] = None,
        metadata: Optional[dict] = None,
        ttl: int = 3600,
    ) -> dict:
        """
        Register an agent in the registry with TTL auto-expiry.

        Args:
            agent_id: Unique agent identifier
            queue: Queue path (default: acp:inbox:{agent_id})
            metadata: Additional metadata dict
            ttl: TTL in seconds (default: 3600)

        Returns:
            Dict with registration confirmation
        """
        queue = queue or f"{QUEUE_PREFIX}{agent_id}"
        now = datetime.now(timezone.utc).isoformat()
        entry = {
            "agent_id": agent_id,
            "status": "active",
            "queue": queue,
            "registered_at": now,
            "last_seen": now,
            "metadata": json.dumps(metadata or {}),
            "ttl": ttl,
        }
        self._redis.hset(REGISTRY_KEY, agent_id, json.dumps(entry))
        # NOTE(review): this TTL applies to the whole registry hash, so the
        # most recent registration decides when *all* entries lapse.
        self._redis.expire(REGISTRY_KEY, ttl * 2)

        # Per-agent key whose expiry marks the agent as stale.
        self._redis.setex(self._heartbeat_key(agent_id), ttl, now)

        return {"status": "registered", "agent_id": agent_id, "queue": queue, "ttl": ttl}

    def unregister_agent(self, agent_id: str) -> dict:
        """Remove an agent from the registry, along with its heartbeat and ack set."""
        removed = self._redis.hdel(REGISTRY_KEY, agent_id)
        self._redis.delete(self._heartbeat_key(agent_id))
        self._redis.delete(f"{ACK_PREFIX}{agent_id}")
        return {"status": "unregistered" if removed else "not_found", "agent_id": agent_id}

    def get_agent(self, agent_id: str) -> Optional[dict]:
        """Get agent info from registry, or None when the agent is unknown.

        The returned ``status`` is derived from the heartbeat key: "active"
        while it exists, "stale" once it has expired.
        """
        raw = self._redis.hget(REGISTRY_KEY, agent_id)
        if raw is None:
            return None
        entry = json.loads(raw)
        alive = self._redis.exists(self._heartbeat_key(agent_id))
        entry["status"] = "active" if alive else "stale"
        return entry

    def list_agents(self, status: Optional[str] = None) -> list[dict]:
        """
        List all registered agents.

        Args:
            status: Filter by status ('active', 'stale', or None for all)

        Returns:
            List of agent info dicts, ordered by registration time
        """
        agents = []
        for agent_id, raw in self._redis.hgetall(REGISTRY_KEY).items():
            entry = json.loads(raw)
            # Status comes from the heartbeat key, not the stored value.
            alive = self._redis.exists(self._heartbeat_key(agent_id))
            entry["status"] = "active" if alive else "stale"
            if status is None or entry["status"] == status:
                agents.append(entry)
        return sorted(agents, key=lambda a: a.get("registered_at", ""))

    def _update_heartbeat(self, agent_id: str) -> None:
        """Update agent's last_seen timestamp and refresh heartbeat TTL.

        Does nothing for agents that are not in the registry.
        """
        raw = self._redis.hget(REGISTRY_KEY, agent_id)
        if not raw:
            return
        now = datetime.now(timezone.utc).isoformat()
        entry = json.loads(raw)
        ttl = entry.get("ttl", DEFAULT_TTL)
        entry["last_seen"] = now
        self._redis.hset(REGISTRY_KEY, agent_id, json.dumps(entry))
        self._redis.setex(self._heartbeat_key(agent_id), ttl, now)

    # -----------------------------------------------------------------------
    # Queue Management
    # -----------------------------------------------------------------------

    def queue_length(self, agent_id: str) -> int:
        """Get the number of pending messages for an agent."""
        return self._redis.llen(f"{QUEUE_PREFIX}{agent_id}")

    def queue_peek(self, agent_id: str, count: int = 10) -> list[dict]:
        """Peek at messages in queue without consuming them.

        Reads from the head of the list, i.e. the most recently pushed
        messages come first.
        """
        raw_messages = self._redis.lrange(f"{QUEUE_PREFIX}{agent_id}", 0, count - 1)
        return [json.loads(m) for m in raw_messages]

    def queue_purge(self, agent_id: str) -> int:
        """Purge all messages from an agent's inbox. Returns count purged."""
        queue = f"{QUEUE_PREFIX}{agent_id}"
        length = self._redis.llen(queue)
        self._redis.delete(queue)
        return length

    def dlq_contents(self, limit: int = 50) -> list[dict]:
        """Get dead-letter queue contents (newest first, at most *limit*)."""
        raw = self._redis.lrange(DLQ_KEY, 0, limit - 1)
        return [json.loads(m) for m in raw]

    def audit_log(self, limit: int = 100) -> list[dict]:
        """Get recent audit log entries (newest first, at most *limit*)."""
        raw = self._redis.lrange(f"{DLQ_KEY}:audit", 0, limit - 1)
        return [json.loads(m) for m in raw]

    # -----------------------------------------------------------------------
    # Pub/Sub Subscriber
    # -----------------------------------------------------------------------

    def subscribe_broadcast(self, agent_id: str) -> "BroadcastSubscriber":
        """
        Subscribe to broadcast messages for an agent.

        Returns a BroadcastSubscriber context manager that yields messages.
        """
        return BroadcastSubscriber(self._redis, agent_id)

    # -----------------------------------------------------------------------
    # Health / Diagnostics
    # -----------------------------------------------------------------------

    def health(self) -> dict:
        """Health check — verifies Redis connectivity and returns stats."""
        try:
            pong = self._redis.ping()
            agents = self.list_agents()
            return {
                "status": "healthy" if pong else "unreachable",
                "redis_ping": pong,
                "agents_registered": len(agents),
                "agents_active": sum(1 for a in agents if a["status"] == "active"),
                "timestamp": datetime.now(timezone.utc).isoformat(),
            }
        except redis.ConnectionError as e:
            return {
                "status": "unhealthy",
                "error": str(e),
                "timestamp": datetime.now(timezone.utc).isoformat(),
            }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Broadcast Subscriber (Pub/Sub context manager)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class BroadcastSubscriber:
    """Context manager for subscribing to ACP broadcast channel.

    Entering the context opens a Pub/Sub handle and subscribes to the
    broadcast channel; leaving it unsubscribes and closes the handle.
    """

    def __init__(self, redis_client: redis.Redis, agent_id: str):
        """Store the client and the id of the subscribing agent.

        No connection work happens until the context is entered.
        """
        self._redis = redis_client
        self._agent_id = agent_id
        self._pubsub = None

    def __enter__(self):
        self._pubsub = self._redis.pubsub()
        self._pubsub.subscribe(BROADCAST_CHANNEL)
        return self

    def __exit__(self, *args):
        if self._pubsub:
            self._pubsub.unsubscribe()
            self._pubsub.close()
            # Drop the closed handle so a later listen() fails loudly.
            self._pubsub = None

    def listen(self, timeout: Optional[float] = 5.0) -> Optional[dict]:
        """
        Listen for the next broadcast message.

        Keeps polling until a real message arrives or the timeout elapses.
        Non-message events (such as the subscribe confirmation, which is
        filtered out) do not end the wait early.

        Args:
            timeout: Seconds to wait (None = forever)

        Returns:
            ACP message dict or None on timeout

        Raises:
            RuntimeError: If called outside the context manager
        """
        if not self._pubsub:
            raise RuntimeError("Not in context manager")

        deadline = None if timeout is None else time.monotonic() + timeout
        while True:
            if deadline is None:
                remaining = None
            else:
                remaining = max(0.0, deadline - time.monotonic())

            msg = self._pubsub.get_message(timeout=remaining, ignore_subscribe_messages=True)
            if msg and msg["type"] == "message":
                return json.loads(msg["data"])

            if deadline is not None and time.monotonic() >= deadline:
                return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Transport Interface (abstract base)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TransportInterface:
    """
    Reference interface for ACP transports.

    Nothing enforces this class: transports are duck-typed and only need to
    provide methods with these names and signatures. Every method here
    raises NotImplementedError.
    """

    def send(self, message: dict) -> dict:
        """Deliver *message* and return a delivery receipt."""
        raise NotImplementedError()

    def receive(self, agent_id: str, timeout: int = 5) -> Optional[dict]:
        """Return the next message for *agent_id*, or None if there is none."""
        raise NotImplementedError()

    def ack(self, agent_id: str, message_id: str) -> dict:
        """Mark *message_id* as received by *agent_id*."""
        raise NotImplementedError()

    def broadcast(self, message: dict) -> dict:
        """Deliver *message* to every agent."""
        raise NotImplementedError()

    def register_agent(self, agent_id: str, **kwargs) -> dict:
        """Add *agent_id* to the transport's registry."""
        raise NotImplementedError()

    def unregister_agent(self, agent_id: str) -> dict:
        """Remove *agent_id* from the transport's registry."""
        raise NotImplementedError()

    def list_agents(self, **kwargs) -> list[dict]:
        """Return the registered agents."""
        raise NotImplementedError()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tmux Fallback Transport
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TmuxTransport:
    """
    Fallback transport using tmux send-keys.

    Preserved for backward compatibility when Redis is unavailable. The
    agent registry is kept in process memory only; receive and broadcast
    are not supported.
    """

    def __init__(self):
        # agent_id -> {"status": ..., "via": "tmux"}
        self._agents: dict[str, dict] = {}

    def send(self, message: dict) -> dict:
        """Send via tmux send-keys (fallback).

        The whole message is JSON-encoded and typed into the pane named by
        ``message["pane"]`` (falling back to ``message["to"]``), followed by
        Enter.

        Returns:
            A receipt with status "sent", or status "failed" plus an
            "error" string when tmux is missing, cannot be executed, exits
            non-zero, or does not finish within 5 seconds.
        """
        import subprocess

        target = message.get("to", "")
        pane = message.get("pane", target)
        cmd = ["tmux", "send-keys", "-t", pane, json.dumps(message), "Enter"]

        try:
            subprocess.run(cmd, check=True, capture_output=True, timeout=5)
        except (subprocess.CalledProcessError, subprocess.TimeoutExpired, OSError) as e:
            # TimeoutExpired is reachable because timeout=5 is passed above;
            # OSError covers a missing or non-executable tmux binary.
            return {
                "status": "failed",
                "error": str(e),
                "via": "tmux",
            }

        return {
            "status": "sent",
            "message_id": message.get("id", "tmux-fallback"),
            "via": "tmux",
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }

    def receive(self, agent_id: str, timeout: int = 5) -> Optional[dict]:
        """Tmux fallback cannot do blocking receive; always returns None."""
        return None

    def ack(self, agent_id: str, message_id: str) -> dict:
        """Return an ack receipt; nothing is recorded in tmux mode."""
        return {"status": "acked", "ack_id": message_id, "via": "tmux"}

    def broadcast(self, message: dict) -> dict:
        """Report that broadcast is unsupported in tmux mode."""
        return {"status": "unsupported", "via": "tmux", "error": "broadcast not supported in tmux mode"}

    def register_agent(self, agent_id: str, **kwargs) -> dict:
        """Record *agent_id* as active in the in-memory registry (kwargs are ignored)."""
        self._agents[agent_id] = {"status": "active", "via": "tmux"}
        return {"status": "registered", "agent_id": agent_id, "via": "tmux"}

    def unregister_agent(self, agent_id: str) -> dict:
        """Forget *agent_id*; unknown ids are accepted silently."""
        self._agents.pop(agent_id, None)
        return {"status": "unregistered", "agent_id": agent_id, "via": "tmux"}

    def list_agents(self, **kwargs) -> list[dict]:
        """Return the in-memory registry as a list of dicts (kwargs are ignored)."""
        return [{"agent_id": k, **v} for k, v in self._agents.items()]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Transport Router (Redis primary, tmux fallback)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TransportRouter:
    """
    Routes ACP messages through Redis (primary) or tmux (fallback).

    Selection:
    - Explicit: message['via'] == 'redis' | 'tmux'
    - Auto: try Redis first, fall back to tmux on connection error
    """

    def __init__(self, redis_url: Optional[str] = None):
        self.redis_transport = RedisTransport(redis_url=redis_url)
        self.tmux_transport = TmuxTransport()
        # Result of the most recent reachability probe.
        self._redis_available = True

    def _check_redis(self) -> bool:
        """Check if Redis is reachable and remember the result."""
        try:
            # The attribute is `redis_transport` (no leading underscore),
            # as assigned in __init__.
            self.redis_transport.client.ping()
        except (redis.ConnectionError, redis.TimeoutError):
            self._redis_available = False
        else:
            self._redis_available = True
        return self._redis_available

    def route(self, message: dict) -> dict:
        """
        Route message through best available transport.

        Priority: redis (primary) → tmux (fallback)

        When the message carries no explicit 'via', the chosen transport
        name is written into ``message['via']`` before sending.
        """
        via = message.get("via")
        if via == "redis":
            return self.redis_transport.send(message)
        if via == "tmux":
            return self.tmux_transport.send(message)

        # Auto-route: prefer Redis.
        if self._check_redis():
            message["via"] = "redis"
            return self.redis_transport.send(message)

        message["via"] = "tmux"
        return self.tmux_transport.send(message)

    def get_transport(self, via: Optional[str] = None):
        """Get a specific transport by name, or the best available one."""
        if via == "redis":
            return self.redis_transport
        if via == "tmux":
            return self.tmux_transport

        # Auto
        return self.redis_transport if self._check_redis() else self.tmux_transport
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CLI
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _parse_payload(raw: str) -> dict:
    """Turn a CLI message argument into a payload dict.

    A JSON object is used as-is; anything else (plain text, or JSON that is
    not an object) is wrapped as ``{"text": raw}`` so the payload is always
    a dict.
    """
    try:
        parsed = json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        return {"text": raw}
    return parsed if isinstance(parsed, dict) else {"text": raw}


def main():
    """CLI entry point for ACP Redis transport operations.

    Exit codes: 0 on success, 1 on usage/validation errors, an unhealthy
    health check or a receive timeout, 2 on Redis connection errors, 130 on
    Ctrl-C.
    """
    import argparse
    import sys

    parser = argparse.ArgumentParser(
        prog="acp",
        description="ACP Bridge — Agent Communication Protocol (Redis transport)",
    )
    subparsers = parser.add_subparsers(dest="command", help="Available commands")

    # --- send ---
    send_parser = subparsers.add_parser("send", help="Send an ACP message")
    send_parser.add_argument("target", help="Target agent (e.g., agent:pane)")
    send_parser.add_argument("message", help="Message payload (JSON string or plain text)")
    send_parser.add_argument("--via", choices=["redis", "tmux"], default="redis",
                             help="Transport to use (default: redis)")
    send_parser.add_argument("--type", default="request", choices=["request", "response", "alert", "broadcast"],
                             help="Message type (default: request)")
    send_parser.add_argument("--from", dest="sender", default="system",
                             help="Sender agent ID (default: system)")
    send_parser.add_argument("--ttl", type=int, default=3600,
                             help="TTL in seconds (default: 3600)")

    # --- register ---
    reg_parser = subparsers.add_parser("register", help="Register an agent")
    reg_parser.add_argument("agent_id", help="Agent identifier")
    reg_parser.add_argument("--queue", help="Queue path (default: acp:inbox:{agent_id})")
    reg_parser.add_argument("--ttl", type=int, default=3600, help="Registration TTL (default: 3600s)")
    reg_parser.add_argument("--metadata", help="JSON metadata string")

    # --- unregister ---
    unreg_parser = subparsers.add_parser("unregister", help="Unregister an agent")
    unreg_parser.add_argument("agent_id", help="Agent identifier")

    # --- agents ---
    agents_parser = subparsers.add_parser("agents", help="List registered agents")
    agents_parser.add_argument("--status", choices=["active", "stale"], help="Filter by status")

    # --- receive ---
    recv_parser = subparsers.add_parser("receive", help="Receive messages for an agent")
    recv_parser.add_argument("agent_id", help="Agent identifier")
    recv_parser.add_argument("--timeout", type=int, default=5, help="Block timeout in seconds")

    # --- broadcast ---
    bcast_parser = subparsers.add_parser("broadcast", help="Broadcast to all agents")
    bcast_parser.add_argument("message", help="Message payload")
    bcast_parser.add_argument("--from", dest="sender", default="system")
    bcast_parser.add_argument("--type", default="broadcast")

    # --- health ---
    subparsers.add_parser("health", help="Check transport health")

    # --- queue ---
    queue_parser = subparsers.add_parser("queue", help="Queue operations")
    queue_parser.add_argument("agent_id", help="Agent identifier")
    queue_parser.add_argument("action", choices=["length", "peek", "purge"],
                              help="Queue action")

    args = parser.parse_args()

    if not args.command:
        parser.print_help()
        sys.exit(1)

    transport = RedisTransport()

    try:
        if args.command == "send":
            msg = transport.build_message(
                to=args.target,
                payload=_parse_payload(args.message),
                from_agent=args.sender,
                msg_type=args.type,
                ttl=args.ttl,
            )

            if args.via == "redis":
                result = transport.send(msg)
            else:
                # build_message() stamps via="redis"; override it so the
                # router actually honours the requested transport.
                msg["via"] = args.via
                result = TransportRouter().route(msg)

            print(json.dumps(result, indent=2))

        elif args.command == "register":
            metadata = {}
            if args.metadata:
                try:
                    metadata = json.loads(args.metadata)
                except json.JSONDecodeError:
                    print("Error: invalid JSON metadata", file=sys.stderr)
                    sys.exit(1)

            result = transport.register_agent(
                agent_id=args.agent_id,
                queue=args.queue,
                metadata=metadata,
                ttl=args.ttl,
            )
            print(json.dumps(result, indent=2))

        elif args.command == "unregister":
            result = transport.unregister_agent(args.agent_id)
            print(json.dumps(result, indent=2))

        elif args.command == "agents":
            agents = transport.list_agents(status=args.status)
            if not agents:
                print("No agents registered.")
            else:
                print(f"{'AGENT ID':<30} {'STATUS':<10} {'QUEUE':<40} {'LAST SEEN'}")
                print("-" * 100)
                for a in agents:
                    queue = a.get("queue", "")
                    status = a.get("status", "unknown")
                    last = a.get("last_seen", "never")
                    print(f"{a['agent_id']:<30} {status:<10} {queue:<40} {last}")

        elif args.command == "receive":
            msg = transport.receive(args.agent_id, timeout=args.timeout)
            if msg:
                print(json.dumps(msg, indent=2))
            else:
                print("No messages (timeout).", file=sys.stderr)
                sys.exit(1)

        elif args.command == "broadcast":
            msg = transport.build_message(
                to="*",
                payload=_parse_payload(args.message),
                from_agent=args.sender,
                msg_type=args.type,
            )
            result = transport.broadcast(msg)
            print(json.dumps(result, indent=2))

        elif args.command == "health":
            result = transport.health()
            print(json.dumps(result, indent=2))
            sys.exit(0 if result.get("status") == "healthy" else 1)

        elif args.command == "queue":
            if args.action == "length":
                length = transport.queue_length(args.agent_id)
                print(json.dumps({"agent_id": args.agent_id, "queue_length": length}))
            elif args.action == "peek":
                msgs = transport.queue_peek(args.agent_id)
                print(json.dumps(msgs, indent=2))
            elif args.action == "purge":
                count = transport.queue_purge(args.agent_id)
                print(json.dumps({"agent_id": args.agent_id, "purged": count}))

    except redis.ConnectionError as e:
        print(f"Redis connection error: {e}", file=sys.stderr)
        sys.exit(2)
    except ValueError as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)
    except KeyboardInterrupt:
        sys.exit(130)


if __name__ == "__main__":
    main()
|