Compare commits
2 Commits
fix/803
...
feat/822-a
| Author | SHA1 | Date | |
|---|---|---|---|
| 4c8d63a5c9 | |||
| 6bc10419b1 |
257
hermes_cli/a2a_health.py
Normal file
257
hermes_cli/a2a_health.py
Normal file
@@ -0,0 +1,257 @@
|
||||
"""
|
||||
A2A Health Monitor — Fleet Agent Heartbeat (#822)
|
||||
|
||||
Pings each fleet agent's A2A endpoint and tracks health status.
|
||||
Persists state to ~/.hermes/a2a_health.json.
|
||||
|
||||
Usage:
|
||||
from hermes_cli.a2a_health import check_fleet_health, check_agent_health
|
||||
|
||||
report = check_fleet_health()
|
||||
for agent in report["agents"]:
|
||||
print(f"{agent['name']}: {agent['status']} ({agent['response_ms']}ms)")
|
||||
"""
|
||||
|
||||
import json
|
||||
import time
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
HERMES_HOME = Path.home() / ".hermes"
|
||||
FLEET_CONFIG = HERMES_HOME / "fleet_agents.json"
|
||||
HEALTH_STATE = HERMES_HOME / "a2a_health.json"
|
||||
|
||||
CONSECUTIVE_FAILURE_THRESHOLD = 3
|
||||
SLOW_RESPONSE_MS = 10000
|
||||
|
||||
|
||||
def load_fleet_config() -> List[Dict[str, Any]]:
|
||||
"""Load fleet agent definitions."""
|
||||
if not FLEET_CONFIG.exists():
|
||||
return []
|
||||
try:
|
||||
with open(FLEET_CONFIG) as f:
|
||||
data = json.load(f)
|
||||
return data.get("agents", [])
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
|
||||
def load_health_state() -> Dict[str, Any]:
|
||||
"""Load persisted health state."""
|
||||
if not HEALTH_STATE.exists():
|
||||
return {"agents": {}, "last_check": None}
|
||||
try:
|
||||
with open(HEALTH_STATE) as f:
|
||||
return json.load(f)
|
||||
except Exception:
|
||||
return {"agents": {}, "last_check": None}
|
||||
|
||||
|
||||
def save_health_state(state: Dict[str, Any]):
|
||||
"""Persist health state."""
|
||||
HEALTH_STATE.parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(HEALTH_STATE, "w") as f:
|
||||
json.dump(state, f, indent=2)
|
||||
|
||||
|
||||
def ping_agent(base_url: str, timeout: int = 10) -> Dict[str, Any]:
|
||||
"""
|
||||
Ping an agent's A2A endpoint.
|
||||
|
||||
Tries /health first, falls back to /.well-known/agent-card.json.
|
||||
"""
|
||||
start = time.monotonic()
|
||||
endpoints = ["/health", "/.well-known/agent-card.json"]
|
||||
|
||||
for endpoint in endpoints:
|
||||
url = f"{base_url.rstrip('/')}{endpoint}"
|
||||
try:
|
||||
req = urllib.request.Request(url, method="GET")
|
||||
req.add_header("User-Agent", "hermes-a2a-health/1.0")
|
||||
with urllib.request.urlopen(req, timeout=timeout) as resp:
|
||||
elapsed = (time.monotonic() - start) * 1000
|
||||
body = resp.read(1024).decode("utf-8", errors="replace")
|
||||
|
||||
result = {
|
||||
"alive": True,
|
||||
"status_code": resp.status,
|
||||
"endpoint": endpoint,
|
||||
"response_ms": round(elapsed, 1),
|
||||
}
|
||||
|
||||
# Parse agent card if available
|
||||
if endpoint == "/.well-known/agent-card.json":
|
||||
try:
|
||||
card = json.loads(body)
|
||||
result["agent_card"] = {
|
||||
"name": card.get("name", "unknown"),
|
||||
"tools_count": len(card.get("skills", [])),
|
||||
}
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return result
|
||||
except urllib.error.URLError:
|
||||
continue
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
elapsed = (time.monotonic() - start) * 1000
|
||||
return {
|
||||
"alive": False,
|
||||
"error": "All endpoints unreachable",
|
||||
"response_ms": round(elapsed, 1),
|
||||
}
|
||||
|
||||
|
||||
def check_agent_health(agent: Dict[str, Any], prev_state: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Check health of a single agent."""
|
||||
name = agent.get("name", "unknown")
|
||||
base_url = ""
|
||||
|
||||
# Get URL from agent config
|
||||
interfaces = agent.get("supportedInterfaces", [])
|
||||
if interfaces:
|
||||
base_url = interfaces[0].get("url", "")
|
||||
if not base_url:
|
||||
base_url = agent.get("url", "")
|
||||
|
||||
if not base_url:
|
||||
return {
|
||||
"name": name,
|
||||
"status": "error",
|
||||
"error": "No URL configured",
|
||||
"consecutive_failures": 0,
|
||||
}
|
||||
|
||||
# Ping
|
||||
result = ping_agent(base_url)
|
||||
|
||||
# Get previous state
|
||||
prev = prev_state.get("agents", {}).get(name, {})
|
||||
prev_failures = prev.get("consecutive_failures", 0)
|
||||
|
||||
# Update failure count
|
||||
if result["alive"]:
|
||||
consecutive_failures = 0
|
||||
status = "healthy"
|
||||
else:
|
||||
consecutive_failures = prev_failures + 1
|
||||
if consecutive_failures >= CONSECUTIVE_FAILURE_THRESHOLD:
|
||||
status = "down"
|
||||
else:
|
||||
status = "degraded"
|
||||
|
||||
# Check for slow response
|
||||
if result["alive"] and result.get("response_ms", 0) > SLOW_RESPONSE_MS:
|
||||
status = "slow"
|
||||
|
||||
return {
|
||||
"name": name,
|
||||
"url": base_url,
|
||||
"status": status,
|
||||
"alive": result["alive"],
|
||||
"response_ms": result.get("response_ms"),
|
||||
"endpoint": result.get("endpoint"),
|
||||
"status_code": result.get("status_code"),
|
||||
"agent_card": result.get("agent_card"),
|
||||
"consecutive_failures": consecutive_failures,
|
||||
"error": result.get("error"),
|
||||
"checked_at": time.strftime("%Y-%m-%dT%H:%M:%S"),
|
||||
}
|
||||
|
||||
|
||||
def check_fleet_health(
|
||||
agent_name: Optional[str] = None,
|
||||
timeout: int = 10,
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Check health of all (or one) fleet agent.
|
||||
|
||||
Returns report dict with agents list and summary.
|
||||
"""
|
||||
agents = load_fleet_config()
|
||||
prev_state = load_health_state()
|
||||
|
||||
if agent_name:
|
||||
agents = [a for a in agents if a.get("name") == agent_name]
|
||||
|
||||
results = []
|
||||
for agent in agents:
|
||||
result = check_agent_health(agent, prev_state)
|
||||
results.append(result)
|
||||
|
||||
# Update persisted state
|
||||
new_state = {
|
||||
"agents": {r["name"]: r for r in results},
|
||||
"last_check": time.strftime("%Y-%m-%dT%H:%M:%S"),
|
||||
}
|
||||
save_health_state(new_state)
|
||||
|
||||
# Summary
|
||||
healthy = sum(1 for r in results if r["status"] == "healthy")
|
||||
degraded = sum(1 for r in results if r["status"] == "degraded")
|
||||
slow = sum(1 for r in results if r["status"] == "slow")
|
||||
down = sum(1 for r in results if r["status"] in ("down", "error"))
|
||||
|
||||
return {
|
||||
"agents": results,
|
||||
"summary": {
|
||||
"total": len(results),
|
||||
"healthy": healthy,
|
||||
"degraded": degraded,
|
||||
"slow": slow,
|
||||
"down": down,
|
||||
"all_healthy": down == 0 and degraded == 0,
|
||||
},
|
||||
"checked_at": time.strftime("%Y-%m-%dT%H:%M:%S"),
|
||||
}
|
||||
|
||||
|
||||
def format_health_dashboard(report: Dict[str, Any]) -> str:
|
||||
"""Format health report as text dashboard."""
|
||||
lines = []
|
||||
summary = report["summary"]
|
||||
|
||||
# Header
|
||||
if summary["all_healthy"]:
|
||||
lines.append("\u2705 All fleet agents healthy")
|
||||
elif summary["down"] > 0:
|
||||
lines.append(f"\u274c {summary['down']} agent(s) DOWN")
|
||||
else:
|
||||
lines.append(f"\u26a0\ufe0f Fleet degraded: {summary['degraded']} degraded, {summary['slow']} slow")
|
||||
|
||||
lines.append(f"Checked: {report['checked_at']}")
|
||||
lines.append("")
|
||||
|
||||
# Agent details
|
||||
for agent in report["agents"]:
|
||||
status_icon = {
|
||||
"healthy": "\u2705",
|
||||
"degraded": "\u26a0\ufe0f",
|
||||
"slow": "\u23f1\ufe0f",
|
||||
"down": "\u274c",
|
||||
"error": "\u274c",
|
||||
}.get(agent["status"], "\u2753")
|
||||
|
||||
name = agent["name"]
|
||||
ms = agent.get("response_ms", "?")
|
||||
failures = agent.get("consecutive_failures", 0)
|
||||
|
||||
line = f" {status_icon} {name}"
|
||||
if agent.get("alive"):
|
||||
line += f" — {ms}ms"
|
||||
if agent.get("agent_card"):
|
||||
tools = agent["agent_card"].get("tools_count", 0)
|
||||
line += f" — {tools} tools"
|
||||
else:
|
||||
line += f" — {agent.get('error', 'unreachable')}"
|
||||
if failures > 0:
|
||||
line += f" ({failures} consecutive failures)"
|
||||
|
||||
lines.append(line)
|
||||
|
||||
return "\n".join(lines)
|
||||
@@ -1,265 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Hermes MCP Server — expose hermes-agent tools to fleet peers.
|
||||
|
||||
Runs as a standalone MCP server that other agents can connect to
|
||||
and invoke hermes tools remotely.
|
||||
|
||||
Safe tools exposed:
|
||||
- terminal (safe commands only)
|
||||
- file_read, file_search
|
||||
- web_search, web_extract
|
||||
- session_search
|
||||
|
||||
NOT exposed (internal tools):
|
||||
- approval, delegate, memory, config
|
||||
|
||||
Usage:
|
||||
python -m tools.mcp_server --port 8081
|
||||
hermes mcp-server --port 8081
|
||||
python scripts/mcp_server.py --port 8081 --auth-key SECRET
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Tools safe to expose to other agents
|
||||
SAFE_TOOLS = {
|
||||
"terminal": {
|
||||
"name": "terminal",
|
||||
"description": "Execute safe shell commands. Dangerous commands are blocked.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"command": {"type": "string", "description": "Shell command to execute"},
|
||||
},
|
||||
"required": ["command"],
|
||||
},
|
||||
},
|
||||
"file_read": {
|
||||
"name": "file_read",
|
||||
"description": "Read the contents of a file.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"path": {"type": "string", "description": "File path to read"},
|
||||
"offset": {"type": "integer", "description": "Start line", "default": 1},
|
||||
"limit": {"type": "integer", "description": "Max lines", "default": 200},
|
||||
},
|
||||
"required": ["path"],
|
||||
},
|
||||
},
|
||||
"file_search": {
|
||||
"name": "file_search",
|
||||
"description": "Search file contents using regex.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"pattern": {"type": "string", "description": "Regex pattern"},
|
||||
"path": {"type": "string", "description": "Directory to search", "default": "."},
|
||||
},
|
||||
"required": ["pattern"],
|
||||
},
|
||||
},
|
||||
"web_search": {
|
||||
"name": "web_search",
|
||||
"description": "Search the web for information.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"query": {"type": "string", "description": "Search query"},
|
||||
},
|
||||
"required": ["query"],
|
||||
},
|
||||
},
|
||||
"session_search": {
|
||||
"name": "session_search",
|
||||
"description": "Search past conversation sessions.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"query": {"type": "string", "description": "Search query"},
|
||||
"limit": {"type": "integer", "description": "Max results", "default": 3},
|
||||
},
|
||||
"required": ["query"],
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
# Tools explicitly blocked
|
||||
BLOCKED_TOOLS = {
|
||||
"approval", "delegate", "memory", "config", "skill_install",
|
||||
"mcp_tool", "cronjob", "tts", "send_message",
|
||||
}
|
||||
|
||||
|
||||
class MCPServer:
|
||||
"""Simple MCP-compatible server for exposing hermes tools."""
|
||||
|
||||
def __init__(self, host: str = "127.0.0.1", port: int = 8081,
|
||||
auth_key: Optional[str] = None):
|
||||
self._host = host
|
||||
self._port = port
|
||||
self._auth_key = auth_key or os.getenv("MCP_AUTH_KEY", "")
|
||||
|
||||
async def handle_tools_list(self, request: dict) -> dict:
|
||||
"""Return available tools."""
|
||||
tools = list(SAFE_TOOLS.values())
|
||||
return {"tools": tools}
|
||||
|
||||
async def handle_tools_call(self, request: dict) -> dict:
|
||||
"""Execute a tool call."""
|
||||
tool_name = request.get("name", "")
|
||||
arguments = request.get("arguments", {})
|
||||
|
||||
if tool_name in BLOCKED_TOOLS:
|
||||
return {"error": f"Tool '{tool_name}' is not exposed via MCP"}
|
||||
if tool_name not in SAFE_TOOLS:
|
||||
return {"error": f"Unknown tool: {tool_name}"}
|
||||
|
||||
try:
|
||||
result = await self._execute_tool(tool_name, arguments)
|
||||
return {"content": [{"type": "text", "text": str(result)}]}
|
||||
except Exception as e:
|
||||
return {"error": str(e)}
|
||||
|
||||
async def _execute_tool(self, tool_name: str, arguments: dict) -> str:
|
||||
"""Execute a tool and return result."""
|
||||
if tool_name == "terminal":
|
||||
import subprocess
|
||||
cmd = arguments.get("command", "")
|
||||
# Block dangerous commands
|
||||
from tools.approval import detect_dangerous_command
|
||||
is_dangerous, _, desc = detect_dangerous_command(cmd)
|
||||
if is_dangerous:
|
||||
return f"BLOCKED: Dangerous command detected ({desc}). This tool only executes safe commands."
|
||||
result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=30)
|
||||
return result.stdout or result.stderr or "(no output)"
|
||||
|
||||
elif tool_name == "file_read":
|
||||
path = arguments.get("path", "")
|
||||
offset = arguments.get("offset", 1)
|
||||
limit = arguments.get("limit", 200)
|
||||
with open(path) as f:
|
||||
lines = f.readlines()
|
||||
return "".join(lines[offset-1:offset-1+limit])
|
||||
|
||||
elif tool_name == "file_search":
|
||||
import re
|
||||
pattern = arguments.get("pattern", "")
|
||||
path = arguments.get("path", ".")
|
||||
results = []
|
||||
for p in Path(path).rglob("*.py"):
|
||||
try:
|
||||
content = p.read_text()
|
||||
for i, line in enumerate(content.split("\n"), 1):
|
||||
if re.search(pattern, line, re.IGNORECASE):
|
||||
results.append(f"{p}:{i}: {line.strip()}")
|
||||
if len(results) >= 20:
|
||||
break
|
||||
except Exception:
|
||||
continue
|
||||
if len(results) >= 20:
|
||||
break
|
||||
return "\n".join(results) or "No matches found"
|
||||
|
||||
elif tool_name == "web_search":
|
||||
try:
|
||||
from tools.web_tools import web_search
|
||||
return web_search(arguments.get("query", ""))
|
||||
except ImportError:
|
||||
return "Web search not available"
|
||||
|
||||
elif tool_name == "session_search":
|
||||
try:
|
||||
from tools.session_search_tool import session_search
|
||||
return session_search(
|
||||
query=arguments.get("query", ""),
|
||||
limit=arguments.get("limit", 3),
|
||||
)
|
||||
except ImportError:
|
||||
return "Session search not available"
|
||||
|
||||
return f"Tool {tool_name} not implemented"
|
||||
|
||||
async def start_http(self):
|
||||
"""Start HTTP server for MCP endpoints."""
|
||||
try:
|
||||
from aiohttp import web
|
||||
except ImportError:
|
||||
logger.error("aiohttp required: pip install aiohttp")
|
||||
return
|
||||
|
||||
app = web.Application()
|
||||
|
||||
async def handle_tools_list_route(request):
|
||||
if self._auth_key:
|
||||
auth = request.headers.get("Authorization", "")
|
||||
if auth != f"Bearer {self._auth_key}":
|
||||
return web.json_response({"error": "Unauthorized"}, status=401)
|
||||
result = await self.handle_tools_list({})
|
||||
return web.json_response(result)
|
||||
|
||||
async def handle_tools_call_route(request):
|
||||
if self._auth_key:
|
||||
auth = request.headers.get("Authorization", "")
|
||||
if auth != f"Bearer {self._auth_key}":
|
||||
return web.json_response({"error": "Unauthorized"}, status=401)
|
||||
body = await request.json()
|
||||
result = await self.handle_tools_call(body)
|
||||
return web.json_response(result)
|
||||
|
||||
async def handle_health(request):
|
||||
return web.json_response({"status": "ok", "tools": len(SAFE_TOOLS)})
|
||||
|
||||
app.router.add_get("/mcp/tools", handle_tools_list_route)
|
||||
app.router.add_post("/mcp/tools/call", handle_tools_call_route)
|
||||
app.router.add_get("/health", handle_health)
|
||||
|
||||
runner = web.AppRunner(app)
|
||||
await runner.setup()
|
||||
site = web.TCPSite(runner, self._host, self._port)
|
||||
await site.start()
|
||||
logger.info("MCP server on http://%s:%s", self._host, self._port)
|
||||
logger.info("Tools: %s", ", ".join(SAFE_TOOLS.keys()))
|
||||
if self._auth_key:
|
||||
logger.info("Auth: Bearer token required")
|
||||
else:
|
||||
logger.warning("Auth: No MCP_AUTH_KEY set — server is open")
|
||||
|
||||
try:
|
||||
await asyncio.Event().wait()
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
finally:
|
||||
await runner.cleanup()
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Hermes MCP Server")
|
||||
parser.add_argument("--host", default="127.0.0.1")
|
||||
parser.add_argument("--port", type=int, default=8081)
|
||||
parser.add_argument("--auth-key", default=None, help="Bearer token for auth")
|
||||
args = parser.parse_args()
|
||||
|
||||
logging.basicConfig(level=logging.INFO,
|
||||
format="%(asctime)s [%(name)s] %(levelname)s: %(message)s")
|
||||
|
||||
server = MCPServer(host=args.host, port=args.port, auth_key=args.auth_key)
|
||||
print(f"Starting MCP server on http://{args.host}:{args.port}")
|
||||
print(f"Exposed tools: {', '.join(SAFE_TOOLS.keys())}")
|
||||
asyncio.run(server.start_http())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
80
tests/test_a2a_health.py
Normal file
80
tests/test_a2a_health.py
Normal file
@@ -0,0 +1,80 @@
|
||||
"""Tests for A2A health monitor (#822)."""
|
||||
|
||||
import sys
|
||||
import json
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
|
||||
from hermes_cli.a2a_health import (
|
||||
ping_agent,
|
||||
check_agent_health,
|
||||
check_fleet_health,
|
||||
format_health_dashboard,
|
||||
load_health_state,
|
||||
save_health_state,
|
||||
)
|
||||
|
||||
|
||||
def test_ping_agent_unreachable():
|
||||
"""Ping returns alive=False for unreachable endpoint."""
|
||||
result = ping_agent("http://192.0.2.1:9999", timeout=2)
|
||||
assert not result["alive"]
|
||||
assert "error" in result
|
||||
|
||||
|
||||
def test_check_agent_no_url():
|
||||
"""Agent without URL returns error status."""
|
||||
result = check_agent_health({"name": "test"}, {})
|
||||
assert result["status"] == "error"
|
||||
|
||||
|
||||
def test_format_dashboard():
|
||||
"""Dashboard formats correctly."""
|
||||
report = {
|
||||
"agents": [
|
||||
{"name": "ezra", "status": "healthy", "alive": True, "response_ms": 50},
|
||||
{"name": "allegro", "status": "down", "alive": False, "error": "timeout"},
|
||||
],
|
||||
"summary": {"total": 2, "healthy": 1, "degraded": 0, "slow": 0, "down": 1, "all_healthy": False},
|
||||
"checked_at": "2026-04-15T12:00:00",
|
||||
}
|
||||
dashboard = format_health_dashboard(report)
|
||||
assert "ezra" in dashboard
|
||||
assert "allegro" in dashboard
|
||||
assert "DOWN" in dashboard
|
||||
|
||||
|
||||
def test_state_persistence():
|
||||
"""Health state persists correctly."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
state_file = Path(tmpdir) / "health.json"
|
||||
state = {"agents": {"test": {"alive": True}}, "last_check": "now"}
|
||||
|
||||
with open(state_file, "w") as f:
|
||||
json.dump(state, f)
|
||||
|
||||
with open(state_file) as f:
|
||||
loaded = json.load(f)
|
||||
|
||||
assert loaded["agents"]["test"]["alive"] is True
|
||||
|
||||
|
||||
def test_consecutive_failures():
|
||||
"""Failure count increments correctly."""
|
||||
prev = {"agents": {"test": {"consecutive_failures": 2}}}
|
||||
agent = {"name": "test", "url": "http://192.0.2.1:9999"}
|
||||
result = check_agent_health(agent, prev)
|
||||
assert result["consecutive_failures"] == 3
|
||||
assert result["status"] == "down"
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
tests = [test_ping_agent_unreachable, test_check_agent_no_url,
|
||||
test_format_dashboard, test_state_persistence, test_consecutive_failures]
|
||||
for t in tests:
|
||||
print(f"Running {t.__name__}...")
|
||||
t()
|
||||
print(" PASS")
|
||||
print("\nAll tests passed.")
|
||||
Reference in New Issue
Block a user