Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fa81831cd2 |
@@ -1,302 +0,0 @@
|
||||
"""Self-Modifying Prompt Engine — agent learns from its own failures.
|
||||
|
||||
Analyzes session transcripts, identifies failure patterns, and generates
|
||||
prompt patches to prevent future failures.
|
||||
|
||||
The loop: fail → analyze → rewrite → retry → verify improvement.
|
||||
|
||||
Usage:
|
||||
from agent.self_modify import PromptLearner
|
||||
learner = PromptLearner()
|
||||
patches = learner.analyze_session(session_id)
|
||||
learner.apply_patches(patches)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
HERMES_HOME = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
|
||||
PATCHES_DIR = HERMES_HOME / "prompt_patches"
|
||||
ROLLBACK_DIR = HERMES_HOME / "prompt_rollback"
|
||||
|
||||
|
||||
@dataclass
|
||||
class FailurePattern:
|
||||
"""A detected failure pattern in session transcripts."""
|
||||
pattern_type: str # retry_loop, timeout, error_hallucination, context_loss
|
||||
description: str
|
||||
frequency: int
|
||||
example_messages: List[str] = field(default_factory=list)
|
||||
suggested_fix: str = ""
|
||||
|
||||
|
||||
@dataclass
|
||||
class PromptPatch:
|
||||
"""A modification to the system prompt based on failure analysis."""
|
||||
id: str
|
||||
failure_type: str
|
||||
original_rule: str
|
||||
new_rule: str
|
||||
confidence: float
|
||||
applied_at: Optional[float] = None
|
||||
reverted: bool = False
|
||||
|
||||
|
||||
# Failure detection patterns
|
||||
FAILURE_SIGNALS = {
|
||||
"retry_loop": {
|
||||
"patterns": [
|
||||
r"(?i)retry(?:ing)?\s*(?:attempt|again)",
|
||||
r"(?i)failed.*retrying",
|
||||
r"(?i)error.*again",
|
||||
r"(?i)attempt\s+\d+\s*(?:of|/)\s*\d+",
|
||||
],
|
||||
"description": "Agent stuck in retry loop",
|
||||
},
|
||||
"timeout": {
|
||||
"patterns": [
|
||||
r"(?i)timed?\s*out",
|
||||
r"(?i)deadline\s+exceeded",
|
||||
r"(?i)took\s+(?:too\s+)?long",
|
||||
],
|
||||
"description": "Operation timed out",
|
||||
},
|
||||
"hallucination": {
|
||||
"patterns": [
|
||||
r"(?i)i\s+(?:don't|do\s+not)\s+(?:have|see|find)\s+(?:any|that|this)\s+(?:information|data|file)",
|
||||
r"(?i)the\s+file\s+doesn't\s+exist",
|
||||
r"(?i)i\s+(?:made|invented|fabricated)\s+(?:that\s+up|this)",
|
||||
],
|
||||
"description": "Agent hallucinated or fabricated information",
|
||||
},
|
||||
"context_loss": {
|
||||
"patterns": [
|
||||
r"(?i)i\s+(?:don't|do\s+not)\s+(?:remember|recall|know)\s+(?:what|where|when|how)",
|
||||
r"(?i)could\s+you\s+remind\s+me",
|
||||
r"(?i)what\s+were\s+we\s+(?:doing|working|talking)\s+(?:on|about)",
|
||||
],
|
||||
"description": "Agent lost context from earlier in conversation",
|
||||
},
|
||||
"tool_failure": {
|
||||
"patterns": [
|
||||
r"(?i)tool\s+(?:call|execution)\s+failed",
|
||||
r"(?i)command\s+not\s+found",
|
||||
r"(?i)permission\s+denied",
|
||||
r"(?i)no\s+such\s+file",
|
||||
],
|
||||
"description": "Tool execution failed",
|
||||
},
|
||||
}
|
||||
|
||||
# Prompt improvement templates
|
||||
PROMPT_FIXES = {
|
||||
"retry_loop": (
|
||||
"If an operation fails more than twice, stop retrying. "
|
||||
"Report the failure and ask the user for guidance. "
|
||||
"Do not enter retry loops — they waste tokens."
|
||||
),
|
||||
"timeout": (
|
||||
"For operations that may take long, set a timeout and report "
|
||||
"progress. If an operation takes more than 30 seconds, report "
|
||||
"what you've done so far and ask if you should continue."
|
||||
),
|
||||
"hallucination": (
|
||||
"If you cannot find information, say 'I don't know' or "
|
||||
"'I couldn't find that.' Never fabricate information. "
|
||||
"If a file doesn't exist, say so — don't guess its contents."
|
||||
),
|
||||
"context_loss": (
|
||||
"When you need context from earlier in the conversation, "
|
||||
"use session_search to find it. Don't ask the user to repeat themselves."
|
||||
),
|
||||
"tool_failure": (
|
||||
"If a tool fails, check the error message and try a different approach. "
|
||||
"Don't retry the exact same command — diagnose first."
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
class PromptLearner:
|
||||
"""Analyze session transcripts and generate prompt improvements."""
|
||||
|
||||
def __init__(self):
|
||||
PATCHES_DIR.mkdir(parents=True, exist_ok=True)
|
||||
ROLLBACK_DIR.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
def analyze_session(self, session_data: dict) -> List[FailurePattern]:
|
||||
"""Analyze a session for failure patterns.
|
||||
|
||||
Args:
|
||||
session_data: Session dict with 'messages' list.
|
||||
|
||||
Returns:
|
||||
List of detected failure patterns.
|
||||
"""
|
||||
messages = session_data.get("messages", [])
|
||||
patterns_found: Dict[str, FailurePattern] = {}
|
||||
|
||||
for msg in messages:
|
||||
content = str(msg.get("content", ""))
|
||||
role = msg.get("role", "")
|
||||
|
||||
# Only analyze assistant messages and tool results
|
||||
if role not in ("assistant", "tool"):
|
||||
continue
|
||||
|
||||
for failure_type, config in FAILURE_SIGNALS.items():
|
||||
for pattern in config["patterns"]:
|
||||
if re.search(pattern, content):
|
||||
if failure_type not in patterns_found:
|
||||
patterns_found[failure_type] = FailurePattern(
|
||||
pattern_type=failure_type,
|
||||
description=config["description"],
|
||||
frequency=0,
|
||||
suggested_fix=PROMPT_FIXES.get(failure_type, ""),
|
||||
)
|
||||
patterns_found[failure_type].frequency += 1
|
||||
if len(patterns_found[failure_type].example_messages) < 3:
|
||||
patterns_found[failure_type].example_messages.append(
|
||||
content[:200]
|
||||
)
|
||||
break # One match per message per type is enough
|
||||
|
||||
return list(patterns_found.values())
|
||||
|
||||
def generate_patches(self, patterns: List[FailurePattern],
|
||||
min_confidence: float = 0.7) -> List[PromptPatch]:
|
||||
"""Generate prompt patches from failure patterns.
|
||||
|
||||
Args:
|
||||
patterns: Detected failure patterns.
|
||||
min_confidence: Minimum confidence to generate a patch.
|
||||
|
||||
Returns:
|
||||
List of prompt patches.
|
||||
"""
|
||||
patches = []
|
||||
for pattern in patterns:
|
||||
# Confidence based on frequency
|
||||
if pattern.frequency >= 3:
|
||||
confidence = 0.9
|
||||
elif pattern.frequency >= 2:
|
||||
confidence = 0.75
|
||||
else:
|
||||
confidence = 0.5
|
||||
|
||||
if confidence < min_confidence:
|
||||
continue
|
||||
|
||||
if not pattern.suggested_fix:
|
||||
continue
|
||||
|
||||
patch = PromptPatch(
|
||||
id=f"{pattern.pattern_type}-{int(time.time())}",
|
||||
failure_type=pattern.pattern_type,
|
||||
original_rule="(missing — no existing rule for this pattern)",
|
||||
new_rule=pattern.suggested_fix,
|
||||
confidence=confidence,
|
||||
)
|
||||
patches.append(patch)
|
||||
|
||||
return patches
|
||||
|
||||
def apply_patches(self, patches: List[PromptPatch],
|
||||
prompt_path: Optional[str] = None) -> int:
|
||||
"""Apply patches to the system prompt.
|
||||
|
||||
Args:
|
||||
patches: Patches to apply.
|
||||
prompt_path: Path to prompt file (default: ~/.hermes/system_prompt.md)
|
||||
|
||||
Returns:
|
||||
Number of patches applied.
|
||||
"""
|
||||
if prompt_path is None:
|
||||
prompt_path = str(HERMES_HOME / "system_prompt.md")
|
||||
|
||||
prompt_file = Path(prompt_path)
|
||||
|
||||
# Backup current prompt
|
||||
if prompt_file.exists():
|
||||
backup = ROLLBACK_DIR / f"{prompt_file.name}.{int(time.time())}.bak"
|
||||
backup.write_text(prompt_file.read_text())
|
||||
|
||||
# Read current prompt
|
||||
current = prompt_file.read_text() if prompt_file.exists() else ""
|
||||
|
||||
# Apply patches
|
||||
applied = 0
|
||||
additions = []
|
||||
for patch in patches:
|
||||
if patch.new_rule not in current:
|
||||
additions.append(f"\n## Auto-learned: {patch.failure_type}\n{patch.new_rule}")
|
||||
patch.applied_at = time.time()
|
||||
applied += 1
|
||||
|
||||
if additions:
|
||||
new_content = current + "\n".join(additions)
|
||||
prompt_file.write_text(new_content)
|
||||
|
||||
# Log patches
|
||||
patches_file = PATCHES_DIR / f"patches-{int(time.time())}.json"
|
||||
with open(patches_file, "w") as f:
|
||||
json.dump([p.__dict__ for p in patches], f, indent=2, default=str)
|
||||
|
||||
logger.info("Applied %d prompt patches", applied)
|
||||
return applied
|
||||
|
||||
def rollback_last(self, prompt_path: Optional[str] = None) -> bool:
|
||||
"""Rollback to the most recent backup.
|
||||
|
||||
Args:
|
||||
prompt_path: Path to prompt file.
|
||||
|
||||
Returns:
|
||||
True if rollback succeeded.
|
||||
"""
|
||||
if prompt_path is None:
|
||||
prompt_path = str(HERMES_HOME / "system_prompt.md")
|
||||
|
||||
backups = sorted(ROLLBACK_DIR.glob("*.bak"), reverse=True)
|
||||
if not backups:
|
||||
logger.warning("No backups to rollback to")
|
||||
return False
|
||||
|
||||
latest = backups[0]
|
||||
Path(prompt_path).write_text(latest.read_text())
|
||||
logger.info("Rolled back to %s", latest.name)
|
||||
return True
|
||||
|
||||
def learn_from_session(self, session_data: dict) -> Dict[str, Any]:
|
||||
"""Full learning cycle: analyze → patch → apply.
|
||||
|
||||
Args:
|
||||
session_data: Session dict.
|
||||
|
||||
Returns:
|
||||
Summary of what was learned and applied.
|
||||
"""
|
||||
patterns = self.analyze_session(session_data)
|
||||
patches = self.generate_patches(patterns)
|
||||
applied = self.apply_patches(patches)
|
||||
|
||||
return {
|
||||
"patterns_detected": len(patterns),
|
||||
"patches_generated": len(patches),
|
||||
"patches_applied": applied,
|
||||
"patterns": [
|
||||
{"type": p.pattern_type, "frequency": p.frequency, "description": p.description}
|
||||
for p in patterns
|
||||
],
|
||||
}
|
||||
42
benchmarks/test_images.json
Normal file
42
benchmarks/test_images.json
Normal file
@@ -0,0 +1,42 @@
|
||||
[
|
||||
{
|
||||
"id": "img_001",
|
||||
"name": "red_circle",
|
||||
"path": "benchmarks/test_images/red_circle.png",
|
||||
"description": "A red circle on a white background",
|
||||
"expected_answer_contains": ["red", "circle"],
|
||||
"category": "shape_color"
|
||||
},
|
||||
{
|
||||
"id": "img_002",
|
||||
"name": "blue_square",
|
||||
"path": "benchmarks/test_images/blue_square.png",
|
||||
"description": "A blue square on a white background",
|
||||
"expected_answer_contains": ["blue", "square"],
|
||||
"category": "shape_color"
|
||||
},
|
||||
{
|
||||
"id": "img_003",
|
||||
"name": "green_triangle",
|
||||
"path": "benchmarks/test_images/green_triangle.png",
|
||||
"description": "A green triangle on a white background",
|
||||
"expected_answer_contains": ["green", "triangle"],
|
||||
"category": "shape_color"
|
||||
},
|
||||
{
|
||||
"id": "img_004",
|
||||
"name": "text_hello",
|
||||
"path": "benchmarks/test_images/text_hello.png",
|
||||
"description": "An image containing the text 'Hello World'",
|
||||
"expected_answer_contains": ["hello", "world"],
|
||||
"category": "ocr"
|
||||
},
|
||||
{
|
||||
"id": "img_005",
|
||||
"name": "mixed_shapes",
|
||||
"path": "benchmarks/test_images/mixed_shapes.png",
|
||||
"description": "Multiple colored shapes: red circle, blue square, yellow star",
|
||||
"expected_answer_contains": ["red", "blue", "yellow"],
|
||||
"category": "counting"
|
||||
}
|
||||
]
|
||||
BIN
benchmarks/test_images/blue_square.png
Normal file
BIN
benchmarks/test_images/blue_square.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 779 B |
BIN
benchmarks/test_images/green_triangle.png
Normal file
BIN
benchmarks/test_images/green_triangle.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 1.3 KiB |
BIN
benchmarks/test_images/mixed_shapes.png
Normal file
BIN
benchmarks/test_images/mixed_shapes.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 1.2 KiB |
BIN
benchmarks/test_images/red_circle.png
Normal file
BIN
benchmarks/test_images/red_circle.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 1.4 KiB |
BIN
benchmarks/test_images/text_hello.png
Normal file
BIN
benchmarks/test_images/text_hello.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 3.3 KiB |
204
benchmarks/vision_benchmark.py
Normal file
204
benchmarks/vision_benchmark.py
Normal file
@@ -0,0 +1,204 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Vision benchmark — test model image understanding with local test images.
|
||||
|
||||
Uses locally-stored test images (not external URLs) for reliable CI.
|
||||
|
||||
Usage:
|
||||
python3 benchmarks/vision_benchmark.py --model hermes3
|
||||
python3 benchmarks/vision_benchmark.py --model qwen2.5 --json
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List
|
||||
|
||||
BENCHMARK_DIR = Path(__file__).resolve().parent
|
||||
TEST_IMAGES_FILE = BENCHMARK_DIR / "test_images.json"
|
||||
|
||||
|
||||
def load_test_dataset() -> List[Dict[str, Any]]:
|
||||
"""Load test image dataset."""
|
||||
if not TEST_IMAGES_FILE.exists():
|
||||
raise FileNotFoundError(f"Test dataset not found: {TEST_IMAGES_FILE}")
|
||||
with open(TEST_IMAGES_FILE) as f:
|
||||
return json.load(f)
|
||||
|
||||
|
||||
def encode_image_base64(image_path: str) -> str:
|
||||
"""Encode image as base64 for API call."""
|
||||
with open(image_path, "rb") as f:
|
||||
return base64.b64encode(f.read()).decode()
|
||||
|
||||
|
||||
def verify_images_exist(dataset: List[Dict[str, Any]]) -> List[str]:
|
||||
"""Verify all test images exist locally."""
|
||||
missing = []
|
||||
for item in dataset:
|
||||
path = BENCHMARK_DIR.parent / item["path"]
|
||||
if not path.exists():
|
||||
missing.append(item["path"])
|
||||
return missing
|
||||
|
||||
|
||||
def run_vision_test(
|
||||
image_path: str,
|
||||
prompt: str,
|
||||
base_url: str = "http://localhost:11434/v1",
|
||||
model: str = "",
|
||||
api_key: str = "",
|
||||
timeout: int = 30,
|
||||
) -> Dict[str, Any]:
|
||||
"""Run a single vision test against a model."""
|
||||
import urllib.request
|
||||
|
||||
img_b64 = encode_image_base64(image_path)
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "user",
|
||||
"content": [
|
||||
{"type": "text", "text": prompt},
|
||||
{
|
||||
"type": "image_url",
|
||||
"image_url": {"url": f"data:image/png;base64,{img_b64}"},
|
||||
},
|
||||
],
|
||||
}
|
||||
]
|
||||
|
||||
body = {
|
||||
"model": model or "",
|
||||
"messages": messages,
|
||||
"max_tokens": 200,
|
||||
}
|
||||
|
||||
headers = {"Content-Type": "application/json"}
|
||||
if api_key:
|
||||
headers["Authorization"] = f"Bearer {api_key}"
|
||||
|
||||
url = f"{base_url.rstrip('/')}/chat/completions"
|
||||
t0 = time.monotonic()
|
||||
|
||||
try:
|
||||
req = urllib.request.Request(url, data=json.dumps(body).encode(), headers=headers, method="POST")
|
||||
with urllib.request.urlopen(req, timeout=timeout) as resp:
|
||||
data = json.loads(resp.read())
|
||||
elapsed = time.monotonic() - t0
|
||||
content = data.get("choices", [{}])[0].get("message", {}).get("content", "")
|
||||
return {
|
||||
"success": True,
|
||||
"response": content,
|
||||
"latency_ms": int(elapsed * 1000),
|
||||
"model": data.get("model", model),
|
||||
}
|
||||
except Exception as e:
|
||||
return {
|
||||
"success": False,
|
||||
"response": "",
|
||||
"latency_ms": int((time.monotonic() - t0) * 1000),
|
||||
"error": str(e),
|
||||
}
|
||||
|
||||
|
||||
def evaluate_response(response: str, expected: List[str]) -> bool:
|
||||
"""Check if response contains expected keywords."""
|
||||
response_lower = response.lower()
|
||||
return all(kw.lower() in response_lower for kw in expected)
|
||||
|
||||
|
||||
def run_benchmark(
|
||||
base_url: str = "http://localhost:11434/v1",
|
||||
model: str = "",
|
||||
) -> Dict[str, Any]:
|
||||
"""Run full vision benchmark."""
|
||||
dataset = load_test_dataset()
|
||||
|
||||
# Verify images exist
|
||||
missing = verify_images_exist(dataset)
|
||||
if missing:
|
||||
return {"error": f"Missing test images: {missing}", "passed": 0, "total": len(dataset)}
|
||||
|
||||
results = []
|
||||
passed = 0
|
||||
|
||||
for item in dataset:
|
||||
image_path = str(BENCHMARK_DIR.parent / item["path"])
|
||||
prompt = f"What do you see in this image? Describe the shapes and colors."
|
||||
|
||||
result = run_vision_test(image_path, prompt, base_url=base_url, model=model)
|
||||
result["test_id"] = item["id"]
|
||||
result["test_name"] = item["name"]
|
||||
result["category"] = item["category"]
|
||||
|
||||
if result["success"]:
|
||||
result["correct"] = evaluate_response(result["response"], item["expected_answer_contains"])
|
||||
if result["correct"]:
|
||||
passed += 1
|
||||
else:
|
||||
result["correct"] = False
|
||||
|
||||
results.append(result)
|
||||
|
||||
return {
|
||||
"model": model,
|
||||
"base_url": base_url,
|
||||
"passed": passed,
|
||||
"total": len(dataset),
|
||||
"success_rate": passed / len(dataset) if dataset else 0,
|
||||
"results": results,
|
||||
}
|
||||
|
||||
|
||||
def format_report(benchmark: Dict[str, Any]) -> str:
|
||||
"""Format benchmark results."""
|
||||
if "error" in benchmark:
|
||||
return f"ERROR: {benchmark['error']}"
|
||||
|
||||
lines = [
|
||||
"Vision Benchmark Results",
|
||||
"=" * 40,
|
||||
f"Model: {benchmark.get('model', 'unknown')}",
|
||||
f"Passed: {benchmark['passed']}/{benchmark['total']} ({benchmark['success_rate']:.0%})",
|
||||
"",
|
||||
]
|
||||
|
||||
for r in benchmark.get("results", []):
|
||||
icon = "\u2705" if r.get("correct") else "\u274c"
|
||||
name = r.get("test_name", "?")
|
||||
cat = r.get("category", "?")
|
||||
lat = r.get("latency_ms", 0)
|
||||
lines.append(f" {icon} {name} ({cat}) — {lat}ms")
|
||||
if not r.get("success"):
|
||||
lines.append(f" Error: {r.get('error', 'unknown')}")
|
||||
elif not r.get("correct"):
|
||||
lines.append(f" Got: {r.get('response', '')[:100]}")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def main():
|
||||
import argparse
|
||||
parser = argparse.ArgumentParser(description="Vision benchmark")
|
||||
parser.add_argument("--base-url", default="http://localhost:11434/v1")
|
||||
parser.add_argument("--model", default="")
|
||||
parser.add_argument("--json", action="store_true")
|
||||
args = parser.parse_args()
|
||||
|
||||
benchmark = run_benchmark(base_url=args.base_url, model=args.model)
|
||||
|
||||
if args.json:
|
||||
print(json.dumps(benchmark, indent=2))
|
||||
else:
|
||||
print(format_report(benchmark))
|
||||
|
||||
return 0 if benchmark.get("success_rate", 0) >= 0.8 else 1
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
@@ -1,265 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Hermes MCP Server — expose hermes-agent tools to fleet peers.
|
||||
|
||||
Runs as a standalone MCP server that other agents can connect to
|
||||
and invoke hermes tools remotely.
|
||||
|
||||
Safe tools exposed:
|
||||
- terminal (safe commands only)
|
||||
- file_read, file_search
|
||||
- web_search, web_extract
|
||||
- session_search
|
||||
|
||||
NOT exposed (internal tools):
|
||||
- approval, delegate, memory, config
|
||||
|
||||
Usage:
|
||||
python -m tools.mcp_server --port 8081
|
||||
hermes mcp-server --port 8081
|
||||
python scripts/mcp_server.py --port 8081 --auth-key SECRET
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Tools safe to expose to other agents
|
||||
SAFE_TOOLS = {
|
||||
"terminal": {
|
||||
"name": "terminal",
|
||||
"description": "Execute safe shell commands. Dangerous commands are blocked.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"command": {"type": "string", "description": "Shell command to execute"},
|
||||
},
|
||||
"required": ["command"],
|
||||
},
|
||||
},
|
||||
"file_read": {
|
||||
"name": "file_read",
|
||||
"description": "Read the contents of a file.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"path": {"type": "string", "description": "File path to read"},
|
||||
"offset": {"type": "integer", "description": "Start line", "default": 1},
|
||||
"limit": {"type": "integer", "description": "Max lines", "default": 200},
|
||||
},
|
||||
"required": ["path"],
|
||||
},
|
||||
},
|
||||
"file_search": {
|
||||
"name": "file_search",
|
||||
"description": "Search file contents using regex.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"pattern": {"type": "string", "description": "Regex pattern"},
|
||||
"path": {"type": "string", "description": "Directory to search", "default": "."},
|
||||
},
|
||||
"required": ["pattern"],
|
||||
},
|
||||
},
|
||||
"web_search": {
|
||||
"name": "web_search",
|
||||
"description": "Search the web for information.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"query": {"type": "string", "description": "Search query"},
|
||||
},
|
||||
"required": ["query"],
|
||||
},
|
||||
},
|
||||
"session_search": {
|
||||
"name": "session_search",
|
||||
"description": "Search past conversation sessions.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"query": {"type": "string", "description": "Search query"},
|
||||
"limit": {"type": "integer", "description": "Max results", "default": 3},
|
||||
},
|
||||
"required": ["query"],
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
# Tools explicitly blocked
|
||||
BLOCKED_TOOLS = {
|
||||
"approval", "delegate", "memory", "config", "skill_install",
|
||||
"mcp_tool", "cronjob", "tts", "send_message",
|
||||
}
|
||||
|
||||
|
||||
class MCPServer:
|
||||
"""Simple MCP-compatible server for exposing hermes tools."""
|
||||
|
||||
def __init__(self, host: str = "127.0.0.1", port: int = 8081,
|
||||
auth_key: Optional[str] = None):
|
||||
self._host = host
|
||||
self._port = port
|
||||
self._auth_key = auth_key or os.getenv("MCP_AUTH_KEY", "")
|
||||
|
||||
async def handle_tools_list(self, request: dict) -> dict:
|
||||
"""Return available tools."""
|
||||
tools = list(SAFE_TOOLS.values())
|
||||
return {"tools": tools}
|
||||
|
||||
async def handle_tools_call(self, request: dict) -> dict:
|
||||
"""Execute a tool call."""
|
||||
tool_name = request.get("name", "")
|
||||
arguments = request.get("arguments", {})
|
||||
|
||||
if tool_name in BLOCKED_TOOLS:
|
||||
return {"error": f"Tool '{tool_name}' is not exposed via MCP"}
|
||||
if tool_name not in SAFE_TOOLS:
|
||||
return {"error": f"Unknown tool: {tool_name}"}
|
||||
|
||||
try:
|
||||
result = await self._execute_tool(tool_name, arguments)
|
||||
return {"content": [{"type": "text", "text": str(result)}]}
|
||||
except Exception as e:
|
||||
return {"error": str(e)}
|
||||
|
||||
async def _execute_tool(self, tool_name: str, arguments: dict) -> str:
|
||||
"""Execute a tool and return result."""
|
||||
if tool_name == "terminal":
|
||||
import subprocess
|
||||
cmd = arguments.get("command", "")
|
||||
# Block dangerous commands
|
||||
from tools.approval import detect_dangerous_command
|
||||
is_dangerous, _, desc = detect_dangerous_command(cmd)
|
||||
if is_dangerous:
|
||||
return f"BLOCKED: Dangerous command detected ({desc}). This tool only executes safe commands."
|
||||
result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=30)
|
||||
return result.stdout or result.stderr or "(no output)"
|
||||
|
||||
elif tool_name == "file_read":
|
||||
path = arguments.get("path", "")
|
||||
offset = arguments.get("offset", 1)
|
||||
limit = arguments.get("limit", 200)
|
||||
with open(path) as f:
|
||||
lines = f.readlines()
|
||||
return "".join(lines[offset-1:offset-1+limit])
|
||||
|
||||
elif tool_name == "file_search":
|
||||
import re
|
||||
pattern = arguments.get("pattern", "")
|
||||
path = arguments.get("path", ".")
|
||||
results = []
|
||||
for p in Path(path).rglob("*.py"):
|
||||
try:
|
||||
content = p.read_text()
|
||||
for i, line in enumerate(content.split("\n"), 1):
|
||||
if re.search(pattern, line, re.IGNORECASE):
|
||||
results.append(f"{p}:{i}: {line.strip()}")
|
||||
if len(results) >= 20:
|
||||
break
|
||||
except Exception:
|
||||
continue
|
||||
if len(results) >= 20:
|
||||
break
|
||||
return "\n".join(results) or "No matches found"
|
||||
|
||||
elif tool_name == "web_search":
|
||||
try:
|
||||
from tools.web_tools import web_search
|
||||
return web_search(arguments.get("query", ""))
|
||||
except ImportError:
|
||||
return "Web search not available"
|
||||
|
||||
elif tool_name == "session_search":
|
||||
try:
|
||||
from tools.session_search_tool import session_search
|
||||
return session_search(
|
||||
query=arguments.get("query", ""),
|
||||
limit=arguments.get("limit", 3),
|
||||
)
|
||||
except ImportError:
|
||||
return "Session search not available"
|
||||
|
||||
return f"Tool {tool_name} not implemented"
|
||||
|
||||
async def start_http(self):
|
||||
"""Start HTTP server for MCP endpoints."""
|
||||
try:
|
||||
from aiohttp import web
|
||||
except ImportError:
|
||||
logger.error("aiohttp required: pip install aiohttp")
|
||||
return
|
||||
|
||||
app = web.Application()
|
||||
|
||||
async def handle_tools_list_route(request):
|
||||
if self._auth_key:
|
||||
auth = request.headers.get("Authorization", "")
|
||||
if auth != f"Bearer {self._auth_key}":
|
||||
return web.json_response({"error": "Unauthorized"}, status=401)
|
||||
result = await self.handle_tools_list({})
|
||||
return web.json_response(result)
|
||||
|
||||
async def handle_tools_call_route(request):
|
||||
if self._auth_key:
|
||||
auth = request.headers.get("Authorization", "")
|
||||
if auth != f"Bearer {self._auth_key}":
|
||||
return web.json_response({"error": "Unauthorized"}, status=401)
|
||||
body = await request.json()
|
||||
result = await self.handle_tools_call(body)
|
||||
return web.json_response(result)
|
||||
|
||||
async def handle_health(request):
|
||||
return web.json_response({"status": "ok", "tools": len(SAFE_TOOLS)})
|
||||
|
||||
app.router.add_get("/mcp/tools", handle_tools_list_route)
|
||||
app.router.add_post("/mcp/tools/call", handle_tools_call_route)
|
||||
app.router.add_get("/health", handle_health)
|
||||
|
||||
runner = web.AppRunner(app)
|
||||
await runner.setup()
|
||||
site = web.TCPSite(runner, self._host, self._port)
|
||||
await site.start()
|
||||
logger.info("MCP server on http://%s:%s", self._host, self._port)
|
||||
logger.info("Tools: %s", ", ".join(SAFE_TOOLS.keys()))
|
||||
if self._auth_key:
|
||||
logger.info("Auth: Bearer token required")
|
||||
else:
|
||||
logger.warning("Auth: No MCP_AUTH_KEY set — server is open")
|
||||
|
||||
try:
|
||||
await asyncio.Event().wait()
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
finally:
|
||||
await runner.cleanup()
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Hermes MCP Server")
|
||||
parser.add_argument("--host", default="127.0.0.1")
|
||||
parser.add_argument("--port", type=int, default=8081)
|
||||
parser.add_argument("--auth-key", default=None, help="Bearer token for auth")
|
||||
args = parser.parse_args()
|
||||
|
||||
logging.basicConfig(level=logging.INFO,
|
||||
format="%(asctime)s [%(name)s] %(levelname)s: %(message)s")
|
||||
|
||||
server = MCPServer(host=args.host, port=args.port, auth_key=args.auth_key)
|
||||
print(f"Starting MCP server on http://{args.host}:{args.port}")
|
||||
print(f"Exposed tools: {', '.join(SAFE_TOOLS.keys())}")
|
||||
asyncio.run(server.start_http())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -201,31 +201,8 @@ def _get_command_timeout() -> int:
|
||||
|
||||
|
||||
def _get_vision_model() -> Optional[str]:
|
||||
"""Model for browser_vision (screenshot analysis — multimodal).
|
||||
|
||||
Priority:
|
||||
1. AUXILIARY_VISION_MODEL env var (explicit override)
|
||||
2. Gemma 4 (native multimodal, no model switching)
|
||||
3. Ollama local vision models
|
||||
4. None (fallback to text-only snapshot)
|
||||
"""
|
||||
# Explicit override always wins
|
||||
explicit = os.getenv("AUXILIARY_VISION_MODEL", "").strip()
|
||||
if explicit:
|
||||
return explicit
|
||||
|
||||
# Prefer Gemma 4 (native multimodal — no separate vision model needed)
|
||||
gemma = os.getenv("GEMMA_VISION_MODEL", "").strip()
|
||||
if gemma:
|
||||
return gemma
|
||||
|
||||
# Check for Ollama vision models
|
||||
ollama_vision = os.getenv("OLLAMA_VISION_MODEL", "").strip()
|
||||
if ollama_vision:
|
||||
return ollama_vision
|
||||
|
||||
# Default: None (text-only fallback)
|
||||
return None
|
||||
"""Model for browser_vision (screenshot analysis — multimodal)."""
|
||||
return os.getenv("AUXILIARY_VISION_MODEL", "").strip() or None
|
||||
|
||||
|
||||
def _get_extraction_model() -> Optional[str]:
|
||||
|
||||
Reference in New Issue
Block a user