From 6716e66e89fc0b7299aa6581ff6a855cbbdc846d Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Sun, 29 Mar 2026 15:47:19 -0700 Subject: [PATCH] =?UTF-8?q?feat:=20add=20MCP=20server=20mode=20=E2=80=94?= =?UTF-8?q?=20hermes=20mcp=20serve=20(#3795)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit hermes mcp serve starts a stdio MCP server that lets any MCP client (Claude Code, Cursor, Codex, etc.) interact with Hermes conversations. Matches OpenClaw's 9-tool channel bridge surface: Tools exposed: - conversations_list: list active sessions across all platforms - conversation_get: details on one conversation - messages_read: read message history - attachments_fetch: extract non-text content from messages - events_poll: poll for new events since a cursor - events_wait: long-poll / block until next event (near-real-time) - messages_send: send to any platform via send_message_tool - channels_list: browse available messaging targets - permissions_list_open: list pending approval requests - permissions_respond: allow/deny approvals Architecture: - EventBridge: background thread polls SessionDB for new messages, maintains in-memory event queue with waiter support - Reads sessions.json + SessionDB directly (no gateway dep for reads) - Reuses send_message_tool for sending (same platform adapters) - FastMCP server with stdio transport - Zero new dependencies (uses existing mcp>=1.2.0 optional dep) Files: - mcp_serve.py: MCP server + EventBridge (~600 lines) - hermes_cli/main.py: added serve sub-parser to hermes mcp - hermes_cli/mcp_config.py: route serve action to run_mcp_server - tests/test_mcp_serve.py: 53 tests - docs: updated MCP page + CLI commands reference --- hermes_cli/main.py | 17 +- hermes_cli/mcp_config.py | 6 + mcp_serve.py | 868 ++++++++++++++++++ tests/test_mcp_serve.py | 1111 +++++++++++++++++++++++ website/docs/reference/cli-commands.md | 5 +- website/docs/user-guide/features/mcp.md | 99 ++ 6 files changed, 2100 insertions(+), 6 deletions(-) create mode 100644 mcp_serve.py create mode 100644 tests/test_mcp_serve.py diff --git a/hermes_cli/main.py b/hermes_cli/main.py index 4a00f1ef..97428e09 100644 --- a/hermes_cli/main.py +++ b/hermes_cli/main.py @@ -4308,16 +4308,25 @@ For more help on a command: # ========================================================================= mcp_parser = subparsers.add_parser( "mcp", - help="Manage MCP server connections", + help="Manage MCP servers and run Hermes as an MCP server", description=( - "Add, remove, list, test, and configure MCP server connections.\n\n" + "Manage MCP server connections and run Hermes as an MCP server.\n\n" "MCP servers provide additional tools via the Model Context Protocol.\n" - "Use 'hermes mcp add' to connect to a new server with interactive\n" - "tool discovery. Run 'hermes mcp' with no subcommand to list servers." + "Use 'hermes mcp add' to connect to a new server, or\n" + "'hermes mcp serve' to expose Hermes conversations over MCP." ), ) mcp_sub = mcp_parser.add_subparsers(dest="mcp_action") + mcp_serve_p = mcp_sub.add_parser( + "serve", + help="Run Hermes as an MCP server (expose conversations to other agents)", + ) + mcp_serve_p.add_argument( + "-v", "--verbose", action="store_true", + help="Enable verbose logging on stderr", + ) + mcp_add_p = mcp_sub.add_parser("add", help="Add an MCP server (discovery-first install)") mcp_add_p.add_argument("name", help="Server name (used as config key)") mcp_add_p.add_argument("--url", help="HTTP/SSE endpoint URL") diff --git a/hermes_cli/mcp_config.py b/hermes_cli/mcp_config.py index 05eb088a..0f08e467 100644 --- a/hermes_cli/mcp_config.py +++ b/hermes_cli/mcp_config.py @@ -608,6 +608,11 @@ def mcp_command(args): """Main dispatcher for ``hermes mcp`` subcommands.""" action = getattr(args, "mcp_action", None) + if action == "serve": + from mcp_serve import run_mcp_server + run_mcp_server(verbose=getattr(args, "verbose", False)) + return + handlers = { "add": cmd_mcp_add, "remove": cmd_mcp_remove, @@ -626,6 +631,7 @@ def mcp_command(args): # No subcommand — show list cmd_mcp_list() print(color(" Commands:", Colors.CYAN)) + _info("hermes mcp serve Run as MCP server") _info("hermes mcp add --url Add an MCP server") _info("hermes mcp add --command Add a stdio server") _info("hermes mcp remove Remove a server") diff --git a/mcp_serve.py b/mcp_serve.py new file mode 100644 index 00000000..93c43979 --- /dev/null +++ b/mcp_serve.py @@ -0,0 +1,868 @@ +""" +Hermes MCP Server — expose messaging conversations as MCP tools. + +Starts a stdio MCP server that lets any MCP client (Claude Code, Cursor, Codex, +etc.) list conversations, read message history, send messages, poll for live +events, and manage approval requests across all connected platforms. + +Matches OpenClaw's 9-tool MCP channel bridge surface: + conversations_list, conversation_get, messages_read, attachments_fetch, + events_poll, events_wait, messages_send, permissions_list_open, + permissions_respond + +Plus: channels_list (Hermes-specific extra) + +Usage: + hermes mcp serve + hermes mcp serve --verbose + +MCP client config (e.g. claude_desktop_config.json): + { + "mcpServers": { + "hermes": { + "command": "hermes", + "args": ["mcp", "serve"] + } + } + } +""" + +from __future__ import annotations + +import json +import logging +import os +import re +import sys +import threading +import time +from dataclasses import dataclass, field +from datetime import datetime +from pathlib import Path +from typing import Any, Dict, List, Optional + +logger = logging.getLogger("hermes.mcp_serve") + +# --------------------------------------------------------------------------- +# Lazy MCP SDK import +# --------------------------------------------------------------------------- + +_MCP_SERVER_AVAILABLE = False +try: + from mcp.server.fastmcp import FastMCP + + _MCP_SERVER_AVAILABLE = True +except ImportError: + FastMCP = None # type: ignore[assignment,misc] + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _get_sessions_dir() -> Path: + """Return the sessions directory using HERMES_HOME.""" + try: + from hermes_constants import get_hermes_home + return get_hermes_home() / "sessions" + except ImportError: + return Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes")) / "sessions" + + +def _get_session_db(): + """Get a SessionDB instance for reading message transcripts.""" + try: + from hermes_state import SessionDB + return SessionDB() + except Exception as e: + logger.debug("SessionDB unavailable: %s", e) + return None + + +def _load_sessions_index() -> dict: + """Load the gateway sessions.json index directly. + + Returns a dict of session_key -> entry_dict with platform routing info. + This avoids importing the full SessionStore which needs GatewayConfig. + """ + sessions_file = _get_sessions_dir() / "sessions.json" + if not sessions_file.exists(): + return {} + try: + with open(sessions_file, "r", encoding="utf-8") as f: + return json.load(f) + except Exception as e: + logger.debug("Failed to load sessions.json: %s", e) + return {} + + +def _load_channel_directory() -> dict: + """Load the cached channel directory for available targets.""" + try: + from hermes_constants import get_hermes_home + directory_file = get_hermes_home() / "channel_directory.json" + except ImportError: + directory_file = Path( + os.environ.get("HERMES_HOME", Path.home() / ".hermes") + ) / "channel_directory.json" + + if not directory_file.exists(): + return {} + try: + with open(directory_file, "r", encoding="utf-8") as f: + return json.load(f) + except Exception as e: + logger.debug("Failed to load channel_directory.json: %s", e) + return {} + + +def _extract_message_content(msg: dict) -> str: + """Extract text content from a message, handling multi-part content.""" + content = msg.get("content", "") + if isinstance(content, list): + text_parts = [ + p.get("text", "") for p in content + if isinstance(p, dict) and p.get("type") == "text" + ] + return "\n".join(text_parts) + return str(content) if content else "" + + +def _extract_attachments(msg: dict) -> List[dict]: + """Extract non-text attachments from a message. + + Finds: multi-part image/file content blocks, MEDIA: tags in text, + image URLs, and file references. + """ + attachments = [] + content = msg.get("content", "") + + # Multi-part content blocks (image_url, file, etc.) + if isinstance(content, list): + for part in content: + if not isinstance(part, dict): + continue + ptype = part.get("type", "") + if ptype == "image_url": + url = part.get("image_url", {}).get("url", "") if isinstance(part.get("image_url"), dict) else "" + if url: + attachments.append({"type": "image", "url": url}) + elif ptype == "image": + url = part.get("url", part.get("source", {}).get("url", "")) + if url: + attachments.append({"type": "image", "url": url}) + elif ptype not in ("text",): + # Unknown non-text content type + attachments.append({"type": ptype, "data": part}) + + # MEDIA: tags in text content + text = _extract_message_content(msg) + if text: + media_pattern = re.compile(r'MEDIA:\s*(\S+)') + for match in media_pattern.finditer(text): + path = match.group(1) + attachments.append({"type": "media", "path": path}) + + return attachments + + +# --------------------------------------------------------------------------- +# Event Bridge — polls SessionDB for new messages, maintains event queue +# --------------------------------------------------------------------------- + +QUEUE_LIMIT = 1000 +POLL_INTERVAL = 0.2 # seconds between DB polls (200ms) + + +@dataclass +class QueueEvent: + """An event in the bridge's in-memory queue.""" + cursor: int + type: str # "message", "approval_requested", "approval_resolved" + session_key: str = "" + data: dict = field(default_factory=dict) + + +class EventBridge: + """Background poller that watches SessionDB for new messages and + maintains an in-memory event queue with waiter support. + + This is the Hermes equivalent of OpenClaw's WebSocket gateway bridge. + Instead of WebSocket events, we poll the SQLite database for changes. + """ + + def __init__(self): + self._queue: List[QueueEvent] = [] + self._cursor = 0 + self._lock = threading.Lock() + self._new_event = threading.Event() + self._running = False + self._thread: Optional[threading.Thread] = None + self._last_poll_timestamps: Dict[str, float] = {} # session_key -> unix timestamp + # In-memory approval tracking (populated from events) + self._pending_approvals: Dict[str, dict] = {} + # mtime cache — skip expensive work when files haven't changed + self._sessions_json_mtime: float = 0.0 + self._state_db_mtime: float = 0.0 + self._cached_sessions_index: dict = {} + + def start(self): + """Start the background polling thread.""" + if self._running: + return + self._running = True + self._thread = threading.Thread(target=self._poll_loop, daemon=True) + self._thread.start() + logger.debug("EventBridge started") + + def stop(self): + """Stop the background polling thread.""" + self._running = False + self._new_event.set() # Wake any waiters + if self._thread: + self._thread.join(timeout=5) + logger.debug("EventBridge stopped") + + def poll_events( + self, + after_cursor: int = 0, + session_key: Optional[str] = None, + limit: int = 20, + ) -> dict: + """Return events since after_cursor, optionally filtered by session_key.""" + with self._lock: + events = [ + e for e in self._queue + if e.cursor > after_cursor + and (not session_key or e.session_key == session_key) + ][:limit] + + next_cursor = events[-1].cursor if events else after_cursor + return { + "events": [ + {"cursor": e.cursor, "type": e.type, + "session_key": e.session_key, **e.data} + for e in events + ], + "next_cursor": next_cursor, + } + + def wait_for_event( + self, + after_cursor: int = 0, + session_key: Optional[str] = None, + timeout_ms: int = 30000, + ) -> Optional[dict]: + """Block until a matching event arrives or timeout expires.""" + deadline = time.monotonic() + (timeout_ms / 1000.0) + + while time.monotonic() < deadline: + with self._lock: + for e in self._queue: + if e.cursor > after_cursor and ( + not session_key or e.session_key == session_key + ): + return { + "cursor": e.cursor, "type": e.type, + "session_key": e.session_key, **e.data, + } + + remaining = deadline - time.monotonic() + if remaining <= 0: + break + self._new_event.clear() + self._new_event.wait(timeout=min(remaining, POLL_INTERVAL)) + + return None + + def list_pending_approvals(self) -> List[dict]: + """List approval requests observed during this bridge session.""" + with self._lock: + return sorted( + self._pending_approvals.values(), + key=lambda a: a.get("created_at", ""), + ) + + def respond_to_approval(self, approval_id: str, decision: str) -> dict: + """Resolve a pending approval (best-effort without gateway IPC).""" + with self._lock: + approval = self._pending_approvals.pop(approval_id, None) + + if not approval: + return {"error": f"Approval not found: {approval_id}"} + + self._enqueue(QueueEvent( + cursor=0, # Will be set by _enqueue + type="approval_resolved", + session_key=approval.get("session_key", ""), + data={"approval_id": approval_id, "decision": decision}, + )) + + return {"resolved": True, "approval_id": approval_id, "decision": decision} + + def _enqueue(self, event: QueueEvent) -> None: + """Add an event to the queue and wake any waiters.""" + with self._lock: + self._cursor += 1 + event.cursor = self._cursor + self._queue.append(event) + # Trim queue to limit + while len(self._queue) > QUEUE_LIMIT: + self._queue.pop(0) + self._new_event.set() + + def _poll_loop(self): + """Background loop: poll SessionDB for new messages.""" + db = _get_session_db() + if not db: + logger.warning("EventBridge: SessionDB unavailable, event polling disabled") + return + + while self._running: + try: + self._poll_once(db) + except Exception as e: + logger.debug("EventBridge poll error: %s", e) + time.sleep(POLL_INTERVAL) + + def _poll_once(self, db): + """Check for new messages across all sessions. + + Uses mtime checks on sessions.json and state.db to skip work + when nothing has changed — makes 200ms polling essentially free. + """ + # Check if sessions.json has changed (mtime check is ~1μs) + sessions_file = _get_sessions_dir() / "sessions.json" + try: + sj_mtime = sessions_file.stat().st_mtime if sessions_file.exists() else 0.0 + except OSError: + sj_mtime = 0.0 + + if sj_mtime != self._sessions_json_mtime: + self._sessions_json_mtime = sj_mtime + self._cached_sessions_index = _load_sessions_index() + + # Check if state.db has changed + try: + from hermes_constants import get_hermes_home + db_file = get_hermes_home() / "state.db" + except ImportError: + db_file = Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes")) / "state.db" + + try: + db_mtime = db_file.stat().st_mtime if db_file.exists() else 0.0 + except OSError: + db_mtime = 0.0 + + if db_mtime == self._state_db_mtime and sj_mtime == self._sessions_json_mtime: + return # Nothing changed since last poll — skip entirely + + self._state_db_mtime = db_mtime + entries = self._cached_sessions_index + + for session_key, entry in entries.items(): + session_id = entry.get("session_id", "") + if not session_id: + continue + + last_seen = self._last_poll_timestamps.get(session_key, 0.0) + + try: + messages = db.get_messages(session_id) + except Exception: + continue + + if not messages: + continue + + # Normalize timestamps to float for comparison + def _ts_float(ts) -> float: + if isinstance(ts, (int, float)): + return float(ts) + if isinstance(ts, str) and ts: + try: + return float(ts) + except ValueError: + # ISO string — parse to epoch + try: + from datetime import datetime + return datetime.fromisoformat(ts).timestamp() + except Exception: + return 0.0 + return 0.0 + + # Find messages newer than our last seen timestamp + new_messages = [] + for msg in messages: + ts = _ts_float(msg.get("timestamp", 0)) + role = msg.get("role", "") + if role not in ("user", "assistant"): + continue + if ts > last_seen: + new_messages.append(msg) + + for msg in new_messages: + content = _extract_message_content(msg) + if not content: + continue + self._enqueue(QueueEvent( + cursor=0, + type="message", + session_key=session_key, + data={ + "role": msg.get("role", ""), + "content": content[:500], + "timestamp": str(msg.get("timestamp", "")), + "message_id": str(msg.get("id", "")), + }, + )) + + # Update last seen to the most recent message timestamp + all_ts = [_ts_float(m.get("timestamp", 0)) for m in messages] + if all_ts: + latest = max(all_ts) + if latest > last_seen: + self._last_poll_timestamps[session_key] = latest + + +# --------------------------------------------------------------------------- +# MCP Server +# --------------------------------------------------------------------------- + +def create_mcp_server(event_bridge: Optional[EventBridge] = None) -> "FastMCP": + """Create and return the Hermes MCP server with all tools registered.""" + if not _MCP_SERVER_AVAILABLE: + raise ImportError( + "MCP server requires the 'mcp' package. " + "Install with: pip install 'hermes-agent[mcp]'" + ) + + mcp = FastMCP( + "hermes", + instructions=( + "Hermes Agent messaging bridge. Use these tools to interact with " + "conversations across Telegram, Discord, Slack, WhatsApp, Signal, " + "Matrix, and other connected platforms." + ), + ) + + bridge = event_bridge or EventBridge() + + # -- conversations_list ------------------------------------------------ + + @mcp.tool() + def conversations_list( + platform: Optional[str] = None, + limit: int = 50, + search: Optional[str] = None, + ) -> str: + """List active messaging conversations across connected platforms. + + Returns conversations with their session keys (needed for messages_read), + platform, chat type, display name, and last activity time. + + Args: + platform: Filter by platform name (telegram, discord, slack, etc.) + limit: Maximum number of conversations to return (default 50) + search: Optional text to filter conversations by name + """ + entries = _load_sessions_index() + conversations = [] + + for key, entry in entries.items(): + origin = entry.get("origin", {}) + entry_platform = entry.get("platform") or origin.get("platform", "") + + if platform and entry_platform.lower() != platform.lower(): + continue + + display_name = entry.get("display_name", "") + chat_name = origin.get("chat_name", "") + if search: + search_lower = search.lower() + if (search_lower not in display_name.lower() + and search_lower not in chat_name.lower() + and search_lower not in key.lower()): + continue + + conversations.append({ + "session_key": key, + "session_id": entry.get("session_id", ""), + "platform": entry_platform, + "chat_type": entry.get("chat_type", origin.get("chat_type", "")), + "display_name": display_name, + "chat_name": chat_name, + "user_name": origin.get("user_name", ""), + "updated_at": entry.get("updated_at", ""), + }) + + conversations.sort(key=lambda c: c.get("updated_at", ""), reverse=True) + conversations = conversations[:limit] + + return json.dumps({ + "count": len(conversations), + "conversations": conversations, + }, indent=2) + + # -- conversation_get -------------------------------------------------- + + @mcp.tool() + def conversation_get(session_key: str) -> str: + """Get detailed info about one conversation by its session key. + + Args: + session_key: The session key from conversations_list + """ + entries = _load_sessions_index() + entry = entries.get(session_key) + + if not entry: + return json.dumps({"error": f"Conversation not found: {session_key}"}) + + origin = entry.get("origin", {}) + return json.dumps({ + "session_key": session_key, + "session_id": entry.get("session_id", ""), + "platform": entry.get("platform") or origin.get("platform", ""), + "chat_type": entry.get("chat_type", origin.get("chat_type", "")), + "display_name": entry.get("display_name", ""), + "user_name": origin.get("user_name", ""), + "chat_name": origin.get("chat_name", ""), + "chat_id": origin.get("chat_id", ""), + "thread_id": origin.get("thread_id"), + "updated_at": entry.get("updated_at", ""), + "created_at": entry.get("created_at", ""), + "input_tokens": entry.get("input_tokens", 0), + "output_tokens": entry.get("output_tokens", 0), + "total_tokens": entry.get("total_tokens", 0), + }, indent=2) + + # -- messages_read ----------------------------------------------------- + + @mcp.tool() + def messages_read( + session_key: str, + limit: int = 50, + ) -> str: + """Read recent messages from a conversation. + + Returns the message history in chronological order with role, content, + and timestamp for each message. + + Args: + session_key: The session key from conversations_list + limit: Maximum number of messages to return (default 50, most recent) + """ + entries = _load_sessions_index() + entry = entries.get(session_key) + if not entry: + return json.dumps({"error": f"Conversation not found: {session_key}"}) + + session_id = entry.get("session_id", "") + if not session_id: + return json.dumps({"error": "No session ID for this conversation"}) + + db = _get_session_db() + if not db: + return json.dumps({"error": "Session database unavailable"}) + + try: + all_messages = db.get_messages(session_id) + except Exception as e: + return json.dumps({"error": f"Failed to read messages: {e}"}) + + filtered = [] + for msg in all_messages: + role = msg.get("role", "") + if role in ("user", "assistant"): + content = _extract_message_content(msg) + if content: + filtered.append({ + "id": str(msg.get("id", "")), + "role": role, + "content": content[:2000], + "timestamp": msg.get("timestamp", ""), + }) + + messages = filtered[-limit:] + + return json.dumps({ + "session_key": session_key, + "count": len(messages), + "total_in_session": len(filtered), + "messages": messages, + }, indent=2) + + # -- attachments_fetch ------------------------------------------------- + + @mcp.tool() + def attachments_fetch( + session_key: str, + message_id: str, + ) -> str: + """List non-text attachments for a message in a conversation. + + Extracts images, media files, and other non-text content blocks + from the specified message. + + Args: + session_key: The session key from conversations_list + message_id: The message ID from messages_read + """ + entries = _load_sessions_index() + entry = entries.get(session_key) + if not entry: + return json.dumps({"error": f"Conversation not found: {session_key}"}) + + session_id = entry.get("session_id", "") + if not session_id: + return json.dumps({"error": "No session ID for this conversation"}) + + db = _get_session_db() + if not db: + return json.dumps({"error": "Session database unavailable"}) + + try: + all_messages = db.get_messages(session_id) + except Exception as e: + return json.dumps({"error": f"Failed to read messages: {e}"}) + + # Find the target message + target_msg = None + for msg in all_messages: + if str(msg.get("id", "")) == message_id: + target_msg = msg + break + + if not target_msg: + return json.dumps({"error": f"Message not found: {message_id}"}) + + attachments = _extract_attachments(target_msg) + + return json.dumps({ + "message_id": message_id, + "count": len(attachments), + "attachments": attachments, + }, indent=2) + + # -- events_poll ------------------------------------------------------- + + @mcp.tool() + def events_poll( + after_cursor: int = 0, + session_key: Optional[str] = None, + limit: int = 20, + ) -> str: + """Poll for new conversation events since a cursor position. + + Returns events that have occurred since the given cursor. Use the + returned next_cursor value for subsequent polls. + + Event types: message, approval_requested, approval_resolved + + Args: + after_cursor: Return events after this cursor (0 for all) + session_key: Optional filter to one conversation + limit: Maximum events to return (default 20) + """ + result = bridge.poll_events( + after_cursor=after_cursor, + session_key=session_key, + limit=limit, + ) + return json.dumps(result, indent=2) + + # -- events_wait ------------------------------------------------------- + + @mcp.tool() + def events_wait( + after_cursor: int = 0, + session_key: Optional[str] = None, + timeout_ms: int = 30000, + ) -> str: + """Wait for the next conversation event (long-poll). + + Blocks until a matching event arrives or the timeout expires. + Use this for near-real-time event delivery without polling. + + Args: + after_cursor: Wait for events after this cursor + session_key: Optional filter to one conversation + timeout_ms: Maximum wait time in milliseconds (default 30000) + """ + event = bridge.wait_for_event( + after_cursor=after_cursor, + session_key=session_key, + timeout_ms=min(timeout_ms, 300000), # Cap at 5 minutes + ) + if event: + return json.dumps({"event": event}, indent=2) + return json.dumps({"event": None, "reason": "timeout"}, indent=2) + + # -- messages_send ----------------------------------------------------- + + @mcp.tool() + def messages_send( + target: str, + message: str, + ) -> str: + """Send a message to a platform conversation. + + The target format is "platform:chat_id" — same format used by the + channels_list tool. You can also use human-friendly channel names + that will be resolved automatically. + + Examples: + target="telegram:6308981865" + target="discord:#general" + target="slack:#engineering" + + Args: + target: Platform target in "platform:identifier" format + message: The message text to send + """ + if not target or not message: + return json.dumps({"error": "Both target and message are required"}) + + try: + from tools.send_message_tool import send_message_tool + result_str = send_message_tool( + {"action": "send", "target": target, "message": message} + ) + return result_str + except ImportError: + return json.dumps({"error": "Send message tool not available"}) + except Exception as e: + return json.dumps({"error": f"Send failed: {e}"}) + + # -- channels_list ----------------------------------------------------- + + @mcp.tool() + def channels_list(platform: Optional[str] = None) -> str: + """List available messaging channels and targets across platforms. + + Returns channels that you can send messages to. The target strings + returned here can be used directly with the messages_send tool. + + Args: + platform: Filter by platform name (telegram, discord, slack, etc.) + """ + directory = _load_channel_directory() + if not directory: + entries = _load_sessions_index() + targets = [] + seen = set() + for key, entry in entries.items(): + origin = entry.get("origin", {}) + p = entry.get("platform") or origin.get("platform", "") + chat_id = origin.get("chat_id", "") + if not p or not chat_id: + continue + if platform and p.lower() != platform.lower(): + continue + target_str = f"{p}:{chat_id}" + if target_str in seen: + continue + seen.add(target_str) + targets.append({ + "target": target_str, + "platform": p, + "name": entry.get("display_name") or origin.get("chat_name", ""), + "chat_type": entry.get("chat_type", origin.get("chat_type", "")), + }) + return json.dumps({"count": len(targets), "channels": targets}, indent=2) + + channels = [] + for plat, entries_list in directory.items(): + if platform and plat.lower() != platform.lower(): + continue + if isinstance(entries_list, list): + for ch in entries_list: + if isinstance(ch, dict): + chat_id = ch.get("id", ch.get("chat_id", "")) + channels.append({ + "target": f"{plat}:{chat_id}" if chat_id else plat, + "platform": plat, + "name": ch.get("name", ch.get("display_name", "")), + "chat_type": ch.get("type", ""), + }) + + return json.dumps({"count": len(channels), "channels": channels}, indent=2) + + # -- permissions_list_open --------------------------------------------- + + @mcp.tool() + def permissions_list_open() -> str: + """List pending approval requests observed during this bridge session. + + Returns exec and plugin approval requests that the bridge has seen + since it started. Approvals are live-session only — older approvals + from before the bridge connected are not included. + """ + approvals = bridge.list_pending_approvals() + return json.dumps({ + "count": len(approvals), + "approvals": approvals, + }, indent=2) + + # -- permissions_respond ----------------------------------------------- + + @mcp.tool() + def permissions_respond( + id: str, + decision: str, + ) -> str: + """Respond to a pending approval request. + + Args: + id: The approval ID from permissions_list_open + decision: One of "allow-once", "allow-always", or "deny" + """ + if decision not in ("allow-once", "allow-always", "deny"): + return json.dumps({ + "error": f"Invalid decision: {decision}. " + f"Must be allow-once, allow-always, or deny" + }) + + result = bridge.respond_to_approval(id, decision) + return json.dumps(result, indent=2) + + return mcp + + +# --------------------------------------------------------------------------- +# Entry point +# --------------------------------------------------------------------------- + +def run_mcp_server(verbose: bool = False) -> None: + """Start the Hermes MCP server on stdio.""" + if not _MCP_SERVER_AVAILABLE: + print( + "Error: MCP server requires the 'mcp' package.\n" + "Install with: pip install 'hermes-agent[mcp]'", + file=sys.stderr, + ) + sys.exit(1) + + if verbose: + logging.basicConfig(level=logging.DEBUG, stream=sys.stderr) + else: + logging.basicConfig(level=logging.WARNING, stream=sys.stderr) + + bridge = EventBridge() + bridge.start() + + server = create_mcp_server(event_bridge=bridge) + + import asyncio + + async def _run(): + try: + await server.run_stdio_async() + finally: + bridge.stop() + + try: + asyncio.run(_run()) + except KeyboardInterrupt: + bridge.stop() diff --git a/tests/test_mcp_serve.py b/tests/test_mcp_serve.py new file mode 100644 index 00000000..9dc013ca --- /dev/null +++ b/tests/test_mcp_serve.py @@ -0,0 +1,1111 @@ +""" +Tests for mcp_serve — Hermes MCP server. + +Three layers of tests: +1. Unit tests — helpers, content extraction, attachment parsing +2. EventBridge tests — queue mechanics, cursors, waiters, concurrency +3. End-to-end tests — call actual MCP tools through FastMCP's tool manager + with real session data in SQLite and sessions.json +""" + +import asyncio +import json +import os +import sqlite3 +import time +import threading +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +@pytest.fixture(autouse=True) +def _isolate_hermes_home(tmp_path, monkeypatch): + """Redirect HERMES_HOME to a temp directory.""" + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + try: + import hermes_constants + monkeypatch.setattr(hermes_constants, "get_hermes_home", lambda: tmp_path) + except (ImportError, AttributeError): + pass + return tmp_path + + +@pytest.fixture +def sessions_dir(tmp_path): + sdir = tmp_path / "sessions" + sdir.mkdir(parents=True, exist_ok=True) + return sdir + + +@pytest.fixture +def sample_sessions(): + return { + "agent:main:telegram:dm:123456": { + "session_key": "agent:main:telegram:dm:123456", + "session_id": "20260329_120000_abc123", + "platform": "telegram", + "chat_type": "dm", + "display_name": "Alice", + "created_at": "2026-03-29T12:00:00", + "updated_at": "2026-03-29T14:30:00", + "input_tokens": 50000, + "output_tokens": 2000, + "total_tokens": 52000, + "origin": { + "platform": "telegram", + "chat_id": "123456", + "chat_name": "Alice", + "chat_type": "dm", + "user_id": "123456", + "user_name": "Alice", + "thread_id": None, + "chat_topic": None, + }, + }, + "agent:main:discord:group:789:456": { + "session_key": "agent:main:discord:group:789:456", + "session_id": "20260329_100000_def456", + "platform": "discord", + "chat_type": "group", + "display_name": "Bob", + "created_at": "2026-03-29T10:00:00", + "updated_at": "2026-03-29T13:00:00", + "input_tokens": 30000, + "output_tokens": 1000, + "total_tokens": 31000, + "origin": { + "platform": "discord", + "chat_id": "789", + "chat_name": "#general", + "chat_type": "group", + "user_id": "456", + "user_name": "Bob", + "thread_id": None, + "chat_topic": None, + }, + }, + "agent:main:slack:group:C1234:U5678": { + "session_key": "agent:main:slack:group:C1234:U5678", + "session_id": "20260328_090000_ghi789", + "platform": "slack", + "chat_type": "group", + "display_name": "Carol", + "created_at": "2026-03-28T09:00:00", + "updated_at": "2026-03-28T11:00:00", + "input_tokens": 10000, + "output_tokens": 500, + "total_tokens": 10500, + "origin": { + "platform": "slack", + "chat_id": "C1234", + "chat_name": "#engineering", + "chat_type": "group", + "user_id": "U5678", + "user_name": "Carol", + "thread_id": None, + "chat_topic": None, + }, + }, + } + + +@pytest.fixture +def populated_sessions_dir(sessions_dir, sample_sessions): + (sessions_dir / "sessions.json").write_text(json.dumps(sample_sessions)) + return sessions_dir + + +def _create_test_db(db_path, session_id, messages): + """Create a minimal SQLite DB mimicking hermes_state schema.""" + conn = sqlite3.connect(str(db_path)) + conn.execute(""" + CREATE TABLE IF NOT EXISTS sessions ( + id TEXT PRIMARY KEY, + source TEXT DEFAULT 'cli', + started_at TEXT, + message_count INTEGER DEFAULT 0 + ) + """) + conn.execute(""" + CREATE TABLE IF NOT EXISTS messages ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + session_id TEXT NOT NULL, + role TEXT NOT NULL, + content TEXT, + tool_call_id TEXT, + tool_calls TEXT, + tool_name TEXT, + timestamp TEXT, + token_count INTEGER DEFAULT 0, + finish_reason TEXT, + reasoning TEXT, + reasoning_details TEXT, + codex_reasoning_items TEXT + ) + """) + conn.execute( + "INSERT OR IGNORE INTO sessions (id, source, started_at, message_count) VALUES (?, 'gateway', ?, ?)", + (session_id, "2026-03-29T12:00:00", len(messages)), + ) + for msg in messages: + content = msg.get("content", "") + if isinstance(content, (list, dict)): + content = json.dumps(content) + conn.execute( + "INSERT INTO messages (session_id, role, content, timestamp, tool_calls) VALUES (?, ?, ?, ?, ?)", + (session_id, msg["role"], content, + msg.get("timestamp", "2026-03-29T12:00:00"), + json.dumps(msg["tool_calls"]) if msg.get("tool_calls") else None), + ) + conn.commit() + conn.close() + + +@pytest.fixture +def mock_session_db(tmp_path, populated_sessions_dir): + """Create a real SQLite DB with test messages and wire it up.""" + db_path = tmp_path / "state.db" + messages = [ + {"role": "user", "content": "Hello Alice!", "timestamp": "2026-03-29T12:00:01"}, + {"role": "assistant", "content": "Hi! How can I help?", "timestamp": "2026-03-29T12:00:05"}, + {"role": "user", "content": "Check the image MEDIA: /tmp/screenshot.png please", + "timestamp": "2026-03-29T12:01:00"}, + {"role": "assistant", "content": "I see the screenshot. It shows a terminal.", + "timestamp": "2026-03-29T12:01:10"}, + {"role": "tool", "content": '{"result": "ok"}', "timestamp": "2026-03-29T12:01:15"}, + {"role": "user", "content": "Thanks!", "timestamp": "2026-03-29T12:02:00"}, + ] + _create_test_db(db_path, "20260329_120000_abc123", messages) + + # Create a mock SessionDB that reads from our test DB + class TestSessionDB: + def __init__(self): + self._db_path = db_path + + def get_messages(self, session_id): + conn = sqlite3.connect(str(self._db_path)) + conn.row_factory = sqlite3.Row + rows = conn.execute( + "SELECT * FROM messages WHERE session_id = ? ORDER BY id", + (session_id,), + ).fetchall() + conn.close() + result = [] + for r in rows: + d = dict(r) + if d.get("tool_calls"): + d["tool_calls"] = json.loads(d["tool_calls"]) + result.append(d) + return result + + return TestSessionDB() + + +# --------------------------------------------------------------------------- +# 1. UNIT TESTS — helpers, extraction, attachments +# --------------------------------------------------------------------------- + +class TestImports: + def test_import_module(self): + import mcp_serve + assert hasattr(mcp_serve, "create_mcp_server") + assert hasattr(mcp_serve, "run_mcp_server") + assert hasattr(mcp_serve, "EventBridge") + + def test_mcp_available_flag(self): + import mcp_serve + assert isinstance(mcp_serve._MCP_SERVER_AVAILABLE, bool) + + +class TestHelpers: + def test_get_sessions_dir(self, tmp_path): + from mcp_serve import _get_sessions_dir + result = _get_sessions_dir() + assert result == tmp_path / "sessions" + + def test_load_sessions_index_empty(self, sessions_dir, monkeypatch): + import mcp_serve + monkeypatch.setattr(mcp_serve, "_get_sessions_dir", lambda: sessions_dir) + assert mcp_serve._load_sessions_index() == {} + + def test_load_sessions_index_with_data(self, populated_sessions_dir, monkeypatch): + import mcp_serve + monkeypatch.setattr(mcp_serve, "_get_sessions_dir", lambda: populated_sessions_dir) + result = mcp_serve._load_sessions_index() + assert len(result) == 3 + + def test_load_sessions_index_corrupt(self, sessions_dir, monkeypatch): + (sessions_dir / "sessions.json").write_text("not json!") + import mcp_serve + monkeypatch.setattr(mcp_serve, "_get_sessions_dir", lambda: sessions_dir) + assert mcp_serve._load_sessions_index() == {} + + +class TestContentExtraction: + def test_text(self): + from mcp_serve import _extract_message_content + assert _extract_message_content({"content": "Hello"}) == "Hello" + + def test_multipart(self): + from mcp_serve import _extract_message_content + msg = {"content": [ + {"type": "text", "text": "A"}, + {"type": "image", "url": "http://x.com/i.png"}, + {"type": "text", "text": "B"}, + ]} + assert _extract_message_content(msg) == "A\nB" + + def test_empty(self): + from mcp_serve import _extract_message_content + assert _extract_message_content({"content": ""}) == "" + assert _extract_message_content({}) == "" + assert _extract_message_content({"content": None}) == "" + + +class TestAttachmentExtraction: + def test_image_url_block(self): + from mcp_serve import _extract_attachments + msg = {"content": [ + {"type": "image_url", "image_url": {"url": "http://x.com/pic.jpg"}}, + ]} + att = _extract_attachments(msg) + assert len(att) == 1 + assert att[0] == {"type": "image", "url": "http://x.com/pic.jpg"} + + def test_media_tag_in_text(self): + from mcp_serve import _extract_attachments + msg = {"content": "Here MEDIA: /tmp/out.png done"} + att = _extract_attachments(msg) + assert len(att) == 1 + assert att[0] == {"type": "media", "path": "/tmp/out.png"} + + def test_multiple_media_tags(self): + from mcp_serve import _extract_attachments + msg = {"content": "MEDIA: /a.png and MEDIA: /b.mp3"} + assert len(_extract_attachments(msg)) == 2 + + def test_no_attachments(self): + from mcp_serve import _extract_attachments + assert _extract_attachments({"content": "plain text"}) == [] + + def test_image_content_block(self): + from mcp_serve import _extract_attachments + msg = {"content": [{"type": "image", "url": "http://x.com/p.png"}]} + att = _extract_attachments(msg) + assert att[0]["type"] == "image" + + +# --------------------------------------------------------------------------- +# 2. EVENT BRIDGE TESTS — queue, cursors, waiters, concurrency +# --------------------------------------------------------------------------- + +class TestEventBridge: + def test_create(self): + from mcp_serve import EventBridge + b = EventBridge() + assert b._cursor == 0 + assert b._queue == [] + + def test_enqueue_and_poll(self): + from mcp_serve import EventBridge, QueueEvent + b = EventBridge() + b._enqueue(QueueEvent(cursor=0, type="message", session_key="k1", + data={"content": "hi"})) + r = b.poll_events(after_cursor=0) + assert len(r["events"]) == 1 + assert r["events"][0]["type"] == "message" + assert r["next_cursor"] == 1 + + def test_cursor_filter(self): + from mcp_serve import EventBridge, QueueEvent + b = EventBridge() + for i in range(5): + b._enqueue(QueueEvent(cursor=0, type="message", session_key=f"s{i}")) + r = b.poll_events(after_cursor=3) + assert len(r["events"]) == 2 + assert r["events"][0]["session_key"] == "s3" + + def test_session_filter(self): + from mcp_serve import EventBridge, QueueEvent + b = EventBridge() + b._enqueue(QueueEvent(cursor=0, type="message", session_key="a")) + b._enqueue(QueueEvent(cursor=0, type="message", session_key="b")) + b._enqueue(QueueEvent(cursor=0, type="message", session_key="a")) + r = b.poll_events(after_cursor=0, session_key="a") + assert len(r["events"]) == 2 + + def test_poll_empty(self): + from mcp_serve import EventBridge + r = EventBridge().poll_events(after_cursor=0) + assert r["events"] == [] + assert r["next_cursor"] == 0 + + def test_poll_limit(self): + from mcp_serve import EventBridge, QueueEvent + b = EventBridge() + for i in range(10): + b._enqueue(QueueEvent(cursor=0, type="message", session_key=f"s{i}")) + r = b.poll_events(after_cursor=0, limit=3) + assert len(r["events"]) == 3 + + def test_wait_immediate(self): + from mcp_serve import EventBridge, QueueEvent + b = EventBridge() + b._enqueue(QueueEvent(cursor=0, type="message", session_key="t", + data={"content": "hi"})) + event = b.wait_for_event(after_cursor=0, timeout_ms=100) + assert event is not None + assert event["type"] == "message" + + def test_wait_timeout(self): + from mcp_serve import EventBridge + start = time.monotonic() + event = EventBridge().wait_for_event(after_cursor=0, timeout_ms=150) + assert event is None + assert time.monotonic() - start >= 0.1 + + def test_wait_wakes_on_enqueue(self): + from mcp_serve import EventBridge, QueueEvent + b = EventBridge() + result = [None] + + def waiter(): + result[0] = b.wait_for_event(after_cursor=0, timeout_ms=5000) + + t = threading.Thread(target=waiter) + t.start() + time.sleep(0.05) + b._enqueue(QueueEvent(cursor=0, type="message", session_key="wake")) + t.join(timeout=2) + assert result[0] is not None + assert result[0]["session_key"] == "wake" + + def test_queue_limit(self): + from mcp_serve import EventBridge, QueueEvent, QUEUE_LIMIT + b = EventBridge() + for i in range(QUEUE_LIMIT + 50): + b._enqueue(QueueEvent(cursor=0, type="message", session_key=f"s{i}")) + assert len(b._queue) == QUEUE_LIMIT + + def test_concurrent_enqueue(self): + from mcp_serve import EventBridge, QueueEvent + b = EventBridge() + errors = [] + + def batch(start): + try: + for i in range(100): + b._enqueue(QueueEvent(cursor=0, type="message", + session_key=f"s{start}_{i}")) + except Exception as e: + errors.append(e) + + threads = [threading.Thread(target=batch, args=(i,)) for i in range(5)] + for t in threads: + t.start() + for t in threads: + t.join() + assert not errors + assert len(b._queue) == 500 + assert b._cursor == 500 + + def test_approvals_lifecycle(self): + from mcp_serve import EventBridge + b = EventBridge() + b._pending_approvals["a1"] = { + "id": "a1", "kind": "exec", + "description": "rm -rf /tmp", + "session_key": "test", "created_at": "2026-03-29T12:00:00", + } + assert len(b.list_pending_approvals()) == 1 + result = b.respond_to_approval("a1", "deny") + assert result["resolved"] is True + assert len(b.list_pending_approvals()) == 0 + + def test_respond_nonexistent(self): + from mcp_serve import EventBridge + r = EventBridge().respond_to_approval("nope", "deny") + assert "error" in r + + +# --------------------------------------------------------------------------- +# 3. END-TO-END TESTS — call MCP tools through FastMCP server +# --------------------------------------------------------------------------- + +@pytest.fixture +def mcp_server_e2e(populated_sessions_dir, mock_session_db, monkeypatch): + """Create a fully wired MCP server for E2E testing.""" + mcp = pytest.importorskip("mcp", reason="MCP SDK not installed") + import mcp_serve + monkeypatch.setattr(mcp_serve, "_get_sessions_dir", lambda: populated_sessions_dir) + monkeypatch.setattr(mcp_serve, "_get_session_db", lambda: mock_session_db) + monkeypatch.setattr(mcp_serve, "_load_channel_directory", lambda: {}) + + bridge = mcp_serve.EventBridge() + server = mcp_serve.create_mcp_server(event_bridge=bridge) + return server, bridge + + +def _run_tool(server, name, args=None): + """Call an MCP tool through FastMCP's tool manager and return parsed JSON.""" + result = asyncio.get_event_loop().run_until_complete( + server._tool_manager.call_tool(name, args or {}) + ) + return json.loads(result) if isinstance(result, str) else result + + +@pytest.fixture +def _event_loop(): + """Ensure an event loop exists for sync tests calling async tools.""" + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + yield loop + loop.close() + + +class TestE2EConversationsList: + def test_list_all(self, mcp_server_e2e, _event_loop): + server, _ = mcp_server_e2e + result = _run_tool(server, "conversations_list") + assert result["count"] == 3 + platforms = {c["platform"] for c in result["conversations"]} + assert platforms == {"telegram", "discord", "slack"} + + def test_list_sorted_by_updated(self, mcp_server_e2e, _event_loop): + server, _ = mcp_server_e2e + result = _run_tool(server, "conversations_list") + keys = [c["session_key"] for c in result["conversations"]] + # Telegram (14:30) > Discord (13:00) > Slack (11:00) + assert keys[0] == "agent:main:telegram:dm:123456" + assert keys[1] == "agent:main:discord:group:789:456" + assert keys[2] == "agent:main:slack:group:C1234:U5678" + + def test_filter_by_platform(self, mcp_server_e2e, _event_loop): + server, _ = mcp_server_e2e + result = _run_tool(server, "conversations_list", {"platform": "discord"}) + assert result["count"] == 1 + assert result["conversations"][0]["platform"] == "discord" + + def test_filter_by_platform_case_insensitive(self, mcp_server_e2e, _event_loop): + server, _ = mcp_server_e2e + result = _run_tool(server, "conversations_list", {"platform": "TELEGRAM"}) + assert result["count"] == 1 + + def test_search_by_name(self, mcp_server_e2e, _event_loop): + server, _ = mcp_server_e2e + result = _run_tool(server, "conversations_list", {"search": "Alice"}) + assert result["count"] == 1 + assert result["conversations"][0]["display_name"] == "Alice" + + def test_search_no_match(self, mcp_server_e2e, _event_loop): + server, _ = mcp_server_e2e + result = _run_tool(server, "conversations_list", {"search": "nobody"}) + assert result["count"] == 0 + + def test_limit(self, mcp_server_e2e, _event_loop): + server, _ = mcp_server_e2e + result = _run_tool(server, "conversations_list", {"limit": 2}) + assert result["count"] == 2 + + +class TestE2EConversationGet: + def test_get_existing(self, mcp_server_e2e, _event_loop): + server, _ = mcp_server_e2e + result = _run_tool(server, "conversation_get", + {"session_key": "agent:main:telegram:dm:123456"}) + assert result["platform"] == "telegram" + assert result["display_name"] == "Alice" + assert result["chat_id"] == "123456" + assert result["input_tokens"] == 50000 + + def test_get_nonexistent(self, mcp_server_e2e, _event_loop): + server, _ = mcp_server_e2e + result = _run_tool(server, "conversation_get", + {"session_key": "nonexistent:key"}) + assert "error" in result + + +class TestE2EMessagesRead: + def test_read_messages(self, mcp_server_e2e, _event_loop): + server, _ = mcp_server_e2e + result = _run_tool(server, "messages_read", + {"session_key": "agent:main:telegram:dm:123456"}) + assert result["count"] > 0 + # Should filter out tool messages — only user/assistant + roles = {m["role"] for m in result["messages"]} + assert "tool" not in roles + assert "user" in roles + assert "assistant" in roles + + def test_read_messages_content(self, mcp_server_e2e, _event_loop): + server, _ = mcp_server_e2e + result = _run_tool(server, "messages_read", + {"session_key": "agent:main:telegram:dm:123456"}) + contents = [m["content"] for m in result["messages"]] + assert "Hello Alice!" in contents + assert "Hi! How can I help?" in contents + + def test_read_messages_have_ids(self, mcp_server_e2e, _event_loop): + server, _ = mcp_server_e2e + result = _run_tool(server, "messages_read", + {"session_key": "agent:main:telegram:dm:123456"}) + for msg in result["messages"]: + assert "id" in msg + assert msg["id"] # non-empty + + def test_read_with_limit(self, mcp_server_e2e, _event_loop): + server, _ = mcp_server_e2e + result = _run_tool(server, "messages_read", + {"session_key": "agent:main:telegram:dm:123456", + "limit": 2}) + assert result["count"] == 2 + + def test_read_nonexistent_session(self, mcp_server_e2e, _event_loop): + server, _ = mcp_server_e2e + result = _run_tool(server, "messages_read", + {"session_key": "nonexistent:key"}) + assert "error" in result + + +class TestE2EAttachmentsFetch: + def test_fetch_media_from_message(self, mcp_server_e2e, _event_loop): + server, _ = mcp_server_e2e + # First get message IDs + msgs = _run_tool(server, "messages_read", + {"session_key": "agent:main:telegram:dm:123456"}) + # Find the message with MEDIA: tag + media_msg = None + for m in msgs["messages"]: + if "MEDIA:" in m["content"]: + media_msg = m + break + assert media_msg is not None, "Should have a message with MEDIA: tag" + + result = _run_tool(server, "attachments_fetch", { + "session_key": "agent:main:telegram:dm:123456", + "message_id": media_msg["id"], + }) + assert result["count"] >= 1 + assert result["attachments"][0]["type"] == "media" + assert result["attachments"][0]["path"] == "/tmp/screenshot.png" + + def test_fetch_from_nonexistent_message(self, mcp_server_e2e, _event_loop): + server, _ = mcp_server_e2e + result = _run_tool(server, "attachments_fetch", { + "session_key": "agent:main:telegram:dm:123456", + "message_id": "99999", + }) + assert "error" in result + + def test_fetch_from_nonexistent_session(self, mcp_server_e2e, _event_loop): + server, _ = mcp_server_e2e + result = _run_tool(server, "attachments_fetch", { + "session_key": "nonexistent:key", + "message_id": "1", + }) + assert "error" in result + + +class TestE2EEventsPoll: + def test_poll_empty(self, mcp_server_e2e, _event_loop): + server, bridge = mcp_server_e2e + result = _run_tool(server, "events_poll") + assert result["events"] == [] + assert result["next_cursor"] == 0 + + def test_poll_with_events(self, mcp_server_e2e, _event_loop): + from mcp_serve import QueueEvent + server, bridge = mcp_server_e2e + bridge._enqueue(QueueEvent(cursor=0, type="message", + session_key="agent:main:telegram:dm:123456", + data={"role": "user", "content": "Hello"})) + bridge._enqueue(QueueEvent(cursor=0, type="message", + session_key="agent:main:telegram:dm:123456", + data={"role": "assistant", "content": "Hi"})) + + result = _run_tool(server, "events_poll") + assert len(result["events"]) == 2 + assert result["events"][0]["content"] == "Hello" + assert result["events"][1]["content"] == "Hi" + assert result["next_cursor"] == 2 + + def test_poll_cursor_pagination(self, mcp_server_e2e, _event_loop): + from mcp_serve import QueueEvent + server, bridge = mcp_server_e2e + for i in range(5): + bridge._enqueue(QueueEvent(cursor=0, type="message", + session_key=f"s{i}")) + + page1 = _run_tool(server, "events_poll", {"limit": 2}) + assert len(page1["events"]) == 2 + assert page1["next_cursor"] == 2 + + page2 = _run_tool(server, "events_poll", + {"after_cursor": page1["next_cursor"], "limit": 2}) + assert len(page2["events"]) == 2 + assert page2["next_cursor"] == 4 + + def test_poll_session_filter(self, mcp_server_e2e, _event_loop): + from mcp_serve import QueueEvent + server, bridge = mcp_server_e2e + bridge._enqueue(QueueEvent(cursor=0, type="message", session_key="a")) + bridge._enqueue(QueueEvent(cursor=0, type="message", session_key="b")) + bridge._enqueue(QueueEvent(cursor=0, type="message", session_key="a")) + + result = _run_tool(server, "events_poll", + {"session_key": "b"}) + assert len(result["events"]) == 1 + + +class TestE2EEventsWait: + def test_wait_timeout(self, mcp_server_e2e, _event_loop): + server, _ = mcp_server_e2e + result = _run_tool(server, "events_wait", {"timeout_ms": 100}) + assert result["event"] is None + assert result["reason"] == "timeout" + + def test_wait_with_existing_event(self, mcp_server_e2e, _event_loop): + from mcp_serve import QueueEvent + server, bridge = mcp_server_e2e + bridge._enqueue(QueueEvent(cursor=0, type="message", + session_key="test", + data={"content": "waiting for this"})) + result = _run_tool(server, "events_wait", {"timeout_ms": 100}) + assert result["event"] is not None + assert result["event"]["content"] == "waiting for this" + + def test_wait_caps_timeout(self, mcp_server_e2e, _event_loop): + """Timeout should be capped at 300000ms (5 min).""" + from mcp_serve import QueueEvent + server, bridge = mcp_server_e2e + bridge._enqueue(QueueEvent(cursor=0, type="message", session_key="t")) + # Even with huge timeout, should return immediately since event exists + result = _run_tool(server, "events_wait", {"timeout_ms": 999999}) + assert result["event"] is not None + + +class TestE2EMessagesSend: + def test_send_missing_args(self, mcp_server_e2e, _event_loop): + server, _ = mcp_server_e2e + result = _run_tool(server, "messages_send", {"target": "", "message": "hi"}) + assert "error" in result + + def test_send_delegates_to_tool(self, mcp_server_e2e, _event_loop, monkeypatch): + server, _ = mcp_server_e2e + mock = MagicMock(return_value=json.dumps({"success": True, "platform": "telegram"})) + monkeypatch.setattr("tools.send_message_tool.send_message_tool", mock) + + result = _run_tool(server, "messages_send", + {"target": "telegram:123456", "message": "Hello!"}) + assert result["success"] is True + mock.assert_called_once() + call_args = mock.call_args[0][0] + assert call_args["action"] == "send" + assert call_args["target"] == "telegram:123456" + + +class TestE2EChannelsList: + def test_channels_from_sessions(self, mcp_server_e2e, _event_loop): + server, _ = mcp_server_e2e + result = _run_tool(server, "channels_list") + assert result["count"] == 3 + targets = {c["target"] for c in result["channels"]} + assert "telegram:123456" in targets + assert "discord:789" in targets + assert "slack:C1234" in targets + + def test_channels_platform_filter(self, mcp_server_e2e, _event_loop): + server, _ = mcp_server_e2e + result = _run_tool(server, "channels_list", {"platform": "slack"}) + assert result["count"] == 1 + assert result["channels"][0]["target"] == "slack:C1234" + + def test_channels_with_directory(self, mcp_server_e2e, _event_loop, monkeypatch): + import mcp_serve + monkeypatch.setattr(mcp_serve, "_load_channel_directory", lambda: { + "telegram": [ + {"id": "123456", "name": "Alice", "type": "dm"}, + {"id": "-100999", "name": "Dev Group", "type": "group"}, + ], + }) + # Need to recreate server to pick up the new mock + server, bridge = mcp_server_e2e + # The tool closure already captured the old mock, so test the function directly + directory = mcp_serve._load_channel_directory() + assert len(directory["telegram"]) == 2 + + +class TestE2EPermissions: + def test_list_empty(self, mcp_server_e2e, _event_loop): + server, _ = mcp_server_e2e + result = _run_tool(server, "permissions_list_open") + assert result["count"] == 0 + assert result["approvals"] == [] + + def test_list_with_approvals(self, mcp_server_e2e, _event_loop): + server, bridge = mcp_server_e2e + bridge._pending_approvals["a1"] = { + "id": "a1", "kind": "exec", + "description": "sudo rm -rf /", + "session_key": "test", + "created_at": "2026-03-29T12:00:00", + } + result = _run_tool(server, "permissions_list_open") + assert result["count"] == 1 + assert result["approvals"][0]["id"] == "a1" + + def test_respond_allow(self, mcp_server_e2e, _event_loop): + server, bridge = mcp_server_e2e + bridge._pending_approvals["a1"] = {"id": "a1", "kind": "exec"} + result = _run_tool(server, "permissions_respond", + {"id": "a1", "decision": "allow-once"}) + assert result["resolved"] is True + assert result["decision"] == "allow-once" + # Should be gone now + check = _run_tool(server, "permissions_list_open") + assert check["count"] == 0 + + def test_respond_deny(self, mcp_server_e2e, _event_loop): + server, bridge = mcp_server_e2e + bridge._pending_approvals["a2"] = {"id": "a2", "kind": "plugin"} + result = _run_tool(server, "permissions_respond", + {"id": "a2", "decision": "deny"}) + assert result["resolved"] is True + + def test_respond_invalid_decision(self, mcp_server_e2e, _event_loop): + server, bridge = mcp_server_e2e + bridge._pending_approvals["a3"] = {"id": "a3", "kind": "exec"} + result = _run_tool(server, "permissions_respond", + {"id": "a3", "decision": "maybe"}) + assert "error" in result + + def test_respond_nonexistent(self, mcp_server_e2e, _event_loop): + server, _ = mcp_server_e2e + result = _run_tool(server, "permissions_respond", + {"id": "nope", "decision": "deny"}) + assert "error" in result + + +# --------------------------------------------------------------------------- +# 4. TOOL LISTING — verify all 10 tools are registered +# --------------------------------------------------------------------------- + +class TestToolRegistration: + def test_all_tools_registered(self, mcp_server_e2e, _event_loop): + server, _ = mcp_server_e2e + tools = server._tool_manager.list_tools() + tool_names = {t.name for t in tools} + + expected = { + "conversations_list", "conversation_get", "messages_read", + "attachments_fetch", "events_poll", "events_wait", + "messages_send", "channels_list", + "permissions_list_open", "permissions_respond", + } + assert expected == tool_names, f"Missing: {expected - tool_names}, Extra: {tool_names - expected}" + + def test_tools_have_descriptions(self, mcp_server_e2e, _event_loop): + server, _ = mcp_server_e2e + for tool in server._tool_manager.list_tools(): + assert tool.description, f"Tool {tool.name} has no description" + + +# --------------------------------------------------------------------------- +# 5. SERVER LIFECYCLE / CLI INTEGRATION +# --------------------------------------------------------------------------- + +class TestServerCreation: + def test_create_server(self, populated_sessions_dir, monkeypatch): + pytest.importorskip("mcp", reason="MCP SDK not installed") + import mcp_serve + monkeypatch.setattr(mcp_serve, "_get_sessions_dir", lambda: populated_sessions_dir) + assert mcp_serve.create_mcp_server() is not None + + def test_create_with_bridge(self, populated_sessions_dir, monkeypatch): + pytest.importorskip("mcp", reason="MCP SDK not installed") + import mcp_serve + monkeypatch.setattr(mcp_serve, "_get_sessions_dir", lambda: populated_sessions_dir) + bridge = mcp_serve.EventBridge() + assert mcp_serve.create_mcp_server(event_bridge=bridge) is not None + + def test_create_without_mcp_sdk(self, monkeypatch): + import mcp_serve + monkeypatch.setattr(mcp_serve, "_MCP_SERVER_AVAILABLE", False) + with pytest.raises(ImportError, match="MCP server requires"): + mcp_serve.create_mcp_server() + + +class TestRunMcpServer: + def test_run_without_mcp_exits(self, monkeypatch): + import mcp_serve + monkeypatch.setattr(mcp_serve, "_MCP_SERVER_AVAILABLE", False) + with pytest.raises(SystemExit) as exc_info: + mcp_serve.run_mcp_server() + assert exc_info.value.code == 1 + + +class TestCliIntegration: + def test_parse_serve(self): + import argparse + parser = argparse.ArgumentParser() + subs = parser.add_subparsers(dest="command") + mcp_p = subs.add_parser("mcp") + mcp_sub = mcp_p.add_subparsers(dest="mcp_action") + serve_p = mcp_sub.add_parser("serve") + serve_p.add_argument("-v", "--verbose", action="store_true") + + args = parser.parse_args(["mcp", "serve"]) + assert args.mcp_action == "serve" + assert args.verbose is False + + def test_parse_serve_verbose(self): + import argparse + parser = argparse.ArgumentParser() + subs = parser.add_subparsers(dest="command") + mcp_p = subs.add_parser("mcp") + mcp_sub = mcp_p.add_subparsers(dest="mcp_action") + serve_p = mcp_sub.add_parser("serve") + serve_p.add_argument("-v", "--verbose", action="store_true") + + args = parser.parse_args(["mcp", "serve", "--verbose"]) + assert args.verbose is True + + def test_dispatcher_routes_serve(self, monkeypatch, tmp_path): + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + mock_run = MagicMock() + monkeypatch.setattr("mcp_serve.run_mcp_server", mock_run) + + import argparse + args = argparse.Namespace(mcp_action="serve", verbose=True) + from hermes_cli.mcp_config import mcp_command + mcp_command(args) + mock_run.assert_called_once_with(verbose=True) + + +# --------------------------------------------------------------------------- +# 6. EDGE CASES +# --------------------------------------------------------------------------- + +class TestEdgeCases: + def test_empty_sessions_json(self, sessions_dir, monkeypatch): + (sessions_dir / "sessions.json").write_text("{}") + import mcp_serve + monkeypatch.setattr(mcp_serve, "_get_sessions_dir", lambda: sessions_dir) + assert mcp_serve._load_sessions_index() == {} + + def test_sessions_without_origin(self, sessions_dir, monkeypatch): + data = {"agent:main:telegram:dm:111": { + "session_key": "agent:main:telegram:dm:111", + "session_id": "20260329_120000_xyz", + "platform": "telegram", + "updated_at": "2026-03-29T12:00:00", + }} + (sessions_dir / "sessions.json").write_text(json.dumps(data)) + import mcp_serve + monkeypatch.setattr(mcp_serve, "_get_sessions_dir", lambda: sessions_dir) + entries = mcp_serve._load_sessions_index() + assert entries["agent:main:telegram:dm:111"]["platform"] == "telegram" + + def test_bridge_start_stop(self): + from mcp_serve import EventBridge + b = EventBridge() + assert not b._running + b._running = True + b.stop() + assert not b._running + + def test_truncation(self): + assert len(("x" * 5000)[:2000]) == 2000 + + +# --------------------------------------------------------------------------- +# 7. EVENT BRIDGE POLL LOOP E2E — real SQLite DB, mtime optimization +# --------------------------------------------------------------------------- + +class TestEventBridgePollE2E: + """End-to-end tests for the EventBridge polling loop with real files.""" + + def test_poll_detects_new_messages(self, tmp_path, monkeypatch): + """Write to SQLite + sessions.json, verify EventBridge picks it up.""" + import mcp_serve + sessions_dir = tmp_path / "sessions" + sessions_dir.mkdir() + monkeypatch.setattr(mcp_serve, "_get_sessions_dir", lambda: sessions_dir) + + session_id = "20260329_150000_poll_test" + db_path = tmp_path / "state.db" + + # Write sessions.json + sessions_data = { + "agent:main:telegram:dm:poll_test": { + "session_key": "agent:main:telegram:dm:poll_test", + "session_id": session_id, + "platform": "telegram", + "chat_type": "dm", + "display_name": "PollTest", + "updated_at": "2026-03-29T15:00:05", + "origin": {"platform": "telegram", "chat_id": "poll_test"}, + } + } + (sessions_dir / "sessions.json").write_text(json.dumps(sessions_data)) + + # Write messages to SQLite + messages = [ + {"role": "user", "content": "First message", + "timestamp": "2026-03-29T15:00:01"}, + {"role": "assistant", "content": "Reply", + "timestamp": "2026-03-29T15:00:03"}, + ] + _create_test_db(db_path, session_id, messages) + + # Create a mock SessionDB that reads our test DB + class TestDB: + def get_messages(self, sid): + conn = sqlite3.connect(str(db_path)) + conn.row_factory = sqlite3.Row + rows = conn.execute( + "SELECT * FROM messages WHERE session_id = ? ORDER BY id", + (sid,), + ).fetchall() + conn.close() + return [dict(r) for r in rows] + + monkeypatch.setattr(mcp_serve, "_get_session_db", lambda: TestDB()) + + bridge = mcp_serve.EventBridge() + # Run one poll cycle manually + bridge._poll_once(TestDB()) + + # Should have found the messages + result = bridge.poll_events(after_cursor=0) + assert len(result["events"]) == 2 + assert result["events"][0]["role"] == "user" + assert result["events"][0]["content"] == "First message" + assert result["events"][1]["role"] == "assistant" + + def test_poll_skips_when_unchanged(self, tmp_path, monkeypatch): + """Second poll with no file changes should be a no-op.""" + import mcp_serve + sessions_dir = tmp_path / "sessions" + sessions_dir.mkdir() + monkeypatch.setattr(mcp_serve, "_get_sessions_dir", lambda: sessions_dir) + + session_id = "20260329_150000_skip_test" + db_path = tmp_path / "state.db" + + sessions_data = { + "agent:main:telegram:dm:skip": { + "session_key": "agent:main:telegram:dm:skip", + "session_id": session_id, + "platform": "telegram", + "updated_at": "2026-03-29T15:00:05", + "origin": {"platform": "telegram", "chat_id": "skip"}, + } + } + (sessions_dir / "sessions.json").write_text(json.dumps(sessions_data)) + _create_test_db(db_path, session_id, [ + {"role": "user", "content": "Hello", "timestamp": "2026-03-29T15:00:01"}, + ]) + + class TestDB: + def __init__(self): + self.call_count = 0 + + def get_messages(self, sid): + self.call_count += 1 + conn = sqlite3.connect(str(db_path)) + conn.row_factory = sqlite3.Row + rows = conn.execute( + "SELECT * FROM messages WHERE session_id = ? ORDER BY id", + (sid,), + ).fetchall() + conn.close() + return [dict(r) for r in rows] + + db = TestDB() + bridge = mcp_serve.EventBridge() + + # First poll — should process + bridge._poll_once(db) + first_calls = db.call_count + assert first_calls >= 1 + + # Second poll — files unchanged, should skip entirely + bridge._poll_once(db) + assert db.call_count == first_calls, \ + "Second poll should skip DB queries when files unchanged" + + def test_poll_detects_new_message_after_db_write(self, tmp_path, monkeypatch): + """Write a new message to the DB after first poll, verify it's detected.""" + import mcp_serve + sessions_dir = tmp_path / "sessions" + sessions_dir.mkdir() + monkeypatch.setattr(mcp_serve, "_get_sessions_dir", lambda: sessions_dir) + + session_id = "20260329_150000_new_msg" + db_path = tmp_path / "state.db" + + sessions_data = { + "agent:main:telegram:dm:new": { + "session_key": "agent:main:telegram:dm:new", + "session_id": session_id, + "platform": "telegram", + "updated_at": "2026-03-29T15:00:05", + "origin": {"platform": "telegram", "chat_id": "new"}, + } + } + (sessions_dir / "sessions.json").write_text(json.dumps(sessions_data)) + _create_test_db(db_path, session_id, [ + {"role": "user", "content": "First", "timestamp": "2026-03-29T15:00:01"}, + ]) + + class TestDB: + def get_messages(self, sid): + conn = sqlite3.connect(str(db_path)) + conn.row_factory = sqlite3.Row + rows = conn.execute( + "SELECT * FROM messages WHERE session_id = ? ORDER BY id", + (sid,), + ).fetchall() + conn.close() + return [dict(r) for r in rows] + + db = TestDB() + bridge = mcp_serve.EventBridge() + + # First poll + bridge._poll_once(db) + r1 = bridge.poll_events(after_cursor=0) + assert len(r1["events"]) == 1 + + # Add a new message to the DB + conn = sqlite3.connect(str(db_path)) + conn.execute( + "INSERT INTO messages (session_id, role, content, timestamp) VALUES (?, ?, ?, ?)", + (session_id, "assistant", "New reply!", "2026-03-29T15:00:10"), + ) + conn.commit() + conn.close() + # Touch the DB file to update mtime (WAL mode may not update mtime on small writes) + os.utime(db_path, None) + + # Update sessions.json updated_at to trigger re-check + sessions_data["agent:main:telegram:dm:new"]["updated_at"] = "2026-03-29T15:00:10" + (sessions_dir / "sessions.json").write_text(json.dumps(sessions_data)) + + # Second poll — should detect the new message + bridge._poll_once(db) + r2 = bridge.poll_events(after_cursor=r1["next_cursor"]) + assert len(r2["events"]) == 1 + assert r2["events"][0]["content"] == "New reply!" + + def test_poll_interval_is_200ms(self): + """Verify the poll interval constant.""" + from mcp_serve import POLL_INTERVAL + assert POLL_INTERVAL == 0.2 diff --git a/website/docs/reference/cli-commands.md b/website/docs/reference/cli-commands.md index 57b9d35b..e1ef0b17 100644 --- a/website/docs/reference/cli-commands.md +++ b/website/docs/reference/cli-commands.md @@ -384,17 +384,18 @@ See [ACP Editor Integration](../user-guide/features/acp.md) and [ACP Internals]( hermes mcp ``` -Manage MCP (Model Context Protocol) server configurations. +Manage MCP (Model Context Protocol) server configurations and run Hermes as an MCP server. | Subcommand | Description | |------------|-------------| +| `serve [-v\|--verbose]` | Run Hermes as an MCP server — expose conversations to other agents. | | `add [--url URL] [--command CMD] [--args ...] [--auth oauth\|header]` | Add an MCP server with automatic tool discovery. | | `remove ` (alias: `rm`) | Remove an MCP server from config. | | `list` (alias: `ls`) | List configured MCP servers. | | `test ` | Test connection to an MCP server. | | `configure ` (alias: `config`) | Toggle tool selection for a server. | -See [MCP Config Reference](./mcp-config-reference.md) and [Use MCP with Hermes](../guides/use-mcp-with-hermes.md). +See [MCP Config Reference](./mcp-config-reference.md), [Use MCP with Hermes](../guides/use-mcp-with-hermes.md), and [MCP Server Mode](../user-guide/features/mcp.md#running-hermes-as-an-mcp-server). ## `hermes plugins` diff --git a/website/docs/user-guide/features/mcp.md b/website/docs/user-guide/features/mcp.md index 15890015..9b8326d4 100644 --- a/website/docs/user-guide/features/mcp.md +++ b/website/docs/user-guide/features/mcp.md @@ -403,6 +403,105 @@ Because Hermes now only registers those wrappers when both are true: This is intentional and keeps the tool list honest. +## Running Hermes as an MCP server + +In addition to connecting **to** MCP servers, Hermes can also **be** an MCP server. This lets other MCP-capable agents (Claude Code, Cursor, Codex, or any MCP client) use Hermes's messaging capabilities — list conversations, read message history, and send messages across all your connected platforms. + +### When to use this + +- You want Claude Code, Cursor, or another coding agent to send and read Telegram/Discord/Slack messages through Hermes +- You want a single MCP server that bridges to all of Hermes's connected messaging platforms at once +- You already have a running Hermes gateway with connected platforms + +### Quick start + +```bash +hermes mcp serve +``` + +This starts a stdio MCP server. The MCP client (not you) manages the process lifecycle. + +### MCP client configuration + +Add Hermes to your MCP client config. For example, in Claude Code's `~/.claude/claude_desktop_config.json`: + +```json +{ + "mcpServers": { + "hermes": { + "command": "hermes", + "args": ["mcp", "serve"] + } + } +} +``` + +Or if you installed Hermes in a specific location: + +```json +{ + "mcpServers": { + "hermes": { + "command": "/home/user/.hermes/hermes-agent/venv/bin/hermes", + "args": ["mcp", "serve"] + } + } +} +``` + +### Available tools + +The MCP server exposes 10 tools, matching OpenClaw's channel bridge surface plus a Hermes-specific channel browser: + +| Tool | Description | +|------|-------------| +| `conversations_list` | List active messaging conversations. Filter by platform or search by name. | +| `conversation_get` | Get detailed info about one conversation by session key. | +| `messages_read` | Read recent message history for a conversation. | +| `attachments_fetch` | Extract non-text attachments (images, media) from a specific message. | +| `events_poll` | Poll for new conversation events since a cursor position. | +| `events_wait` | Long-poll / block until the next event arrives (near-real-time). | +| `messages_send` | Send a message through a platform (e.g. `telegram:123456`, `discord:#general`). | +| `channels_list` | List available messaging targets across all platforms. | +| `permissions_list_open` | List pending approval requests observed during this bridge session. | +| `permissions_respond` | Allow or deny a pending approval request. | + +### Event system + +The MCP server includes a live event bridge that polls Hermes's session database for new messages. This gives MCP clients near-real-time awareness of incoming conversations: + +``` +# Poll for new events (non-blocking) +events_poll(after_cursor=0) + +# Wait for next event (blocks up to timeout) +events_wait(after_cursor=42, timeout_ms=30000) +``` + +Event types: `message`, `approval_requested`, `approval_resolved` + +The event queue is in-memory and starts when the bridge connects. Older messages are available through `messages_read`. + +### Options + +```bash +hermes mcp serve # Normal mode +hermes mcp serve --verbose # Debug logging on stderr +``` + +### How it works + +The MCP server reads conversation data directly from Hermes's session store (`~/.hermes/sessions/sessions.json` and the SQLite database). A background thread polls the database for new messages and maintains an in-memory event queue. For sending messages, it uses the same `send_message` infrastructure as the Hermes agent itself. + +The gateway does NOT need to be running for read operations (listing conversations, reading history, polling events). It DOES need to be running for send operations, since the platform adapters need active connections. + +### Current limits + +- Stdio transport only (no HTTP MCP transport yet) +- Event polling at ~200ms intervals via mtime-optimized DB polling (skips work when files are unchanged) +- No `claude/channel` push notification protocol yet +- Text-only sends (no media/attachment sending through `messages_send`) + ## Related docs - [Use MCP with Hermes](/docs/guides/use-mcp-with-hermes)