Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
2b7b12baf9 feat: MCP server — expose hermes tools to fleet peers (#803)
Some checks are pending
Contributor Attribution Check / check-attribution (pull_request) Waiting to run
Docker Build and Publish / build-and-push (pull_request) Waiting to run
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Waiting to run
Tests / test (pull_request) Waiting to run
Tests / e2e (pull_request) Waiting to run
Resolves #803. Standalone MCP server that exposes safe hermes
tools to other fleet agents.

scripts/mcp_server.py:
- Exposes: terminal, file_read, file_search, web_search, session_search
- Blocks: approval, delegate, memory, config, cron, send_message
- Terminal uses approval.py dangerous command detection
- Auth via Bearer token (MCP_AUTH_KEY)
- HTTP endpoints: GET /mcp/tools, POST /mcp/tools/call, GET /health

Usage:
  python scripts/mcp_server.py --port 8081 --auth-key SECRET
  curl http://localhost:8081/mcp/tools
  curl -X POST http://localhost:8081/mcp/tools/call -d {"name":"file_read","arguments":{"path":"README.md"}}
2026-04-16 01:10:00 -04:00
3 changed files with 265 additions and 382 deletions

View File

@@ -1,288 +0,0 @@
"""Gemma 4 tool calling hardening — parse, validate, benchmark.
Gemma 4 has native multimodal function calling but its output format
may differ from OpenAI/Claude. This module provides:
1. Gemma4ToolParser — robust parsing for Gemma 4's tool call format
2. Parallel tool call detection and splitting
3. Tool call success rate tracking and benchmarking
4. Fallback parsing strategies for malformed output
Usage:
from agent.gemma4_tool_hardening import Gemma4ToolParser
parser = Gemma4ToolParser()
tool_calls = parser.parse(response_text)
"""
from __future__ import annotations
import json
import re
import time
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple
@dataclass
class ToolCallAttempt:
"""Record of a single tool call parsing attempt."""
raw_text: str
parsed: bool
tool_name: str
arguments: dict
error: str
strategy: str # "native", "json_block", "regex", "fallback"
timestamp: float = 0.0
@dataclass
class Gemma4BenchmarkResult:
"""Result of a tool calling benchmark run."""
total_calls: int = 0
successful_parses: int = 0
parallel_calls: int = 0
strategies_used: Dict[str, int] = field(default_factory=dict)
avg_parse_time_ms: float = 0.0
success_rate: float = 0.0
errors: List[str] = field(default_factory=list)
def to_dict(self) -> dict:
return {
"total_calls": self.total_calls,
"successful_parses": self.successful_parses,
"parallel_calls": self.parallel_calls,
"success_rate": round(self.success_rate, 3),
"strategies_used": self.strategies_used,
"avg_parse_time_ms": round(self.avg_parse_time_ms, 2),
"error_count": len(self.errors),
"errors": self.errors[:10],
}
class Gemma4ToolParser:
"""Robust tool call parser for Gemma 4 output format.
Tries multiple parsing strategies in order:
1. Native OpenAI format (standard tool_calls)
2. JSON code blocks (```json ... ```)
3. Regex extraction (function_name + arguments patterns)
4. Heuristic fallback (best-effort extraction)
"""
# Patterns for Gemma 4 tool call formats
_JSON_BLOCK_PATTERN = re.compile(
r'```(?:json)?\s*\n?(.*?)\n?```',
re.DOTALL | re.IGNORECASE,
)
_FUNCTION_CALL_PATTERN = re.compile(
r'(?:function|tool|call)[:\s]*(\w+)\s*\(\s*({.*?})\s*\)',
re.DOTALL | re.IGNORECASE,
)
_GEMMA_INLINE_PATTERN = re.compile(
r'\[(?:tool_call|function_call)\]\s*(\w+)\s*:\s*({.*?})',
re.DOTALL | re.IGNORECASE,
)
def __init__(self):
self._attempts: List[ToolCallAttempt] = []
self._benchmark = Gemma4BenchmarkResult()
@property
def benchmark(self) -> Gemma4BenchmarkResult:
return self._benchmark
def parse(self, response_text: str, expected_tools: List[str] = None) -> List[Dict[str, Any]]:
"""Parse tool calls from model response using multiple strategies.
Returns list of tool call dicts in OpenAI format:
[{"id": "...", "type": "function", "function": {"name": "...", "arguments": "..."}}]
"""
t0 = time.monotonic()
self._benchmark.total_calls += 1
# Strategy 1: Native OpenAI format
result = self._try_native_parse(response_text)
if result:
self._record_attempt(response_text, True, result, "native")
self._benchmark.successful_parses += 1
if len(result) > 1:
self._benchmark.parallel_calls += 1
self._benchmark.strategies_used["native"] = self._benchmark.strategies_used.get("native", 0) + 1
self._update_timing(t0)
return result
# Strategy 2: JSON code blocks
result = self._try_json_block_parse(response_text, expected_tools)
if result:
self._record_attempt(response_text, True, result, "json_block")
self._benchmark.successful_parses += 1
if len(result) > 1:
self._benchmark.parallel_calls += 1
self._benchmark.strategies_used["json_block"] = self._benchmark.strategies_used.get("json_block", 0) + 1
self._update_timing(t0)
return result
# Strategy 3: Regex extraction
result = self._try_regex_parse(response_text)
if result:
self._record_attempt(response_text, True, result, "regex")
self._benchmark.successful_parses += 1
self._benchmark.strategies_used["regex"] = self._benchmark.strategies_used.get("regex", 0) + 1
self._update_timing(t0)
return result
# Strategy 4: Heuristic fallback
result = self._try_heuristic_parse(response_text, expected_tools)
if result:
self._record_attempt(response_text, True, result, "fallback")
self._benchmark.successful_parses += 1
self._benchmark.strategies_used["fallback"] = self._benchmark.strategies_used.get("fallback", 0) + 1
self._update_timing(t0)
return result
# All strategies failed
self._record_attempt(response_text, False, [], "none")
self._benchmark.errors.append(f"Failed to parse: {response_text[:200]}")
self._update_timing(t0)
return []
def _try_native_parse(self, text: str) -> List[Dict[str, Any]]:
"""Try parsing standard OpenAI tool_calls JSON."""
try:
data = json.loads(text)
if isinstance(data, dict) and "tool_calls" in data:
return data["tool_calls"]
if isinstance(data, list):
if all(isinstance(item, dict) and "function" in item for item in data):
return data
except json.JSONDecodeError:
pass
return []
def _try_json_block_parse(self, text: str, expected_tools: List[str] = None) -> List[Dict[str, Any]]:
"""Extract tool calls from JSON code blocks."""
matches = self._JSON_BLOCK_PATTERN.findall(text)
calls = []
for match in matches:
try:
data = json.loads(match.strip())
if isinstance(data, dict):
if "name" in data and "arguments" in data:
calls.append(self._to_openai_format(data["name"], data["arguments"]))
elif "function" in data and "arguments" in data:
calls.append(self._to_openai_format(data["function"], data["arguments"]))
elif isinstance(data, list):
for item in data:
if isinstance(item, dict) and "name" in item:
args = item.get("arguments", item.get("args", {}))
calls.append(self._to_openai_format(item["name"], args))
except json.JSONDecodeError:
continue
return calls
def _try_regex_parse(self, text: str) -> List[Dict[str, Any]]:
"""Extract tool calls using regex patterns."""
calls = []
# Pattern: function_name({...})
for match in self._FUNCTION_CALL_PATTERN.finditer(text):
name = match.group(1)
args_str = match.group(2)
try:
args = json.loads(args_str)
calls.append(self._to_openai_format(name, args))
except json.JSONDecodeError:
continue
# Pattern: [tool_call] name: {...}
for match in self._GEMMA_INLINE_PATTERN.finditer(text):
name = match.group(1)
args_str = match.group(2)
try:
args = json.loads(args_str)
calls.append(self._to_openai_format(name, args))
except json.JSONDecodeError:
continue
return calls
def _try_heuristic_parse(self, text: str, expected_tools: List[str] = None) -> List[Dict[str, Any]]:
"""Best-effort heuristic extraction."""
if not expected_tools:
return []
calls = []
for tool_name in expected_tools:
# Look for tool name near JSON-like content
pattern = re.compile(
rf'{re.escape(tool_name)}\s*[\(:]\s*({{[^}}]+}})',
re.IGNORECASE,
)
match = pattern.search(text)
if match:
try:
args = json.loads(match.group(1))
calls.append(self._to_openai_format(tool_name, args))
except json.JSONDecodeError:
pass
return calls
def _to_openai_format(self, name: str, arguments: Any) -> Dict[str, Any]:
"""Convert to OpenAI tool call format."""
import uuid
args_str = json.dumps(arguments) if isinstance(arguments, dict) else str(arguments)
return {
"id": f"call_{uuid.uuid4().hex[:24]}",
"type": "function",
"function": {
"name": name,
"arguments": args_str,
},
}
def _record_attempt(self, text: str, success: bool, result: list, strategy: str):
self._attempts.append(ToolCallAttempt(
raw_text=text[:500],
parsed=success,
tool_name=result[0]["function"]["name"] if result else "",
arguments={},
error="" if success else "parse failed",
strategy=strategy,
timestamp=time.time(),
))
def _update_timing(self, t0: float):
elapsed = (time.monotonic() - t0) * 1000
n = self._benchmark.total_calls
self._benchmark.avg_parse_time_ms = (
(self._benchmark.avg_parse_time_ms * (n - 1) + elapsed) / n
)
self._benchmark.success_rate = (
self._benchmark.successful_parses / n if n > 0 else 0
)
def format_report(self) -> str:
"""Format benchmark report."""
b = self._benchmark
lines = [
"Gemma 4 Tool Calling Benchmark",
"=" * 40,
f"Total attempts: {b.total_calls}",
f"Successful parses: {b.successful_parses}",
f"Success rate: {b.success_rate:.1%}",
f"Parallel calls: {b.parallel_calls}",
f"Avg parse time: {b.avg_parse_time_ms:.2f}ms",
"",
"Strategies used:",
]
for strategy, count in sorted(b.strategies_used.items(), key=lambda x: -x[1]):
lines.append(f" {strategy}: {count}")
if b.errors:
lines.append("")
lines.append(f"Errors ({len(b.errors)}):")
for err in b.errors[:5]:
lines.append(f" {err[:100]}")
return "\n".join(lines)

265
scripts/mcp_server.py Executable file
View File

@@ -0,0 +1,265 @@
#!/usr/bin/env python3
"""Hermes MCP Server — expose hermes-agent tools to fleet peers.
Runs as a standalone MCP server that other agents can connect to
and invoke hermes tools remotely.
Safe tools exposed:
- terminal (safe commands only)
- file_read, file_search
- web_search, web_extract
- session_search
NOT exposed (internal tools):
- approval, delegate, memory, config
Usage:
python -m tools.mcp_server --port 8081
hermes mcp-server --port 8081
python scripts/mcp_server.py --port 8081 --auth-key SECRET
"""
from __future__ import annotations
import argparse
import asyncio
import json
import logging
import os
import sys
import time
from pathlib import Path
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
# Tools safe to expose to other agents
SAFE_TOOLS = {
"terminal": {
"name": "terminal",
"description": "Execute safe shell commands. Dangerous commands are blocked.",
"parameters": {
"type": "object",
"properties": {
"command": {"type": "string", "description": "Shell command to execute"},
},
"required": ["command"],
},
},
"file_read": {
"name": "file_read",
"description": "Read the contents of a file.",
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "File path to read"},
"offset": {"type": "integer", "description": "Start line", "default": 1},
"limit": {"type": "integer", "description": "Max lines", "default": 200},
},
"required": ["path"],
},
},
"file_search": {
"name": "file_search",
"description": "Search file contents using regex.",
"parameters": {
"type": "object",
"properties": {
"pattern": {"type": "string", "description": "Regex pattern"},
"path": {"type": "string", "description": "Directory to search", "default": "."},
},
"required": ["pattern"],
},
},
"web_search": {
"name": "web_search",
"description": "Search the web for information.",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search query"},
},
"required": ["query"],
},
},
"session_search": {
"name": "session_search",
"description": "Search past conversation sessions.",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search query"},
"limit": {"type": "integer", "description": "Max results", "default": 3},
},
"required": ["query"],
},
},
}
# Tools explicitly blocked
BLOCKED_TOOLS = {
"approval", "delegate", "memory", "config", "skill_install",
"mcp_tool", "cronjob", "tts", "send_message",
}
class MCPServer:
"""Simple MCP-compatible server for exposing hermes tools."""
def __init__(self, host: str = "127.0.0.1", port: int = 8081,
auth_key: Optional[str] = None):
self._host = host
self._port = port
self._auth_key = auth_key or os.getenv("MCP_AUTH_KEY", "")
async def handle_tools_list(self, request: dict) -> dict:
"""Return available tools."""
tools = list(SAFE_TOOLS.values())
return {"tools": tools}
async def handle_tools_call(self, request: dict) -> dict:
"""Execute a tool call."""
tool_name = request.get("name", "")
arguments = request.get("arguments", {})
if tool_name in BLOCKED_TOOLS:
return {"error": f"Tool '{tool_name}' is not exposed via MCP"}
if tool_name not in SAFE_TOOLS:
return {"error": f"Unknown tool: {tool_name}"}
try:
result = await self._execute_tool(tool_name, arguments)
return {"content": [{"type": "text", "text": str(result)}]}
except Exception as e:
return {"error": str(e)}
async def _execute_tool(self, tool_name: str, arguments: dict) -> str:
"""Execute a tool and return result."""
if tool_name == "terminal":
import subprocess
cmd = arguments.get("command", "")
# Block dangerous commands
from tools.approval import detect_dangerous_command
is_dangerous, _, desc = detect_dangerous_command(cmd)
if is_dangerous:
return f"BLOCKED: Dangerous command detected ({desc}). This tool only executes safe commands."
result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=30)
return result.stdout or result.stderr or "(no output)"
elif tool_name == "file_read":
path = arguments.get("path", "")
offset = arguments.get("offset", 1)
limit = arguments.get("limit", 200)
with open(path) as f:
lines = f.readlines()
return "".join(lines[offset-1:offset-1+limit])
elif tool_name == "file_search":
import re
pattern = arguments.get("pattern", "")
path = arguments.get("path", ".")
results = []
for p in Path(path).rglob("*.py"):
try:
content = p.read_text()
for i, line in enumerate(content.split("\n"), 1):
if re.search(pattern, line, re.IGNORECASE):
results.append(f"{p}:{i}: {line.strip()}")
if len(results) >= 20:
break
except Exception:
continue
if len(results) >= 20:
break
return "\n".join(results) or "No matches found"
elif tool_name == "web_search":
try:
from tools.web_tools import web_search
return web_search(arguments.get("query", ""))
except ImportError:
return "Web search not available"
elif tool_name == "session_search":
try:
from tools.session_search_tool import session_search
return session_search(
query=arguments.get("query", ""),
limit=arguments.get("limit", 3),
)
except ImportError:
return "Session search not available"
return f"Tool {tool_name} not implemented"
async def start_http(self):
"""Start HTTP server for MCP endpoints."""
try:
from aiohttp import web
except ImportError:
logger.error("aiohttp required: pip install aiohttp")
return
app = web.Application()
async def handle_tools_list_route(request):
if self._auth_key:
auth = request.headers.get("Authorization", "")
if auth != f"Bearer {self._auth_key}":
return web.json_response({"error": "Unauthorized"}, status=401)
result = await self.handle_tools_list({})
return web.json_response(result)
async def handle_tools_call_route(request):
if self._auth_key:
auth = request.headers.get("Authorization", "")
if auth != f"Bearer {self._auth_key}":
return web.json_response({"error": "Unauthorized"}, status=401)
body = await request.json()
result = await self.handle_tools_call(body)
return web.json_response(result)
async def handle_health(request):
return web.json_response({"status": "ok", "tools": len(SAFE_TOOLS)})
app.router.add_get("/mcp/tools", handle_tools_list_route)
app.router.add_post("/mcp/tools/call", handle_tools_call_route)
app.router.add_get("/health", handle_health)
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, self._host, self._port)
await site.start()
logger.info("MCP server on http://%s:%s", self._host, self._port)
logger.info("Tools: %s", ", ".join(SAFE_TOOLS.keys()))
if self._auth_key:
logger.info("Auth: Bearer token required")
else:
logger.warning("Auth: No MCP_AUTH_KEY set — server is open")
try:
await asyncio.Event().wait()
except asyncio.CancelledError:
pass
finally:
await runner.cleanup()
def main():
parser = argparse.ArgumentParser(description="Hermes MCP Server")
parser.add_argument("--host", default="127.0.0.1")
parser.add_argument("--port", type=int, default=8081)
parser.add_argument("--auth-key", default=None, help="Bearer token for auth")
args = parser.parse_args()
logging.basicConfig(level=logging.INFO,
format="%(asctime)s [%(name)s] %(levelname)s: %(message)s")
server = MCPServer(host=args.host, port=args.port, auth_key=args.auth_key)
print(f"Starting MCP server on http://{args.host}:{args.port}")
print(f"Exposed tools: {', '.join(SAFE_TOOLS.keys())}")
asyncio.run(server.start_http())
if __name__ == "__main__":
main()

