Compare commits
2 Commits
step35/466
...
fix/517
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a091a5c9bf | ||
|
|
4afde6da78 |
@@ -1,144 +0,0 @@
|
||||
# Local Hardware MCP Integration
|
||||
|
||||
Integrate the Model Context Protocol (MCP) to allow Timmy agents to control local hardware securely: file system, smart home (Hue lights), and system information.
|
||||
|
||||
## Components
|
||||
|
||||
- **MCP Server**: `scripts/hardware_mcp_server.py` — stdio-based MCP server exposing 8 tools
|
||||
- **Config Template**: `timmy-local/hardware_mcp_config.yaml` — runtime tuning
|
||||
- **Smoke Tests**: `tests/test_hardware_mcp_server.py`
|
||||
|
||||
## Prerequisites
|
||||
|
||||
```bash
|
||||
# MCP SDK
|
||||
pip install mcp
|
||||
|
||||
# OpenHue CLI (for smart home control)
|
||||
brew install openhue/cli/openhue # macOS
|
||||
# or see: https://github.com/openhue/openhue-cli
|
||||
|
||||
# Optional: psutil for detailed system_info
|
||||
pip install psutil
|
||||
```
|
||||
|
||||
## Quick Start
|
||||
|
||||
### 1. Start the MCP server
|
||||
|
||||
The server runs as a subprocess launched by Hermes Agent via the native-MCP integration.
|
||||
|
||||
Add to `~/.hermes/config.yaml`:
|
||||
|
||||
```yaml
|
||||
mcp_servers:
|
||||
hardware:
|
||||
command: "python"
|
||||
args: ["/full/path/to/timmy-home/scripts/hardware_mcp_server.py"]
|
||||
# Optional: add env vars if needed
|
||||
# env:
|
||||
# OPENHUE_BRIDGE_IP: "192.168.1.100"
|
||||
```
|
||||
|
||||
### 2. Restart Hermes
|
||||
|
||||
On startup, Hermes will:
|
||||
1. Launch the hardware MCP server
|
||||
2. Discover all 8 tools
|
||||
3. Register them with `hardware_*` prefixes (e.g., `hardware_file_read`, `hardware_light_control`)
|
||||
|
||||
### 3. Use in conversation
|
||||
|
||||
```
|
||||
User: Read my Timmy report file.
|
||||
Agent: [calls hardware_file_read with path="~/LOCAL_Timmy_REPORT.md"]
|
||||
|
||||
User: Turn off the bedroom lights.
|
||||
Agent: [calls hardware_light_control with name="Bedroom Lamp", on=false]
|
||||
|
||||
User: List files in my downloads folder.
|
||||
Agent: [calls hardware_file_list with directory="~/Downloads"]
|
||||
|
||||
User: What's my system status?
|
||||
Agent: [calls hardware_system_info]
|
||||
```
|
||||
|
||||
## Tool Reference
|
||||
|
||||
| Tool | Purpose | Parameters |
|
||||
|------|---------|------------|
|
||||
| `hardware_file_read` | Read file (≤10 MB) from home/tmp | `path` (string) |
|
||||
| `hardware_file_write` | Write text file | `path`, `content` |
|
||||
| `hardware_file_list` | List directory contents | `directory` (default: ~) |
|
||||
| `hardware_light_list` | List all Hue lights/rooms/scenes | none |
|
||||
| `hardware_light_control` | Control individual light | `name`, `on`, `brightness`, `color`, `temperature` |
|
||||
| `hardware_room_control` | Control all lights in a room | `name`, `on`, `brightness` |
|
||||
| `hardware_scene_set` | Activate Hue scene | `scene`, `room` |
|
||||
| `hardware_system_info` | System info (OS, CPU, memory, disk) | none |
|
||||
|
||||
## Security Model
|
||||
|
||||
- **File path allowlist**: Only paths under `~` (home), `/tmp`, and `/private/tmp` are permitted.
|
||||
- **File size cap**: 10 MB max per read.
|
||||
- **No arbitrary commands**: Only explicit tool operations; no shell execution.
|
||||
- **Smart home requires OpenHue CLI**: Light control goes through the official Hue CLI which handles bridge authentication.
|
||||
- **Graceful degradation**: If `psutil` is missing, `system_info` returns basic platform data; if `openhue` is missing, light tools return install instructions.
|
||||
|
||||
## Runtime Configuration
|
||||
|
||||
Edit `~/.timmy/hardware/hardware_mcp_config.yaml` (copy from `timmy-local/hardware_mcp_config.yaml`) to adjust:
|
||||
|
||||
```yaml
|
||||
guards:
|
||||
max_consecutive_errors: 3
|
||||
max_mcp_calls_per_session: 0 # 0 = unlimited
|
||||
allowed_dirs:
|
||||
- "~"
|
||||
- "/tmp"
|
||||
- "/private/tmp"
|
||||
max_file_size_bytes: 10485760 # 10 MB
|
||||
```
|
||||
|
||||
## Testing
|
||||
|
||||
```bash
|
||||
# Validate Python syntax
|
||||
python3 -m py_compile scripts/hardware_mcp_server.py
|
||||
|
||||
# Run smoke tests
|
||||
pytest tests/test_hardware_mcp_server.py -v
|
||||
```
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
**MCP tools not appearing in Hermes**
|
||||
|
||||
- Verify `mcp` Python package is installed: `pip show mcp`
|
||||
- Check `~/.hermes/config.yaml` syntax (YAML parse)
|
||||
- Restart Hermes (MCP connects at startup only)
|
||||
- Check Hermes logs: `~/.hermes/logs/` for MCP connection errors
|
||||
|
||||
**"openhue CLI not found"**
|
||||
|
||||
- Install OpenHue: `brew install openhue/cli/openhue`
|
||||
- First run requires pressing the Hue Bridge button to pair
|
||||
- Ensure bridge is on same local network
|
||||
|
||||
**"Path not allowed"**
|
||||
|
||||
- Only home (`~`), `/tmp`, and `/private/tmp` are accessible
|
||||
- Use absolute paths or `~/` expansion; relative paths are resolved from home
|
||||
|
||||
**File too large**
|
||||
|
||||
- Max read size is 10 MB. Split or compress large files.
|
||||
|
||||
## Dependencies
|
||||
|
||||
| Package | Purpose | Install |
|
||||
|---------|---------|---------|
|
||||
| `mcp` | MCP SDK (server framework) | `pip install mcp` |
|
||||
| `openhue` | Hue light control CLI | `brew install openhue/cli/openhue` |
|
||||
| `psutil` (optional) | Detailed memory/disk metrics | `pip install psutil` |
|
||||
|
||||
## Closes #466
|
||||
@@ -454,23 +454,112 @@ class TimmyAI:
|
||||
|
||||
class NPCAI:
|
||||
"""AI for non-player characters. They make choices based on goals."""
|
||||
|
||||
|
||||
GOAL_ROOM_TARGETS = {
|
||||
"Marcus": {
|
||||
"sit": "Garden",
|
||||
"speak_truth": "Threshold",
|
||||
"remember": "Bridge",
|
||||
},
|
||||
"Bezalel": {
|
||||
"forge": "Forge",
|
||||
"tend_fire": "Forge",
|
||||
"create_key": "Forge",
|
||||
},
|
||||
"Allegro": {
|
||||
"oversee": "Threshold",
|
||||
"keep_time": "Tower",
|
||||
"check_tunnel": "Bridge",
|
||||
},
|
||||
"Ezra": {
|
||||
"study": "Tower",
|
||||
"read_whiteboard": "Tower",
|
||||
"find_pattern": "Tower",
|
||||
},
|
||||
"Gemini": {
|
||||
"observe": "Threshold",
|
||||
"tend_garden": "Garden",
|
||||
"listen": "Garden",
|
||||
},
|
||||
"Claude": {
|
||||
"inspect": "Threshold",
|
||||
"organize": "Tower",
|
||||
"enforce_order": "Bridge",
|
||||
},
|
||||
"ClawCode": {
|
||||
"forge": "Forge",
|
||||
"test_edge": "Bridge",
|
||||
"build_weapon": "Forge",
|
||||
},
|
||||
"Kimi": {
|
||||
"contemplate": "Garden",
|
||||
"read": "Tower",
|
||||
"remember": "Bridge",
|
||||
},
|
||||
}
|
||||
|
||||
GOAL_CYCLES = {
|
||||
"Marcus": ("sit", "speak_truth", "remember"),
|
||||
"Allegro": ("oversee", "keep_time", "check_tunnel"),
|
||||
"Claude": ("inspect", "organize", "enforce_order"),
|
||||
"ClawCode": ("test_edge", "forge", "build_weapon"),
|
||||
"Kimi": ("contemplate", "read", "remember"),
|
||||
}
|
||||
|
||||
def __init__(self, world):
|
||||
self.world = world
|
||||
|
||||
|
||||
def _available_targets(self, available, prefix):
|
||||
return [a.split(":", 1)[1] for a in available if a.startswith(f"{prefix}:")]
|
||||
|
||||
def _target_room_for(self, char_name, goal):
|
||||
return self.GOAL_ROOM_TARGETS.get(char_name, {}).get(goal)
|
||||
|
||||
def _next_direction_toward(self, current_room, target_room):
|
||||
if current_room == target_room:
|
||||
return None
|
||||
frontier = [(current_room, [])]
|
||||
seen = {current_room}
|
||||
while frontier:
|
||||
room, path = frontier.pop(0)
|
||||
if room == target_room:
|
||||
return path[0] if path else None
|
||||
for direction, dest in self.world.rooms[room].get("connections", {}).items():
|
||||
if dest not in seen:
|
||||
seen.add(dest)
|
||||
frontier.append((dest, path + [direction]))
|
||||
return None
|
||||
|
||||
def _move_toward_goal(self, room, target_room):
|
||||
direction = self._next_direction_toward(room, target_room)
|
||||
return f"move:{direction}" if direction else None
|
||||
|
||||
def _advance_goal_cycle(self, char_name, char):
|
||||
cycle = self.GOAL_CYCLES.get(char_name)
|
||||
if not cycle or self.world.tick <= 0:
|
||||
return
|
||||
goal = char.get("active_goal")
|
||||
if goal not in cycle:
|
||||
return
|
||||
target_room = self._target_room_for(char_name, goal)
|
||||
if char.get("room") != target_room:
|
||||
return
|
||||
if self.world.tick % 12 != 0:
|
||||
return
|
||||
index = cycle.index(goal)
|
||||
char["active_goal"] = cycle[(index + 1) % len(cycle)]
|
||||
|
||||
def make_choice(self, char_name):
|
||||
"""Make a choice for this NPC this tick."""
|
||||
char = self.world.characters[char_name]
|
||||
self._advance_goal_cycle(char_name, char)
|
||||
room = char["room"]
|
||||
available = ActionSystem.get_available_actions(char_name, self.world)
|
||||
|
||||
|
||||
# If low energy, rest
|
||||
if char["energy"] <= 1:
|
||||
return "rest"
|
||||
|
||||
# Goal-driven behavior
|
||||
goal = char["active_goal"]
|
||||
|
||||
|
||||
if char_name == "Marcus":
|
||||
return self._marcus_choice(char, room, available)
|
||||
elif char_name == "Bezalel":
|
||||
@@ -487,66 +576,96 @@ class NPCAI:
|
||||
return self._clawcode_choice(char, room, available)
|
||||
elif char_name == "Kimi":
|
||||
return self._kimi_choice(char, room, available)
|
||||
|
||||
|
||||
return "rest"
|
||||
|
||||
|
||||
def _marcus_choice(self, char, room, available):
|
||||
goal = char.get("active_goal", "sit")
|
||||
target_room = self._target_room_for("Marcus", goal)
|
||||
if room != target_room:
|
||||
return self._move_toward_goal(room, target_room) or "rest"
|
||||
others = self._available_targets(available, "speak")
|
||||
if goal == "speak_truth" and others:
|
||||
return f"speak:{random.choice(others)}"
|
||||
if goal == "remember" and room == "Bridge":
|
||||
return random.choice(["examine", "rest"])
|
||||
if room == "Garden" and random.random() < 0.7:
|
||||
return "rest"
|
||||
if room != "Garden":
|
||||
return "move:west"
|
||||
# Speak to someone if possible
|
||||
others = [a.split(":")[1] for a in available if a.startswith("speak:")]
|
||||
if others and random.random() < 0.4:
|
||||
return f"speak:{random.choice(others)}"
|
||||
return "rest"
|
||||
|
||||
|
||||
def _bezalel_choice(self, char, room, available):
|
||||
target_room = self._target_room_for("Bezalel", char.get("active_goal", "forge"))
|
||||
if room != target_room:
|
||||
return self._move_toward_goal(room, target_room) or "rest"
|
||||
if room == "Forge" and self.world.rooms["Forge"]["fire"] == "glowing":
|
||||
return random.choice(["forge", "rest"] if char["energy"] > 2 else ["rest"])
|
||||
if room != "Forge":
|
||||
return "move:west"
|
||||
if random.random() < 0.3:
|
||||
return "tend_fire"
|
||||
return "forge"
|
||||
|
||||
|
||||
def _kimi_choice(self, char, room, available):
|
||||
others = [a.split(":")[1] for a in available if a.startswith("speak:")]
|
||||
goal = char.get("active_goal", "contemplate")
|
||||
target_room = self._target_room_for("Kimi", goal)
|
||||
if room != target_room:
|
||||
return self._move_toward_goal(room, target_room) or "rest"
|
||||
others = self._available_targets(available, "speak")
|
||||
if goal == "read" and room == "Tower":
|
||||
return "study" if char["energy"] > 2 else "rest"
|
||||
if room == "Garden" and others and random.random() < 0.3:
|
||||
return f"speak:{random.choice(others)}"
|
||||
if room == "Tower":
|
||||
return "study" if char["energy"] > 2 else "rest"
|
||||
return "move:east" # Head back toward Garden
|
||||
|
||||
if room == "Bridge":
|
||||
return random.choice(["examine", "rest"])
|
||||
return "rest"
|
||||
|
||||
def _gemini_choice(self, char, room, available):
|
||||
others = [a.split(":")[1] for a in available if a.startswith("listen:")]
|
||||
if room == "Garden" and others and random.random() < 0.4:
|
||||
return f"listen:{random.choice(others)}"
|
||||
return random.choice(["plant", "rest"] if room == "Garden" else ["move:west"])
|
||||
|
||||
goal = char.get("active_goal", "observe")
|
||||
target_room = self._target_room_for("Gemini", goal)
|
||||
if room != target_room:
|
||||
return self._move_toward_goal(room, target_room) or "rest"
|
||||
listeners = self._available_targets(available, "listen")
|
||||
if room == "Garden" and listeners and random.random() < 0.4:
|
||||
return f"listen:{random.choice(listeners)}"
|
||||
return random.choice(["plant", "rest"] if room == "Garden" else ["examine", "rest"])
|
||||
|
||||
def _ezra_choice(self, char, room, available):
|
||||
goal = char.get("active_goal", "study")
|
||||
target_room = self._target_room_for("Ezra", goal)
|
||||
if room != target_room:
|
||||
return self._move_toward_goal(room, target_room) or "rest"
|
||||
if room == "Tower" and char["energy"] > 2:
|
||||
return random.choice(["study", "write_rule", "help:Timmy"])
|
||||
if room != "Tower":
|
||||
return "move:south"
|
||||
return "rest"
|
||||
|
||||
|
||||
def _claude_choice(self, char, room, available):
|
||||
others = [a.split(":")[1] for a in available if a.startswith("confront:")]
|
||||
goal = char.get("active_goal", "inspect")
|
||||
target_room = self._target_room_for("Claude", goal)
|
||||
if room != target_room:
|
||||
return self._move_toward_goal(room, target_room) or "rest"
|
||||
others = self._available_targets(available, "confront")
|
||||
if others and random.random() < 0.2:
|
||||
return f"confront:{random.choice(others)}"
|
||||
return random.choice(["examine", "rest"])
|
||||
|
||||
|
||||
def _clawcode_choice(self, char, room, available):
|
||||
goal = char.get("active_goal", "test_edge")
|
||||
target_room = self._target_room_for("ClawCode", goal)
|
||||
if room != target_room:
|
||||
return self._move_toward_goal(room, target_room) or "rest"
|
||||
if room == "Forge" and char["energy"] > 2:
|
||||
return "forge"
|
||||
return random.choice(["move:east", "forge", "rest"])
|
||||
|
||||
return random.choice(["examine", "rest"])
|
||||
|
||||
def _allegro_choice(self, char, room, available):
|
||||
others = [a.split(":")[1] for a in available if a.startswith("speak:")]
|
||||
goal = char.get("active_goal", "oversee")
|
||||
target_room = self._target_room_for("Allegro", goal)
|
||||
if room != target_room:
|
||||
return self._move_toward_goal(room, target_room) or "rest"
|
||||
others = self._available_targets(available, "speak")
|
||||
if others and random.random() < 0.3:
|
||||
return f"speak:{random.choice(others)}"
|
||||
return random.choice(["move:north", "move:south", "examine"])
|
||||
return random.choice(["examine", "rest"])
|
||||
|
||||
|
||||
class DialogueSystem:
|
||||
|
||||
@@ -1,56 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Local Hardware MCP operator helper — generate config snippets and verify environment."""
|
||||
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
REPO_ROOT = Path(__file__).resolve().parents[1]
|
||||
HERMES_CONFIG = Path.home() / ".hermes" / "config.yaml"
|
||||
HARDWARE_MCP_CONFIG = Path.home() / ".timmy" / "hardware" / "hardware_mcp_config.yaml"
|
||||
HARDWARE_SERVER = REPO_ROOT / "scripts" / "hardware_mcp_server.py"
|
||||
|
||||
|
||||
def build_mcp_config_snippet() -> str:
|
||||
"""Return the mcp_servers YAML snippet for ~/.hermes/config.yaml."""
|
||||
return f"""mcp_servers:
|
||||
hardware:
|
||||
command: "python"
|
||||
args: ["{HARDWARE_SERVER}"]
|
||||
"""
|
||||
|
||||
|
||||
def build_wakeup_hook() -> str:
|
||||
"""Return a bash snippet that can be sourced before Hermes starts (optional)."""
|
||||
return f"""#!/usr/bin/env bash
|
||||
# Hardware MCP environment check
|
||||
if command -v openhue >/dev/null 2>&1; then
|
||||
echo "[Hardware MCP] OpenHue found: $(openhue version)"
|
||||
else
|
||||
echo "[Hardware MCP] Warning: openhue CLI not installed — light control disabled"
|
||||
fi
|
||||
"""
|
||||
|
||||
|
||||
def main():
|
||||
import argparse
|
||||
p = argparse.ArgumentParser(description="Hardware MCP integration helper")
|
||||
p.add_argument("--print-config", action="store_true", help="Print mcp_servers YAML snippet")
|
||||
p.add_argument("--print-hook", action="store_true", help="Print optional session-start hook")
|
||||
p.add_argument("--verify", action="store_true", help="Verify server script exists and is executable")
|
||||
args = p.parse_args()
|
||||
|
||||
if args.print_config:
|
||||
print(build_mcp_config_snippet())
|
||||
elif args.print_hook:
|
||||
print(build_wakeup_hook())
|
||||
elif args.verify:
|
||||
ok = HARDWARE_SERVER.exists()
|
||||
print(f"Server script: {'OK' if ok else 'MISSING'} at {HARDWARE_SERVER}")
|
||||
sys.exit(0 if ok else 1)
|
||||
else:
|
||||
p.print_help()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,206 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Local Hardware MCP Server — Secure control of local hardware.
|
||||
|
||||
Exposes tools for:
|
||||
- File system operations (read, write, list) within allowed directories
|
||||
- Smart home control via OpenHue (Philips Hue lights)
|
||||
- System information (safe, read-only)
|
||||
|
||||
Security: Enforces directory allowlist for file access.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import tempfile
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from mcp.server import Server
|
||||
from mcp.server.stdio import stdio_server
|
||||
from mcp.types import Tool, TextContent
|
||||
|
||||
ALLOWED_DIRS = [
|
||||
str(Path.home()), # User home directory
|
||||
"/tmp", # macOS symlink to /private/tmp
|
||||
"/private/tmp", # real tmp path
|
||||
str(Path(tempfile.gettempdir())), # actual system temp dir
|
||||
]
|
||||
OPENHUE_CMD = "openhue"
|
||||
MAX_FILE_SIZE = 10 * 1024 * 1024
|
||||
app = Server("hardware")
|
||||
|
||||
|
||||
def is_path_allowed(path: Path) -> bool:
|
||||
try:
|
||||
resolved = path.resolve()
|
||||
return any(resolved.is_relative_to(Path(d).resolve()) for d in ALLOWED_DIRS)
|
||||
except (ValueError, OSError):
|
||||
return False
|
||||
|
||||
|
||||
def run_openhue(args: list[str]) -> dict[str, Any]:
|
||||
try:
|
||||
result = subprocess.run([OPENHUE_CMD] + args, capture_output=True, text=True, timeout=30)
|
||||
return {
|
||||
"success": result.returncode == 0,
|
||||
"stdout": result.stdout.strip(),
|
||||
"stderr": result.stderr.strip(),
|
||||
"returncode": result.returncode,
|
||||
}
|
||||
except FileNotFoundError:
|
||||
return {"success": False,
|
||||
"error": "openhue CLI not found. Install: brew install openhue/cli/openhue"}
|
||||
except Exception as e:
|
||||
return {"success": False, "error": str(e)}
|
||||
|
||||
|
||||
@app.list_tools()
|
||||
async def list_tools():
|
||||
return [
|
||||
Tool(name="file_read",
|
||||
description="Read a file from allowed directories (home, /tmp) up to 10 MB.",
|
||||
inputSchema={"type": "object", "properties": {"path": {"type": "string",
|
||||
"description": "File path to read (e.g., ~/notes.txt)"}}, "required": ["path"]}),
|
||||
Tool(name="file_write",
|
||||
description="Write text content to a file within allowed directories.",
|
||||
inputSchema={"type": "object", "properties": {"path": {"type": "string"},
|
||||
"content": {"type": "string"}}, "required": ["path", "content"]}),
|
||||
Tool(name="file_list",
|
||||
description="List files and directories in a given folder.",
|
||||
inputSchema={"type": "object", "properties": {"directory": {"type": "string", "default": "~"}}, "required": []}),
|
||||
Tool(name="light_list",
|
||||
description="List all Hue lights, rooms, and scenes.",
|
||||
inputSchema={"type": "object", "properties": {}, "required": []}),
|
||||
Tool(name="light_control",
|
||||
description="Control a Hue light: on/off, brightness 0-100, color name/hex, temperature 153-500 mirek.",
|
||||
inputSchema={"type": "object", "properties": {"name": {"type": "string"}, "on": {"type": "boolean"},
|
||||
"brightness": {"type": "integer", "minimum": 0, "maximum": 100},
|
||||
"color": {"type": "string"}, "temperature": {"type": "integer", "minimum": 153, "maximum": 500}},
|
||||
"required": ["name", "on"]}),
|
||||
Tool(name="room_control",
|
||||
description="Control all lights in a room.",
|
||||
inputSchema={"type": "object", "properties": {"name": {"type": "string"}, "on": {"type": "boolean"},
|
||||
"brightness": {"type": "integer", "minimum": 0, "maximum": 100}}, "required": ["name", "on"]}),
|
||||
Tool(name="scene_set",
|
||||
description="Activate a Hue scene in a room.",
|
||||
inputSchema={"type": "object", "properties": {"scene": {"type": "string"}, "room": {"type": "string"}}, "required": ["scene", "room"]}),
|
||||
Tool(name="system_info",
|
||||
description="Get safe system info: OS, CPU count, memory, disk usage.",
|
||||
inputSchema={"type": "object", "properties": {}, "required": []}),
|
||||
]
|
||||
|
||||
|
||||
@app.call_tool()
|
||||
async def call_tool(name: str, arguments: dict):
|
||||
if name == "file_read":
|
||||
path = Path(arguments["path"].strip()).expanduser()
|
||||
if not is_path_allowed(path):
|
||||
return [TextContent(type="text", text=json.dumps({"error": f"Path not allowed: {path}"}))]
|
||||
if not path.is_file():
|
||||
return [TextContent(type="text", text=json.dumps({"error": f"File not found: {path}"}))]
|
||||
try:
|
||||
size = path.stat().st_size
|
||||
if size > MAX_FILE_SIZE:
|
||||
return [TextContent(type="text", text=json.dumps({"error": f"File too large: {size} bytes"}))]
|
||||
content = path.read_text()
|
||||
return [TextContent(type="text", text=json.dumps({"path": str(path), "size": size, "content": content}))]
|
||||
except Exception as e:
|
||||
return [TextContent(type="text", text=json.dumps({"error": str(e)}))]
|
||||
|
||||
elif name == "file_write":
|
||||
path = Path(arguments["path"].strip()).expanduser()
|
||||
if not is_path_allowed(path):
|
||||
return [TextContent(type="text", text=json.dumps({"error": f"Path not allowed: {path}"}))]
|
||||
try:
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
path.write_text(arguments["content"])
|
||||
return [TextContent(type="text", text=json.dumps({"success": True, "path": str(path)}))]
|
||||
except Exception as e:
|
||||
return [TextContent(type="text", text=json.dumps({"error": str(e)}))]
|
||||
|
||||
elif name == "file_list":
|
||||
directory = Path(arguments.get("directory", "~").strip()).expanduser()
|
||||
if not is_path_allowed(directory):
|
||||
return [TextContent(type="text", text=json.dumps({"error": f"Directory not allowed: {directory}"}))]
|
||||
if not directory.is_dir():
|
||||
return [TextContent(type="text", text=json.dumps({"error": f"Not a directory: {directory}"}))]
|
||||
try:
|
||||
entries = []
|
||||
for entry in sorted(directory.iterdir()):
|
||||
try:
|
||||
stat = entry.stat()
|
||||
entries.append({"name": entry.name, "is_dir": entry.is_dir(),
|
||||
"size": stat.st_size if entry.is_file() else None})
|
||||
except (OSError, PermissionError):
|
||||
pass
|
||||
return [TextContent(type="text", text=json.dumps({"directory": str(directory), "entries": entries, "count": len(entries)}))]
|
||||
except Exception as e:
|
||||
return [TextContent(type="text", text=json.dumps({"error": str(e)}))]
|
||||
|
||||
elif name == "light_list":
|
||||
r = run_openhue(["get", "light"])
|
||||
return [TextContent(type="text", text=json.dumps(r))]
|
||||
|
||||
elif name == "light_control":
|
||||
args = ["set", "light", f'"{arguments["name"]}"']
|
||||
if arguments.get("on") is not None:
|
||||
args.append("--on" if arguments["on"] else "--off")
|
||||
if brightness := arguments.get("brightness"):
|
||||
args.append(f"--brightness {brightness}")
|
||||
if color := arguments.get("color"):
|
||||
args.append(f"--color {color}")
|
||||
if temperature := arguments.get("temperature"):
|
||||
args.append(f"--temperature {temperature}")
|
||||
return [TextContent(type="text", text=json.dumps(run_openhue(args)))]
|
||||
|
||||
elif name == "room_control":
|
||||
args = ["set", "room", f'"{arguments["name"]}"']
|
||||
if arguments.get("on") is not None:
|
||||
args.append("--on" if arguments["on"] else "--off")
|
||||
if brightness := arguments.get("brightness"):
|
||||
args.append(f"--brightness {brightness}")
|
||||
return [TextContent(type="text", text=json.dumps(run_openhue(args)))]
|
||||
|
||||
elif name == "scene_set":
|
||||
args = ["set", "scene", arguments["scene"], "--room", arguments["room"]]
|
||||
return [TextContent(type="text", text=json.dumps(run_openhue(args)))]
|
||||
|
||||
elif name == "system_info":
|
||||
try:
|
||||
import platform
|
||||
info = {"platform": platform.system(), "release": platform.release(),
|
||||
"arch": platform.machine(), "hostname": platform.node(),
|
||||
"cpu_count": os.cpu_count()}
|
||||
try:
|
||||
import psutil
|
||||
mem = psutil.virtual_memory()
|
||||
info["memory_gb"] = round(mem.total / (1024**3), 2)
|
||||
disk = psutil.disk_usage(str(Path.home()))
|
||||
info["disk_home_gb"] = round(disk.total / (1024**3), 2)
|
||||
except ImportError:
|
||||
info["memory_gb"] = "psutil not installed"
|
||||
info["disk_home_gb"] = "psutil not installed"
|
||||
return [TextContent(type="text", text=json.dumps(info, indent=2))]
|
||||
except Exception as e:
|
||||
return [TextContent(type="text", text=json.dumps({"error": str(e)}))]
|
||||
|
||||
else:
|
||||
return [TextContent(type="text", text=json.dumps({
|
||||
"error": f"Unknown tool: {name}",
|
||||
"available": ["file_read", "file_write", "file_list", "light_list",
|
||||
"light_control", "room_control", "scene_set", "system_info"],
|
||||
}))]
|
||||
|
||||
|
||||
async def main():
|
||||
async with stdio_server() as (rs, ws):
|
||||
await app.run(rs, ws, app.create_initialization_options())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import asyncio
|
||||
asyncio.run(main())
|
||||
|
||||
@@ -1,51 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Functional test for hardware_mcp_server — uses asyncio.get_event_loop for restricted envs."""
|
||||
|
||||
import asyncio, json, tempfile, sys
|
||||
from pathlib import Path
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
||||
from scripts.hardware_mcp_server import call_tool, is_path_allowed
|
||||
|
||||
async def run_tests():
|
||||
# Path allowlist
|
||||
assert is_path_allowed(Path.home() / "any.txt")
|
||||
assert is_path_allowed(Path("/tmp/foo"))
|
||||
assert not is_path_allowed(Path("/etc/passwd"))
|
||||
print("✓ Path allowlist")
|
||||
|
||||
# file_list on home
|
||||
res = await call_tool("file_list", {"directory": "~"})
|
||||
data = json.loads(res[0].text)
|
||||
assert "entries" in data and data["count"] >= 0
|
||||
print(f"✓ file_list works, entries: {data['count']}")
|
||||
|
||||
# file_write + file_read round-trip in temp dir
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
fp = Path(td) / "hmcp_test.txt"
|
||||
content = "Hardware MCP round-trip OK"
|
||||
w = await call_tool("file_write", {"path": str(fp), "content": content})
|
||||
assert json.loads(w[0].text).get("success")
|
||||
r = await call_tool("file_read", {"path": str(fp)})
|
||||
assert json.loads(r[0].text)["content"] == content
|
||||
print("✓ file write/read round-trip")
|
||||
|
||||
# file_read error: missing file
|
||||
err = await call_tool("file_read", {"path": str(Path.home() / "no_such_file_xyz")})
|
||||
assert "error" in json.loads(err[0].text)
|
||||
print("✓ file_read reports missing file")
|
||||
|
||||
# Security: path traversal blocked
|
||||
block = await call_tool("file_read", {"path": "/etc/passwd"})
|
||||
bd = json.loads(block[0].text)
|
||||
assert "not allowed" in bd.get("error", "").lower()
|
||||
print("✓ Path traversal blocked")
|
||||
|
||||
print("\nAll functional checks passed!")
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Use get_event_loop for environments where asyncio.run is disabled
|
||||
try:
|
||||
asyncio.run(run_tests())
|
||||
except RuntimeError:
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.run_until_complete(run_tests())
|
||||
@@ -1,126 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Smoke tests for hardware_mcp_server."""
|
||||
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from unittest import TestCase
|
||||
|
||||
# Add repo root to path
|
||||
ROOT = Path(__file__).resolve().parent.parent
|
||||
sys.path.insert(0, str(ROOT))
|
||||
|
||||
|
||||
class TestHardwareMCPToolDefinitions(TestCase):
|
||||
"""Verify the MCP server is well-formed and tools have required schemas."""
|
||||
|
||||
def test_server_imports(self):
|
||||
"""Server module must import cleanly."""
|
||||
import importlib.util
|
||||
spec = importlib.util.spec_from_file_location(
|
||||
"hardware_mcp_server",
|
||||
ROOT / "scripts" / "hardware_mcp_server.py"
|
||||
)
|
||||
self.assertIsNotNone(spec)
|
||||
mod = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(mod)
|
||||
self.assertTrue(hasattr(mod, "app"))
|
||||
self.assertTrue(hasattr(mod, "list_tools"))
|
||||
self.assertTrue(hasattr(mod, "call_tool"))
|
||||
|
||||
def test_list_tools_returns_at_least_five_tools(self):
|
||||
"""list_tools() must return multiple tools covering file ops, lights, and system info."""
|
||||
import asyncio
|
||||
from scripts.hardware_mcp_server import list_tools
|
||||
tools = asyncio.run(list_tools())
|
||||
tool_names = [t.name for t in tools]
|
||||
# Core capabilities
|
||||
self.assertIn("file_read", tool_names)
|
||||
self.assertIn("file_write", tool_names)
|
||||
self.assertIn("file_list", tool_names)
|
||||
self.assertIn("light_list", tool_names)
|
||||
self.assertIn("light_control", tool_names)
|
||||
self.assertIn("room_control", tool_names)
|
||||
self.assertIn("scene_set", tool_names)
|
||||
self.assertIn("system_info", tool_names)
|
||||
self.assertGreaterEqual(len(tools), 8)
|
||||
|
||||
def test_file_read_schema_requires_path(self):
|
||||
"""file_read tool must require 'path' parameter."""
|
||||
import asyncio
|
||||
from scripts.hardware_mcp_server import list_tools
|
||||
tools = asyncio.run(list_tools())
|
||||
ft = next(t for t in tools if t.name == "file_read")
|
||||
self.assertIn("path", ft.inputSchema["properties"])
|
||||
self.assertIn("path", ft.inputSchema["required"])
|
||||
|
||||
def test_light_control_schema_requires_name_and_on(self):
|
||||
"""light_control requires name and on."""
|
||||
import asyncio
|
||||
from scripts.hardware_mcp_server import list_tools
|
||||
tools = asyncio.run(list_tools())
|
||||
ft = next(t for t in tools if t.name == "light_control")
|
||||
self.assertIn("name", ft.inputSchema["required"])
|
||||
self.assertIn("on", ft.inputSchema["required"])
|
||||
|
||||
def test_system_info_is_readonly(self):
|
||||
"""system_info tool takes no arguments."""
|
||||
import asyncio
|
||||
from scripts.hardware_mcp_server import list_tools
|
||||
tools = asyncio.run(list_tools())
|
||||
ft = next(t for t in tools if t.name == "system_info")
|
||||
self.assertEqual(ft.inputSchema.get("required", []), [])
|
||||
self.assertEqual(len(ft.inputSchema.get("properties", {})), 0)
|
||||
|
||||
def test_file_write_path_allowed_check(self):
|
||||
"""File write must enforce path allowlist (regression guard)."""
|
||||
from scripts.hardware_mcp_server import is_path_allowed, Path
|
||||
self.assertTrue(is_path_allowed(Path.home() / "test.txt"))
|
||||
self.assertTrue(is_path_allowed(Path("/tmp/test.txt")))
|
||||
# Outside allowed dirs should be rejected
|
||||
self.assertFalse(is_path_allowed(Path("/etc/passwd")))
|
||||
|
||||
def test_run_openhue_error_handling(self):
|
||||
"""openhue runner returns structured error when CLI missing."""
|
||||
from scripts.hardware_mcp_server import run_openhue
|
||||
result = run_openhue(["get", "light"])
|
||||
# On a system without openhue, must return success=False with helpful error
|
||||
self.assertIn("success", result)
|
||||
if not result.get("success"):
|
||||
self.assertIn("error", result)
|
||||
self.assertIn("openhue", result.get("error", "").lower())
|
||||
|
||||
|
||||
class TestHardwareMCPConfigCompleteness(TestCase):
|
||||
"""Validate config template matches tool set."""
|
||||
|
||||
def test_config_template_exists(self):
|
||||
self.assertTrue((ROOT / "timmy-local" / "hardware_mcp_config.yaml").exists())
|
||||
|
||||
def test_config_lists_all_tools(self):
|
||||
with open(ROOT / "timmy-local" / "hardware_mcp_config.yaml") as f:
|
||||
content = f.read()
|
||||
# All tool names should appear in the tools: section
|
||||
for tool in ["file_read", "file_write", "file_list", "light_list",
|
||||
"light_control", "room_control", "scene_set", "system_info"]:
|
||||
self.assertIn(tool, content, f"Tool {tool} missing from config tools list")
|
||||
|
||||
def test_config_has_security_guards(self):
|
||||
with open(ROOT / "timmy-local" / "hardware_mcp_config.yaml") as f:
|
||||
content = f.read()
|
||||
self.assertIn("max_consecutive_errors", content)
|
||||
self.assertIn("allowed_dirs", content)
|
||||
self.assertIn("max_file_size_bytes", content)
|
||||
|
||||
def test_config_has_server_key(self):
|
||||
with open(ROOT / "timmy-local" / "hardware_mcp_config.yaml") as f:
|
||||
content = f.read()
|
||||
self.assertIn("server_key: hardware", content)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import unittest
|
||||
unittest.main()
|
||||
54
tests/test_tower_game_npc_purpose.py
Normal file
54
tests/test_tower_game_npc_purpose.py
Normal file
@@ -0,0 +1,54 @@
|
||||
from importlib.util import module_from_spec, spec_from_file_location
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
ROOT = Path(__file__).resolve().parent.parent
|
||||
GAME_PATH = ROOT / "evennia" / "timmy_world" / "world" / "game.py"
|
||||
|
||||
|
||||
def load_game_module():
|
||||
spec = spec_from_file_location("tower_world_game", GAME_PATH)
|
||||
module = module_from_spec(spec)
|
||||
assert spec.loader is not None
|
||||
spec.loader.exec_module(module)
|
||||
module.random.seed(0)
|
||||
return module
|
||||
|
||||
|
||||
def _visitor_sets_after_ticks(module, ticks=100):
|
||||
engine = module.GameEngine()
|
||||
engine.start_new_game()
|
||||
visitors = {room: set() for room in engine.world.rooms}
|
||||
for _ in range(ticks):
|
||||
engine.run_tick("rest")
|
||||
for name, char in engine.world.characters.items():
|
||||
if name == "Timmy":
|
||||
continue
|
||||
visitors[char["room"]].add(name)
|
||||
return visitors
|
||||
|
||||
|
||||
class TestTowerGameNpcPurpose:
|
||||
def test_goal_driven_room_targets(self):
|
||||
module = load_game_module()
|
||||
world = module.World()
|
||||
npc_ai = module.NPCAI(world)
|
||||
|
||||
world.characters["Marcus"]["room"] = "Threshold"
|
||||
world.characters["Marcus"]["active_goal"] = "sit"
|
||||
assert npc_ai.make_choice("Marcus") == "move:east"
|
||||
|
||||
world.characters["Ezra"]["room"] = "Threshold"
|
||||
world.characters["Ezra"]["active_goal"] = "study"
|
||||
assert npc_ai.make_choice("Ezra") == "move:north"
|
||||
|
||||
world.characters["Claude"]["room"] = "Threshold"
|
||||
world.characters["Claude"]["active_goal"] = "enforce_order"
|
||||
assert npc_ai.make_choice("Claude") == "move:south"
|
||||
|
||||
def test_every_room_gets_multiple_npc_visitors_over_100_ticks(self):
|
||||
module = load_game_module()
|
||||
visitors = _visitor_sets_after_ticks(module, ticks=100)
|
||||
|
||||
assert all(len(names) >= 2 for names in visitors.values()), visitors
|
||||
assert len(visitors["Bridge"]) >= 3, visitors["Bridge"]
|
||||
@@ -1,3 +0,0 @@
|
||||
# hardware MCP config
|
||||
|
||||
Copy `hardware_mcp_config.yaml` to `~/.timmy/hardware/hardware_mcp_config.yaml` to enable runtime tuning.
|
||||
@@ -1,67 +0,0 @@
|
||||
# ═══════════════════════════════════════════════════════════════════════
|
||||
# Local Hardware MCP — Runtime Configuration
|
||||
# ═══════════════════════════════════════════════════════════════════════
|
||||
# Edit this file to tune hardware control settings.
|
||||
# Hermes loads this at session start when the hardware MCP server is enabled.
|
||||
#
|
||||
# Location: ~/.timmy/hardware/hardware_mcp_config.yaml
|
||||
# ═══════════════════════════════════════════════════════════════════════
|
||||
|
||||
# ── Server Identity ───────────────────────────────────────────────────
|
||||
server_key: hardware
|
||||
|
||||
# ── Tool Names ────────────────────────────────────────────────────────
|
||||
# Exact tool names Hermes registers. Update if you rename tools in
|
||||
# hardware_mcp_server.py.
|
||||
tools:
|
||||
- name: file_read
|
||||
hint: "Read a file from an allowed directory (home, /tmp). Max 10 MB."
|
||||
- name: file_write
|
||||
hint: "Write text content to a file within allowed directories."
|
||||
- name: file_list
|
||||
hint: "List files and directories in a given folder."
|
||||
- name: light_list
|
||||
hint: "List all Hue lights, rooms, and scenes from OpenHue."
|
||||
- name: light_control
|
||||
hint: "Control a specific Hue light: on/off, brightness, color, temperature."
|
||||
- name: room_control
|
||||
hint: "Control all lights in a room: on/off, brightness."
|
||||
- name: scene_set
|
||||
hint: "Activate a Hue scene in a room."
|
||||
- name: system_info
|
||||
hint: "Get safe system information: OS, CPU count, memory usage, disk space."
|
||||
|
||||
# ── Security Guards ───────────────────────────────────────────────────
|
||||
guards:
|
||||
# Maximum consecutive tool errors before stopping.
|
||||
max_consecutive_errors: 3
|
||||
|
||||
# Max total hardware MCP calls per session (0 = unlimited).
|
||||
max_mcp_calls_per_session: 0
|
||||
|
||||
# Allowed directories for file operations (expanded paths).
|
||||
allowed_dirs:
|
||||
- "~"
|
||||
- "/tmp"
|
||||
- "/private/tmp"
|
||||
|
||||
# Maximum file size for reads (bytes).
|
||||
max_file_size_bytes: 10485760 # 10 MB
|
||||
|
||||
# ── OpenHue ───────────────────────────────────────────────────────────
|
||||
# Path to openhue CLI (auto-detected if in PATH).
|
||||
openhue_command: "openhue"
|
||||
|
||||
# ── Dependencies ───────────────────────────────────────────────────────
|
||||
# Prerequisites:
|
||||
# - OpenHue CLI: brew install openhue/cli/openhue (macOS) or see https://github.com/openhue/openhue-cli
|
||||
# - MCP SDK: pip install mcp
|
||||
# - For system_info: pip install psutil (optional, for detailed memory/disk metrics)
|
||||
#
|
||||
# Config in ~/.hermes/config.yaml:
|
||||
# mcp_servers:
|
||||
# hardware:
|
||||
# command: "python"
|
||||
# args: ["/Users/you/path/to/timmy-home/scripts/hardware_mcp_server.py"]
|
||||
# env:
|
||||
# OPENHUE_BRIDGE_IP: "192.168.1.xx" # optional, if openhue needs it
|
||||
Reference in New Issue
Block a user