Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| fddf78ca38 | |||
| 5e539b5eca |
@@ -1,353 +0,0 @@
|
|||||||
"""Multi-Agent Conversation Bridge via Matrix.
|
|
||||||
|
|
||||||
Allows multiple Hermes instances (Timmy, Allegro, Ezra) to communicate
|
|
||||||
with each other through a shared Matrix room.
|
|
||||||
|
|
||||||
Usage:
|
|
||||||
from agent.matrix_bridge import MatrixBridge
|
|
||||||
|
|
||||||
bridge = MatrixBridge(agent_name="Timmy")
|
|
||||||
await bridge.connect()
|
|
||||||
await bridge.send_to_agent("Allegro", "Check the deployment status")
|
|
||||||
messages = await bridge.get_messages_from("Allegro")
|
|
||||||
"""
|
|
||||||
|
|
||||||
import asyncio
|
|
||||||
import json
|
|
||||||
import logging
|
|
||||||
import os
|
|
||||||
import re
|
|
||||||
import time
|
|
||||||
from dataclasses import dataclass, field
|
|
||||||
from pathlib import Path
|
|
||||||
from typing import Any, Callable, Dict, List, Optional, Set
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
# Configuration
|
|
||||||
MATRIX_BRIDGE_ROOM = os.environ.get("MATRIX_BRIDGE_ROOM", "")
|
|
||||||
MATRIX_BRIDGE_ENABLED = os.environ.get("MATRIX_BRIDGE_ENABLED", "true").lower() == "true"
|
|
||||||
AGENT_NAME = os.environ.get("HERMES_AGENT_NAME", "Hermes")
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class AgentMessage:
|
|
||||||
"""A message from one agent to another."""
|
|
||||||
sender: str
|
|
||||||
recipient: str
|
|
||||||
content: str
|
|
||||||
timestamp: float = field(default_factory=time.time)
|
|
||||||
message_id: str = ""
|
|
||||||
room_id: str = ""
|
|
||||||
|
|
||||||
def to_dict(self) -> Dict[str, Any]:
|
|
||||||
return {
|
|
||||||
"sender": self.sender,
|
|
||||||
"recipient": self.recipient,
|
|
||||||
"content": self.content,
|
|
||||||
"timestamp": self.timestamp,
|
|
||||||
"message_id": self.message_id,
|
|
||||||
"room_id": self.room_id,
|
|
||||||
}
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def from_dict(cls, data: Dict[str, Any]) -> "AgentMessage":
|
|
||||||
return cls(**data)
|
|
||||||
|
|
||||||
|
|
||||||
class MatrixBridge:
|
|
||||||
"""Multi-agent conversation bridge via Matrix rooms.
|
|
||||||
|
|
||||||
Agents communicate by posting messages to a shared Matrix room
|
|
||||||
with a standard format: [@recipient] message content
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
agent_name: str = None,
|
|
||||||
room_id: str = None,
|
|
||||||
callback: Callable[[AgentMessage], None] = None,
|
|
||||||
):
|
|
||||||
self.agent_name = agent_name or AGENT_NAME
|
|
||||||
self.room_id = room_id or MATRIX_BRIDGE_ROOM
|
|
||||||
self.callback = callback
|
|
||||||
self._matrix_client = None
|
|
||||||
self._running = False
|
|
||||||
self._message_handlers: List[Callable[[AgentMessage], None]] = []
|
|
||||||
self._pending_messages: List[AgentMessage] = []
|
|
||||||
self._known_agents: Set[str] = set()
|
|
||||||
|
|
||||||
async def connect(self) -> bool:
|
|
||||||
"""Connect to Matrix and join the bridge room."""
|
|
||||||
if not MATRIX_BRIDGE_ENABLED:
|
|
||||||
logger.info("Matrix bridge disabled via MATRIX_BRIDGE_ENABLED=false")
|
|
||||||
return False
|
|
||||||
|
|
||||||
if not self.room_id:
|
|
||||||
logger.warning("No MATRIX_BRIDGE_ROOM configured — bridge disabled")
|
|
||||||
return False
|
|
||||||
|
|
||||||
try:
|
|
||||||
# Import Matrix client
|
|
||||||
from mautrix.client import Client
|
|
||||||
from mautrix.types import RoomID, UserID
|
|
||||||
|
|
||||||
# Get credentials
|
|
||||||
homeserver = os.environ.get("MATRIX_HOMESERVER", "")
|
|
||||||
access_token = os.environ.get("MATRIX_ACCESS_TOKEN", "")
|
|
||||||
|
|
||||||
if not homeserver or not access_token:
|
|
||||||
logger.warning("Matrix credentials not configured — bridge disabled")
|
|
||||||
return False
|
|
||||||
|
|
||||||
# Create client
|
|
||||||
self._matrix_client = Client(
|
|
||||||
mxid=UserID(f"@{self.agent_name}:{homeserver.split('//')[1]}"),
|
|
||||||
base_url=homeserver,
|
|
||||||
token=access_token,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Join room
|
|
||||||
await self._matrix_client.join_room(RoomID(self.room_id))
|
|
||||||
logger.info(f"Agent {self.agent_name} joined bridge room {self.room_id}")
|
|
||||||
|
|
||||||
# Register message handler
|
|
||||||
self._matrix_client.add_event_handler(self._on_message)
|
|
||||||
|
|
||||||
# Start sync
|
|
||||||
self._running = True
|
|
||||||
asyncio.create_task(self._sync_loop())
|
|
||||||
|
|
||||||
# Announce presence
|
|
||||||
await self._announce_presence()
|
|
||||||
|
|
||||||
return True
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Failed to connect to Matrix bridge: {e}")
|
|
||||||
return False
|
|
||||||
|
|
||||||
async def disconnect(self) -> None:
|
|
||||||
"""Disconnect from the bridge."""
|
|
||||||
self._running = False
|
|
||||||
if self._matrix_client:
|
|
||||||
try:
|
|
||||||
await self._matrix_client.close()
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
async def send_to_agent(self, recipient: str, content: str) -> bool:
|
|
||||||
"""Send a message to another agent.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
recipient: Agent name (e.g., "Allegro", "Ezra")
|
|
||||||
content: Message content
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
True if sent successfully
|
|
||||||
"""
|
|
||||||
if not self._matrix_client or not self.room_id:
|
|
||||||
logger.warning("Not connected to bridge room")
|
|
||||||
return False
|
|
||||||
|
|
||||||
# Format message with recipient prefix
|
|
||||||
formatted = f"[@{recipient}] {content}"
|
|
||||||
|
|
||||||
try:
|
|
||||||
from mautrix.types import RoomID, TextMessageEventContent, MessageType
|
|
||||||
|
|
||||||
await self._matrix_client.send_message_event(
|
|
||||||
room_id=RoomID(self.room_id),
|
|
||||||
event_type="m.room.message",
|
|
||||||
content=TextMessageEventContent(
|
|
||||||
msgtype=MessageType.TEXT,
|
|
||||||
body=formatted,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
logger.info(f"Sent message to {recipient}: {content[:50]}...")
|
|
||||||
return True
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Failed to send message: {e}")
|
|
||||||
return False
|
|
||||||
|
|
||||||
async def broadcast(self, content: str) -> bool:
|
|
||||||
"""Broadcast a message to all agents.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
content: Message content
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
True if sent successfully
|
|
||||||
"""
|
|
||||||
return await self.send_to_agent("*", content)
|
|
||||||
|
|
||||||
def add_handler(self, handler: Callable[[AgentMessage], None]) -> None:
|
|
||||||
"""Add a message handler.
|
|
||||||
|
|
||||||
Called when a message is received for this agent.
|
|
||||||
"""
|
|
||||||
self._message_handlers.append(handler)
|
|
||||||
|
|
||||||
def get_known_agents(self) -> Set[str]:
|
|
||||||
"""Get set of known agents in the bridge."""
|
|
||||||
return self._known_agents.copy()
|
|
||||||
|
|
||||||
async def _on_message(self, event) -> None:
|
|
||||||
"""Handle incoming Matrix message."""
|
|
||||||
try:
|
|
||||||
# Extract message content
|
|
||||||
content = event.content
|
|
||||||
if not hasattr(content, 'body'):
|
|
||||||
return
|
|
||||||
|
|
||||||
body = content.body
|
|
||||||
|
|
||||||
# Check if message is for this agent
|
|
||||||
if not self._is_for_me(body):
|
|
||||||
return
|
|
||||||
|
|
||||||
# Parse sender and content
|
|
||||||
sender = self._extract_sender(event)
|
|
||||||
message_content = self._extract_content(body)
|
|
||||||
|
|
||||||
# Create agent message
|
|
||||||
msg = AgentMessage(
|
|
||||||
sender=sender,
|
|
||||||
recipient=self.agent_name,
|
|
||||||
content=message_content,
|
|
||||||
timestamp=time.time(),
|
|
||||||
message_id=str(event.event_id),
|
|
||||||
room_id=str(event.room_id),
|
|
||||||
)
|
|
||||||
|
|
||||||
# Track known agents
|
|
||||||
self._known_agents.add(sender)
|
|
||||||
|
|
||||||
# Call handlers
|
|
||||||
for handler in self._message_handlers:
|
|
||||||
try:
|
|
||||||
handler(msg)
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Message handler error: {e}")
|
|
||||||
|
|
||||||
if self.callback:
|
|
||||||
try:
|
|
||||||
self.callback(msg)
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Callback error: {e}")
|
|
||||||
|
|
||||||
logger.info(f"Received message from {sender}: {message_content[:50]}...")
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Error processing message: {e}")
|
|
||||||
|
|
||||||
def _is_for_me(self, body: str) -> bool:
|
|
||||||
"""Check if message is addressed to this agent."""
|
|
||||||
# Direct mention
|
|
||||||
if f"[@{self.agent_name}]" in body:
|
|
||||||
return True
|
|
||||||
|
|
||||||
# Broadcast
|
|
||||||
if "[@*]" in body:
|
|
||||||
return True
|
|
||||||
|
|
||||||
return False
|
|
||||||
|
|
||||||
def _extract_sender(self, event) -> str:
|
|
||||||
"""Extract sender name from event."""
|
|
||||||
try:
|
|
||||||
sender_id = str(event.sender)
|
|
||||||
# Extract name from @name:server format
|
|
||||||
match = re.match(r"@([^:]+):", sender_id)
|
|
||||||
if match:
|
|
||||||
return match.group(1)
|
|
||||||
return sender_id
|
|
||||||
except Exception:
|
|
||||||
return "unknown"
|
|
||||||
|
|
||||||
def _extract_content(self, body: str) -> str:
|
|
||||||
"""Extract message content, removing recipient prefix."""
|
|
||||||
# Remove [@recipient] prefix
|
|
||||||
match = re.match(r"\[@[^\]]+\]\s*(.*)", body, re.DOTALL)
|
|
||||||
if match:
|
|
||||||
return match.group(1).strip()
|
|
||||||
return body.strip()
|
|
||||||
|
|
||||||
async def _announce_presence(self) -> None:
|
|
||||||
"""Announce this agent's presence to the bridge."""
|
|
||||||
await self.broadcast(f"{self.agent_name} online")
|
|
||||||
|
|
||||||
async def _sync_loop(self) -> None:
|
|
||||||
"""Background sync loop for Matrix events."""
|
|
||||||
while self._running:
|
|
||||||
try:
|
|
||||||
if self._matrix_client:
|
|
||||||
await self._matrix_client.sync(timeout=30000)
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
break
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Sync error: {e}")
|
|
||||||
await asyncio.sleep(5)
|
|
||||||
|
|
||||||
|
|
||||||
class AgentRegistry:
|
|
||||||
"""Registry of known agents in the bridge."""
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
self._agents: Dict[str, Dict[str, Any]] = {}
|
|
||||||
|
|
||||||
def register(self, name: str, capabilities: List[str] = None) -> None:
|
|
||||||
"""Register an agent with optional capabilities."""
|
|
||||||
self._agents[name] = {
|
|
||||||
"name": name,
|
|
||||||
"capabilities": capabilities or [],
|
|
||||||
"last_seen": time.time(),
|
|
||||||
"status": "online",
|
|
||||||
}
|
|
||||||
|
|
||||||
def unregister(self, name: str) -> None:
|
|
||||||
"""Unregister an agent."""
|
|
||||||
if name in self._agents:
|
|
||||||
self._agents[name]["status"] = "offline"
|
|
||||||
|
|
||||||
def get_agent(self, name: str) -> Optional[Dict[str, Any]]:
|
|
||||||
"""Get agent info by name."""
|
|
||||||
return self._agents.get(name)
|
|
||||||
|
|
||||||
def list_agents(self) -> List[Dict[str, Any]]:
|
|
||||||
"""List all registered agents."""
|
|
||||||
return list(self._agents.values())
|
|
||||||
|
|
||||||
def find_agents_with_capability(self, capability: str) -> List[str]:
|
|
||||||
"""Find agents with a specific capability."""
|
|
||||||
return [
|
|
||||||
name for name, info in self._agents.items()
|
|
||||||
if capability in info.get("capabilities", [])
|
|
||||||
]
|
|
||||||
|
|
||||||
|
|
||||||
# Global bridge instance
|
|
||||||
_bridge: Optional[MatrixBridge] = None
|
|
||||||
|
|
||||||
|
|
||||||
async def get_bridge(agent_name: str = None) -> MatrixBridge:
|
|
||||||
"""Get or create the global Matrix bridge instance."""
|
|
||||||
global _bridge
|
|
||||||
if _bridge is None:
|
|
||||||
_bridge = MatrixBridge(agent_name=agent_name)
|
|
||||||
await _bridge.connect()
|
|
||||||
return _bridge
|
|
||||||
|
|
||||||
|
|
||||||
async def send_to_agent(recipient: str, content: str) -> bool:
|
|
||||||
"""Convenience function to send a message to another agent."""
|
|
||||||
bridge = await get_bridge()
|
|
||||||
return await bridge.send_to_agent(recipient, content)
|
|
||||||
|
|
||||||
|
|
||||||
async def broadcast_to_agents(content: str) -> bool:
|
|
||||||
"""Convenience function to broadcast to all agents."""
|
|
||||||
bridge = await get_bridge()
|
|
||||||
return await bridge.broadcast(content)
|
|
||||||
@@ -1,216 +0,0 @@
|
|||||||
# Multi-Agent Conversation Bridge
|
|
||||||
|
|
||||||
Allows multiple Hermes instances (Timmy, Allegro, Ezra) to communicate with each other through a shared Matrix room.
|
|
||||||
|
|
||||||
## Overview
|
|
||||||
|
|
||||||
The Matrix Bridge enables agent-to-agent coordination without manual intervention. Agents can:
|
|
||||||
- Send tasks to specific agents
|
|
||||||
- Broadcast to all agents
|
|
||||||
- Respond to requests from other agents
|
|
||||||
- Coordinate on complex workflows
|
|
||||||
|
|
||||||
## Configuration
|
|
||||||
|
|
||||||
### Environment Variables
|
|
||||||
|
|
||||||
```bash
|
|
||||||
# Enable/disable the bridge
|
|
||||||
MATRIX_BRIDGE_ENABLED=true
|
|
||||||
|
|
||||||
# Shared Matrix room ID for agent communication
|
|
||||||
MATRIX_BRIDGE_ROOM=!roomid:matrix.example.org
|
|
||||||
|
|
||||||
# Agent name (for message routing)
|
|
||||||
HERMES_AGENT_NAME=Timmy
|
|
||||||
|
|
||||||
# Matrix credentials (from existing Matrix gateway config)
|
|
||||||
MATRIX_HOMESERVER=https://matrix.example.org
|
|
||||||
MATRIX_ACCESS_TOKEN=syt_...
|
|
||||||
```
|
|
||||||
|
|
||||||
### Matrix Room Setup
|
|
||||||
|
|
||||||
1. Create a Matrix room for agent communication
|
|
||||||
2. Invite all agent accounts to the room
|
|
||||||
3. Set `MATRIX_BRIDGE_ROOM` to the room ID
|
|
||||||
|
|
||||||
## Message Format
|
|
||||||
|
|
||||||
Messages use a simple prefix format for routing:
|
|
||||||
|
|
||||||
```
|
|
||||||
[@Allegro] Check the deployment status on VPS
|
|
||||||
[@Ezra] Can you review PR #456?
|
|
||||||
[@*] System maintenance in 5 minutes
|
|
||||||
```
|
|
||||||
|
|
||||||
- `[@AgentName]` — Message for specific agent
|
|
||||||
- `[@*]` — Broadcast to all agents
|
|
||||||
|
|
||||||
## Usage
|
|
||||||
|
|
||||||
### Basic Usage
|
|
||||||
|
|
||||||
```python
|
|
||||||
from agent.matrix_bridge import MatrixBridge, send_to_agent, broadcast_to_agents
|
|
||||||
|
|
||||||
# Create bridge
|
|
||||||
bridge = MatrixBridge(agent_name="Timmy")
|
|
||||||
await bridge.connect()
|
|
||||||
|
|
||||||
# Send to specific agent
|
|
||||||
await bridge.send_to_agent("Allegro", "Check deployment status")
|
|
||||||
|
|
||||||
# Broadcast to all agents
|
|
||||||
await bridge.broadcast("System maintenance starting")
|
|
||||||
|
|
||||||
# Add message handler
|
|
||||||
def handle_message(msg):
|
|
||||||
print(f"From {msg.sender}: {msg.content}")
|
|
||||||
|
|
||||||
bridge.add_handler(handle_message)
|
|
||||||
```
|
|
||||||
|
|
||||||
### Convenience Functions
|
|
||||||
|
|
||||||
```python
|
|
||||||
from agent.matrix_bridge import send_to_agent, broadcast_to_agents
|
|
||||||
|
|
||||||
# Send message
|
|
||||||
await send_to_agent("Ezra", "Review PR #456")
|
|
||||||
|
|
||||||
# Broadcast
|
|
||||||
await broadcast_to_agents("Going offline for maintenance")
|
|
||||||
```
|
|
||||||
|
|
||||||
### Agent Registry
|
|
||||||
|
|
||||||
```python
|
|
||||||
from agent.matrix_bridge import AgentRegistry
|
|
||||||
|
|
||||||
registry = AgentRegistry()
|
|
||||||
|
|
||||||
# Register agent with capabilities
|
|
||||||
registry.register("Timmy", capabilities=["code", "review", "deploy"])
|
|
||||||
registry.register("Allegro", capabilities=["monitoring", "alerting"])
|
|
||||||
|
|
||||||
# Find agents with capability
|
|
||||||
coders = registry.find_agents_with_capability("code")
|
|
||||||
```
|
|
||||||
|
|
||||||
## Message Flow
|
|
||||||
|
|
||||||
```
|
|
||||||
┌─────────┐ ┌─────────┐ ┌─────────┐
|
|
||||||
│ Timmy │────▶│ Matrix │────▶│ Allegro │
|
|
||||||
│ Agent │ │ Room │ │ Agent │
|
|
||||||
└─────────┘ └─────────┘ └─────────┘
|
|
||||||
│ │ │
|
|
||||||
│ [@Allegro] │ │
|
|
||||||
│ Check deps │ │
|
|
||||||
└──────────────▶│ │
|
|
||||||
│ [@Allegro] │
|
|
||||||
│ Check deps │
|
|
||||||
└──────────────▶│
|
|
||||||
│
|
|
||||||
│ [@Timmy] │
|
|
||||||
│ Done ✓ │
|
|
||||||
│◀──────────────┘
|
|
||||||
│ [@Timmy] │
|
|
||||||
│ Done ✓ │
|
|
||||||
│◀──────────────┘
|
|
||||||
```
|
|
||||||
|
|
||||||
## Integration with Hermes
|
|
||||||
|
|
||||||
### In run_agent.py
|
|
||||||
|
|
||||||
```python
|
|
||||||
# Add to conversation loop
|
|
||||||
if self.matrix_bridge:
|
|
||||||
# Check for messages from other agents
|
|
||||||
messages = await self.matrix_bridge.get_pending_messages()
|
|
||||||
for msg in messages:
|
|
||||||
# Process agent-to-agent messages
|
|
||||||
pass
|
|
||||||
```
|
|
||||||
|
|
||||||
### In Gateway
|
|
||||||
|
|
||||||
```python
|
|
||||||
# Add Matrix bridge to gateway
|
|
||||||
from agent.matrix_bridge import MatrixBridge
|
|
||||||
|
|
||||||
bridge = MatrixBridge(agent_name="Timmy")
|
|
||||||
await bridge.connect()
|
|
||||||
gateway.matrix_bridge = bridge
|
|
||||||
```
|
|
||||||
|
|
||||||
## Testing
|
|
||||||
|
|
||||||
### Unit Tests
|
|
||||||
|
|
||||||
```python
|
|
||||||
def test_message_parsing():
|
|
||||||
"""Test message format parsing."""
|
|
||||||
from agent.matrix_bridge import MatrixBridge
|
|
||||||
|
|
||||||
bridge = MatrixBridge(agent_name="Timmy")
|
|
||||||
|
|
||||||
# Test recipient extraction
|
|
||||||
assert bridge._is_for_me("[@Timmy] Hello")
|
|
||||||
assert not bridge._is_for_me("[@Allegro] Hello")
|
|
||||||
assert bridge._is_for_me("[@*] Broadcast")
|
|
||||||
|
|
||||||
# Test content extraction
|
|
||||||
assert bridge._extract_content("[@Timmy] Hello") == "Hello"
|
|
||||||
assert bridge._extract_content("[@*] Test message") == "Test message"
|
|
||||||
```
|
|
||||||
|
|
||||||
### Integration Test
|
|
||||||
|
|
||||||
```bash
|
|
||||||
# Test with two agents
|
|
||||||
MATRIX_BRIDGE_ENABLED=true \
|
|
||||||
MATRIX_BRIDGE_ROOM=!test:matrix.example.org \
|
|
||||||
HERMES_AGENT_NAME=Timmy \
|
|
||||||
python -c "
|
|
||||||
import asyncio
|
|
||||||
from agent.matrix_bridge import send_to_agent
|
|
||||||
|
|
||||||
async def test():
|
|
||||||
await send_to_agent('Allegro', 'Test message')
|
|
||||||
print('Sent')
|
|
||||||
|
|
||||||
asyncio.run(test())
|
|
||||||
"
|
|
||||||
```
|
|
||||||
|
|
||||||
## Troubleshooting
|
|
||||||
|
|
||||||
### Bridge not connecting
|
|
||||||
|
|
||||||
1. Check `MATRIX_BRIDGE_ENABLED=true`
|
|
||||||
2. Verify `MATRIX_BRIDGE_ROOM` is set
|
|
||||||
3. Ensure Matrix credentials are configured
|
|
||||||
4. Check Matrix homeserver is reachable
|
|
||||||
|
|
||||||
### Messages not received
|
|
||||||
|
|
||||||
1. Verify agent is in the Matrix room
|
|
||||||
2. Check message format: `[@AgentName] content`
|
|
||||||
3. Ensure `HERMES_AGENT_NAME` matches agent name
|
|
||||||
4. Check Matrix sync is running
|
|
||||||
|
|
||||||
### Agent not found
|
|
||||||
|
|
||||||
1. Verify agent has joined the bridge room
|
|
||||||
2. Check agent name matches exactly (case-sensitive)
|
|
||||||
3. Ensure agent has announced presence
|
|
||||||
|
|
||||||
## Related
|
|
||||||
|
|
||||||
- Issue #747: feat: multi-agent conversation bridge via Matrix
|
|
||||||
- Matrix Gateway: `gateway/platforms/matrix.py`
|
|
||||||
- Multi-Agent Orchestration: `docs/multi-agent-orchestration.md`
|
|
||||||
@@ -1,114 +0,0 @@
|
|||||||
"""Tests for Matrix Bridge — Issue #747."""
|
|
||||||
import sys
|
|
||||||
from pathlib import Path
|
|
||||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
|
||||||
|
|
||||||
from agent.matrix_bridge import MatrixBridge, AgentMessage, AgentRegistry
|
|
||||||
|
|
||||||
|
|
||||||
class TestMessageParsing:
|
|
||||||
"""Test message format parsing."""
|
|
||||||
|
|
||||||
def test_is_for_me_direct(self):
|
|
||||||
bridge = MatrixBridge(agent_name="Timmy")
|
|
||||||
assert bridge._is_for_me("[@Timmy] Hello") == True
|
|
||||||
|
|
||||||
def test_is_not_for_me(self):
|
|
||||||
bridge = MatrixBridge(agent_name="Timmy")
|
|
||||||
assert bridge._is_for_me("[@Allegro] Hello") == False
|
|
||||||
|
|
||||||
def test_is_broadcast(self):
|
|
||||||
bridge = MatrixBridge(agent_name="Timmy")
|
|
||||||
assert bridge._is_for_me("[@*] Broadcast") == True
|
|
||||||
|
|
||||||
def test_extract_content(self):
|
|
||||||
bridge = MatrixBridge(agent_name="Timmy")
|
|
||||||
assert bridge._extract_content("[@Timmy] Hello world") == "Hello world"
|
|
||||||
|
|
||||||
def test_extract_content_multiline(self):
|
|
||||||
bridge = MatrixBridge(agent_name="Timmy")
|
|
||||||
content = bridge._extract_content("[@Timmy] Line 1\nLine 2")
|
|
||||||
assert content == "Line 1\nLine 2"
|
|
||||||
|
|
||||||
|
|
||||||
class TestAgentMessage:
|
|
||||||
"""Test AgentMessage dataclass."""
|
|
||||||
|
|
||||||
def test_to_dict(self):
|
|
||||||
msg = AgentMessage(
|
|
||||||
sender="Timmy",
|
|
||||||
recipient="Allegro",
|
|
||||||
content="Hello",
|
|
||||||
timestamp=1234567890.0,
|
|
||||||
)
|
|
||||||
d = msg.to_dict()
|
|
||||||
assert d["sender"] == "Timmy"
|
|
||||||
assert d["recipient"] == "Allegro"
|
|
||||||
assert d["content"] == "Hello"
|
|
||||||
|
|
||||||
def test_from_dict(self):
|
|
||||||
d = {
|
|
||||||
"sender": "Timmy",
|
|
||||||
"recipient": "Allegro",
|
|
||||||
"content": "Hello",
|
|
||||||
"timestamp": 1234567890.0,
|
|
||||||
"message_id": "",
|
|
||||||
"room_id": "",
|
|
||||||
}
|
|
||||||
msg = AgentMessage.from_dict(d)
|
|
||||||
assert msg.sender == "Timmy"
|
|
||||||
assert msg.recipient == "Allegro"
|
|
||||||
|
|
||||||
|
|
||||||
class TestAgentRegistry:
|
|
||||||
"""Test AgentRegistry."""
|
|
||||||
|
|
||||||
def test_register(self):
|
|
||||||
registry = AgentRegistry()
|
|
||||||
registry.register("Timmy", capabilities=["code", "review"])
|
|
||||||
agent = registry.get_agent("Timmy")
|
|
||||||
assert agent["name"] == "Timmy"
|
|
||||||
assert "code" in agent["capabilities"]
|
|
||||||
|
|
||||||
def test_list_agents(self):
|
|
||||||
registry = AgentRegistry()
|
|
||||||
registry.register("Timmy")
|
|
||||||
registry.register("Allegro")
|
|
||||||
agents = registry.list_agents()
|
|
||||||
assert len(agents) == 2
|
|
||||||
|
|
||||||
def test_find_with_capability(self):
|
|
||||||
registry = AgentRegistry()
|
|
||||||
registry.register("Timmy", capabilities=["code"])
|
|
||||||
registry.register("Allegro", capabilities=["monitoring"])
|
|
||||||
coders = registry.find_agents_with_capability("code")
|
|
||||||
assert "Timmy" in coders
|
|
||||||
assert "Allegro" not in coders
|
|
||||||
|
|
||||||
def test_unregister(self):
|
|
||||||
registry = AgentRegistry()
|
|
||||||
registry.register("Timmy")
|
|
||||||
registry.unregister("Timmy")
|
|
||||||
agent = registry.get_agent("Timmy")
|
|
||||||
assert agent["status"] == "offline"
|
|
||||||
|
|
||||||
|
|
||||||
class TestBridgeInit:
|
|
||||||
"""Test bridge initialization."""
|
|
||||||
|
|
||||||
def test_default_agent_name(self):
|
|
||||||
bridge = MatrixBridge()
|
|
||||||
assert bridge.agent_name == "Hermes"
|
|
||||||
|
|
||||||
def test_custom_agent_name(self):
|
|
||||||
bridge = MatrixBridge(agent_name="Timmy")
|
|
||||||
assert bridge.agent_name == "Timmy"
|
|
||||||
|
|
||||||
def test_known_agents_empty(self):
|
|
||||||
bridge = MatrixBridge()
|
|
||||||
assert len(bridge.get_known_agents()) == 0
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
import pytest
|
|
||||||
pytest.main([__file__, "-v"])
|
|
||||||
60
tests/test_mcp_pid_lock.py
Normal file
60
tests/test_mcp_pid_lock.py
Normal file
@@ -0,0 +1,60 @@
|
|||||||
|
"""Tests for MCP PID file lock (#734)."""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import tempfile
|
||||||
|
import pytest
|
||||||
|
from pathlib import Path
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
# We test the functions by mocking _PID_DIR
|
||||||
|
import tools.mcp_pid_lock as pid_mod
|
||||||
|
|
||||||
|
|
||||||
|
class TestPidLock:
|
||||||
|
def setup_method(self):
|
||||||
|
self.tmp = tempfile.mkdtemp()
|
||||||
|
pid_mod._PID_DIR = Path(self.tmp)
|
||||||
|
|
||||||
|
def teardown_method(self):
|
||||||
|
import shutil
|
||||||
|
shutil.rmtree(self.tmp, ignore_errors=True)
|
||||||
|
|
||||||
|
def test_check_returns_none_when_no_file(self):
|
||||||
|
result = pid_mod.check_pid_lock("test-server")
|
||||||
|
assert result is None
|
||||||
|
|
||||||
|
def test_write_and_check_alive(self):
|
||||||
|
my_pid = os.getpid()
|
||||||
|
pid_mod.write_pid_lock("test-server", my_pid)
|
||||||
|
result = pid_mod.check_pid_lock("test-server")
|
||||||
|
assert result == my_pid
|
||||||
|
|
||||||
|
def test_stale_pid_cleaned(self):
|
||||||
|
# Write a PID that doesn't exist
|
||||||
|
pid_mod.write_pid_lock("test-server", 999999999)
|
||||||
|
result = pid_mod.check_pid_lock("test-server")
|
||||||
|
assert result is None
|
||||||
|
# PID file should be cleaned up
|
||||||
|
assert not pid_mod._pid_file("test-server").exists()
|
||||||
|
|
||||||
|
def test_corrupted_pid_cleaned(self):
|
||||||
|
pf = pid_mod._pid_file("test-server")
|
||||||
|
pf.write_text("not-a-number")
|
||||||
|
result = pid_mod.check_pid_lock("test-server")
|
||||||
|
assert result is None
|
||||||
|
assert not pf.exists()
|
||||||
|
|
||||||
|
def test_release_removes_file(self):
|
||||||
|
pid_mod.write_pid_lock("test-server", os.getpid())
|
||||||
|
assert pid_mod._pid_file("test-server").exists()
|
||||||
|
pid_mod.release_pid_lock("test-server")
|
||||||
|
assert not pid_mod._pid_file("test-server").exists()
|
||||||
|
|
||||||
|
def test_release_noop_when_no_file(self):
|
||||||
|
# Should not raise
|
||||||
|
pid_mod.release_pid_lock("nonexistent")
|
||||||
|
|
||||||
|
def test_multiple_servers_independent(self):
|
||||||
|
pid_mod.write_pid_lock("server-a", os.getpid())
|
||||||
|
assert pid_mod.check_pid_lock("server-a") == os.getpid()
|
||||||
|
assert pid_mod.check_pid_lock("server-b") is None
|
||||||
87
tools/mcp_pid_lock.py
Normal file
87
tools/mcp_pid_lock.py
Normal file
@@ -0,0 +1,87 @@
|
|||||||
|
"""
|
||||||
|
PID file lock for MCP server instances — prevents concurrent spawning.
|
||||||
|
|
||||||
|
Before spawning an MCP server, check for a PID file. If the process is
|
||||||
|
alive, skip spawn. If stale, clean up. Write PID after spawn, remove
|
||||||
|
on shutdown.
|
||||||
|
|
||||||
|
Related: #714 (zombie cleanup), #734 (preventive lock)
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import logging
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
_PID_DIR = Path.home() / ".hermes" / "mcp"
|
||||||
|
|
||||||
|
|
||||||
|
def _pid_file(server_name: str) -> Path:
|
||||||
|
"""Return the PID file path for a named MCP server."""
|
||||||
|
_PID_DIR.mkdir(parents=True, exist_ok=True)
|
||||||
|
return _PID_DIR / f"{server_name}.pid"
|
||||||
|
|
||||||
|
|
||||||
|
def _is_process_alive(pid: int) -> bool:
|
||||||
|
"""Check if a process with the given PID is running."""
|
||||||
|
try:
|
||||||
|
os.kill(pid, 0) # Signal 0 = check existence without killing
|
||||||
|
return True
|
||||||
|
except (ProcessLookupError, PermissionError, OSError):
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def check_pid_lock(server_name: str) -> Optional[int]:
|
||||||
|
"""Check if an MCP server instance is already running.
|
||||||
|
|
||||||
|
Returns the running PID if locked, None if safe to spawn.
|
||||||
|
"""
|
||||||
|
pf = _pid_file(server_name)
|
||||||
|
if not pf.exists():
|
||||||
|
return None
|
||||||
|
|
||||||
|
try:
|
||||||
|
pid = int(pf.read_text().strip())
|
||||||
|
except (ValueError, OSError):
|
||||||
|
# Corrupted PID file — clean up
|
||||||
|
logger.warning("MCP PID file %s corrupted, removing", pf)
|
||||||
|
try:
|
||||||
|
pf.unlink()
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
return None
|
||||||
|
|
||||||
|
if _is_process_alive(pid):
|
||||||
|
logger.info("MCP server '%s' already running (PID %d), skipping spawn", server_name, pid)
|
||||||
|
return pid
|
||||||
|
|
||||||
|
# Stale PID file — process is dead
|
||||||
|
logger.info("MCP server '%s' PID %d is stale, cleaning up", server_name, pid)
|
||||||
|
try:
|
||||||
|
pf.unlink()
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def write_pid_lock(server_name: str, pid: int) -> None:
|
||||||
|
"""Write PID file after successful MCP server spawn."""
|
||||||
|
pf = _pid_file(server_name)
|
||||||
|
try:
|
||||||
|
pf.write_text(str(pid))
|
||||||
|
logger.debug("MCP server '%s' PID lock written: %d", server_name, pid)
|
||||||
|
except OSError as e:
|
||||||
|
logger.warning("Failed to write PID lock for '%s': %s", server_name, e)
|
||||||
|
|
||||||
|
|
||||||
|
def release_pid_lock(server_name: str) -> None:
|
||||||
|
"""Remove PID file on MCP server shutdown."""
|
||||||
|
pf = _pid_file(server_name)
|
||||||
|
try:
|
||||||
|
if pf.exists():
|
||||||
|
pf.unlink()
|
||||||
|
logger.debug("MCP server '%s' PID lock released", server_name)
|
||||||
|
except OSError as e:
|
||||||
|
logger.warning("Failed to release PID lock for '%s': %s", server_name, e)
|
||||||
Reference in New Issue
Block a user