View File

@@ -1,94 +0,0 @@
"""Tests for Gemma 4 tool calling hardening."""
import json
import pytest
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from agent.gemma4_tool_hardening import Gemma4ToolParser, Gemma4BenchmarkResult
class TestNativeParse:
def test_standard_tool_calls(self):
parser = Gemma4ToolParser()
text = json.dumps({"tool_calls": [{"id": "call_1", "type": "function", "function": {"name": "read_file", "arguments": '{"path": "test.py"}'}}]})
result = parser.parse(text)
assert len(result) == 1
assert result[0]["function"]["name"] == "read_file"
def test_list_format(self):
parser = Gemma4ToolParser()
text = json.dumps([{"id": "c1", "type": "function", "function": {"name": "terminal", "arguments": '{"command": "ls"}'}}])
result = parser.parse(text)
assert len(result) == 1
class TestJsonBlockParse:
def test_json_code_block(self):
parser = Gemma4ToolParser()
text = 'Here is the tool call:\n```json\n{"name": "read_file", "arguments": {"path": "test.py"}}\n```'
result = parser.parse(text)
assert len(result) == 1
assert result[0]["function"]["name"] == "read_file"
def test_multiple_json_blocks(self):
parser = Gemma4ToolParser()
text = '```json\n{"name": "read_file", "arguments": {"path": "a.py"}}\n```\n```json\n{"name": "read_file", "arguments": {"path": "b.py"}}\n```'
result = parser.parse(text)
assert len(result) == 2
def test_list_in_json_block(self):
parser = Gemma4ToolParser()
text = '```json\n[{"name": "terminal", "arguments": {"command": "ls"}}]\n```'
result = parser.parse(text)
assert len(result) == 1
class TestRegexParse:
def test_function_call_pattern(self):
parser = Gemma4ToolParser()
text = 'I will call read_file({"path": "test.py"}) now.'
result = parser.parse(text)
assert len(result) == 1
assert result[0]["function"]["name"] == "read_file"
def test_gemma_inline_pattern(self):
parser = Gemma4ToolParser()
text = '[tool_call] terminal: {"command": "pwd"}'
result = parser.parse(text)
assert len(result) == 1
class TestHeuristicParse:
def test_heuristic_with_expected_tools(self):
parser = Gemma4ToolParser()
text = 'Calling read_file({"path": "config.yaml"}) now'
result = parser.parse(text, expected_tools=["read_file"])
assert len(result) == 1
def test_heuristic_without_expected_tools(self):
parser = Gemma4ToolParser()
text = 'Some text with {"key": "value"} but no tool name'
result = parser.parse(text)
assert len(result) == 0
class TestBenchmark:
def test_benchmark_counts(self):
parser = Gemma4ToolParser()
parser.parse(json.dumps({"tool_calls": [{"id": "1", "type": "function", "function": {"name": "x", "arguments": "{}"}}]}))
parser.parse('```json\n{"name": "y", "arguments": {}}\n```')
parser.parse('no tool call here')
b = parser.benchmark
assert b.total_calls == 3
assert b.successful_parses == 2
assert abs(b.success_rate - 2/3) < 0.01
def test_report_format(self):
parser = Gemma4ToolParser()
parser.parse(json.dumps({"tool_calls": [{"id": "1", "type": "function", "function": {"name": "x", "arguments": "{}"}}]}))
report = parser.format_report()
assert "Gemma 4 Tool Calling Benchmark" in report
assert "native" in report