Compare commits

..

3 Commits

Author SHA1 Message Date
Alexander Whitestone
1f5067e94a Merge: bring in prior QA work on path guard (Refs #962)
All checks were successful
Lint / lint (pull_request) Successful in 15s
2026-04-22 00:25:50 -04:00
Alexander Whitestone
5d3e13ede2 test: add pre-commit path guard hook from burn/921 (Refs #962)
All checks were successful
Lint / lint (pull_request) Successful in 24s
Brings hooks/pre-commit-path-guard.py from burn/921-poka-yoke-hardcoded-paths
to complete QA verification of all guard layers.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-21 23:55:38 -04:00
Alexander Whitestone
9e00a59791 test: verify hardcoded-home path guard from burn/921 branch
All checks were successful
Lint / lint (pull_request) Successful in 29s
Cherry-picks tools/path_guard.py and tests/test_path_guard.py from
burn/921-poka-yoke-hardcoded-paths (commit 5dcb905). All 21 tests pass:

- hardcoded /Users/<name>/ paths are rejected at runtime
- hardcoded /home/<name>/ paths are rejected at runtime
- ~/.hermes/... via expanduser() passes (safe, expanded at runtime)
- valid relative and /tmp/ absolute paths pass
- static scanner catches violations and respects # noqa: hardcoded-path-ok
- comments are skipped by scanner
- directory scanner skips test files and __pycache__

Refs #962

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-21 22:26:54 -04:00
8 changed files with 363 additions and 612 deletions

View File

@@ -46,7 +46,6 @@ from hermes_cli.config import (
)
from gateway.status import get_running_pid, read_runtime_status
from agent.agent_card import get_agent_card_json
from agent.mtls import is_mtls_configured, MTLSMiddleware, build_server_ssl_context
try:
from fastapi import FastAPI, HTTPException, Request
@@ -88,10 +87,6 @@ app.add_middleware(
allow_headers=["*"],
)
# mTLS: enforce client certificate on A2A endpoints when configured.
# Activated by setting HERMES_MTLS_CERT, HERMES_MTLS_KEY, HERMES_MTLS_CA.
app.add_middleware(MTLSMiddleware)
# ---------------------------------------------------------------------------
# Endpoints that do NOT require the session token. Everything else under
# /api/ is gated by the auth middleware below. Keep this list minimal —
@@ -2110,20 +2105,6 @@ def start_server(
"authentication. Only use on trusted networks.", host,
)
# mTLS: when configured, pass SSL context to uvicorn so all connections
# are TLS with mandatory client certificate verification.
ssl_context = None
scheme = "http"
if is_mtls_configured():
try:
ssl_context = build_server_ssl_context()
scheme = "https"
_log.info(
"mTLS enabled — server requires client certificates (A2A auth)"
)
except Exception as exc:
_log.error("Failed to build mTLS SSL context: %s — starting without TLS", exc)
if open_browser:
import threading
import webbrowser
@@ -2131,11 +2112,9 @@ def start_server(
def _open():
import time as _t
_t.sleep(1.0)
webbrowser.open(f"{scheme}://{host}:{port}")
webbrowser.open(f"http://{host}:{port}")
threading.Thread(target=_open, daemon=True).start()
print(f" Hermes Web UI → {scheme}://{host}:{port}")
if ssl_context is not None:
print(" mTLS enabled — client certificate required for A2A endpoints")
uvicorn.run(app, host=host, port=port, log_level="warning", ssl=ssl_context)
print(f" Hermes Web UI → http://{host}:{port}")
uvicorn.run(app, host=host, port=port, log_level="warning")

View File

@@ -0,0 +1,68 @@
#!/usr/bin/env python3
"""
Pre-commit hook: Reject hardcoded home-directory paths.
Scans staged Python files for patterns like:
- /Users/<name>/...
- /home/<name>/...
- ~/... (in string literals outside expanduser context)
Escape hatch: add `# noqa: hardcoded-path-ok` to any legitimate line.
Install:
cp hooks/pre-commit-path-guard.py .git/hooks/pre-commit
chmod +x .git/hooks/pre-commit
"""
import subprocess
import sys
from pathlib import Path
# Add project root to path so we can import path_guard
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from tools.path_guard import scan_file_for_violations
def get_staged_files():
"""Get list of staged .py files."""
result = subprocess.run(
["git", "diff", "--cached", "--name-only", "--diff-filter=ACM"],
capture_output=True, text=True
)
return [f for f in result.stdout.strip().splitlines() if f.endswith(".py")]
def main():
files = get_staged_files()
if not files:
sys.exit(0)
all_violations = []
for filepath in files:
if not Path(filepath).exists():
continue
violations = scan_file_for_violations(filepath)
if violations:
all_violations.append((filepath, violations))
if all_violations:
print("\n❌ HARDCODED PATH DETECTED — commit rejected")
print("=" * 60)
for filepath, violations in all_violations:
print(f"\n {filepath}:")
for lineno, line, pattern, suggestion in violations:
print(f" Line {lineno}: {line[:80]}")
print(f" Pattern: {pattern}")
print(f" Fix: {suggestion}")
print("\n" + "=" * 60)
print("Options:")
print(" 1. Use get_hermes_home(), os.environ['HOME'], or relative paths")
print(" 2. Add # noqa: hardcoded-path-ok to the line for legitimate cases")
print("")
sys.exit(1)
sys.exit(0)
if __name__ == "__main__":
main()

View File

@@ -38,7 +38,6 @@ dependencies = [
[project.optional-dependencies]
modal = ["modal>=1.0.0,<2"]
rag = ["lightrag-hku>=1.4.0,<2", "aiohttp>=3.9.0,<4"]
daytona = ["daytona>=0.148.0,<1"]
dev = ["debugpy>=1.8.0,<2", "pytest>=9.0.2,<10", "pytest-asyncio>=1.3.0,<2", "pytest-xdist>=3.0,<4", "mcp>=1.2.0,<2"]
messaging = ["python-telegram-bot[webhooks]>=22.6,<23", "discord.py[voice]>=2.7.1,<3", "aiohttp>=3.13.3,<4", "slack-bolt>=1.18.0,<2", "slack-sdk>=3.27.0,<4"]

127
tests/test_path_guard.py Normal file
View File

@@ -0,0 +1,127 @@
"""Tests for tools/path_guard.py — poka-yoke hardcoded path detection."""
import os
import tempfile
from pathlib import Path
import pytest
from tools.path_guard import (
PathGuardError,
scan_directory,
scan_file_for_violations,
validate_path,
validate_tool_paths,
)
class TestValidatePath:
"""Runtime path validation."""
def test_valid_relative_path(self):
assert validate_path("tools/file_tools.py") == "tools/file_tools.py"
def test_valid_absolute_path(self):
assert validate_path("/tmp/test.txt") == "/tmp/test.txt"
def test_valid_hermes_home(self):
assert validate_path(os.path.expanduser("~/.hermes/config.yaml")) is not None
def test_reject_users_hardcoded(self):
with pytest.raises(PathGuardError, match="/Users/"):
validate_path("/Users/someone_else/.hermes/config")
def test_reject_home_hardcoded(self):
with pytest.raises(PathGuardError, match="/home/"):
validate_path("/home/user/.hermes/config")
def test_empty_path(self):
assert validate_path("") == ""
assert validate_path(None) is None
def test_non_string(self):
assert validate_path(42) == 42
class TestValidateToolPaths:
"""Batch path validation."""
def test_all_valid(self):
paths = ["tools/file.py", "/tmp/x.txt", "relative/path.py"]
assert validate_tool_paths(paths) == paths
def test_mixed_invalid(self):
with pytest.raises(PathGuardError):
validate_tool_paths(["tools/file.py", "/Users/someone_else/secret.txt"])
def test_skips_non_strings(self):
assert validate_tool_paths([None, 42, "valid.py"]) == ["valid.py"]
class TestScanFileForViolations:
"""Static file scanning."""
def test_clean_file(self, tmp_path):
f = tmp_path / "clean.py"
f.write_text("import os\nHOME = os.environ['HOME']\n")
assert scan_file_for_violations(str(f)) == []
def test_hardcoded_users(self, tmp_path):
f = tmp_path / "bad.py"
f.write_text("CONFIG = '/Users/apayne/.hermes/config.yaml'\n")
violations = scan_file_for_violations(str(f))
assert len(violations) == 1
assert "/Users/<name>/" in violations[0][2]
def test_hardcoded_home(self, tmp_path):
f = tmp_path / "bad2.py"
f.write_text("PATH = '/home/deploy/.hermes/state.db'\n")
violations = scan_file_for_violations(str(f))
assert len(violations) == 1
assert "/home/<name>/" in violations[0][2]
def test_tilde_in_expanduser_ok(self, tmp_path):
f = tmp_path / "ok.py"
f.write_text("p = os.path.expanduser('~/.hermes/config')\n")
assert scan_file_for_violations(str(f)) == []
def test_tilde_in_display_ok(self, tmp_path):
f = tmp_path / "ok2.py"
f.write_text('print("~/config saved")\n')
assert scan_file_for_violations(str(f)) == []
def test_noqa_escape(self, tmp_path):
f = tmp_path / "noqa.py"
f.write_text("PATH = '/Users/apayne/test' # noqa: hardcoded-path-ok\n")
assert scan_file_for_violations(str(f)) == []
def test_comments_skipped(self, tmp_path):
f = tmp_path / "comment.py"
f.write_text("# PATH = '/Users/apayne/test'\n")
assert scan_file_for_violations(str(f)) == []
class TestScanDirectory:
"""Directory scanning."""
def test_clean_tree(self, tmp_path):
(tmp_path / "clean.py").write_text("import os\n")
(tmp_path / "sub").mkdir()
(tmp_path / "sub" / "also_clean.py").write_text("x = 1\n")
assert scan_directory(str(tmp_path)) == []
def test_finds_violations(self, tmp_path):
(tmp_path / "bad.py").write_text("P = '/Users/x/.hermes'\n")
results = scan_directory(str(tmp_path))
assert len(results) == 1
assert results[0][0].endswith("bad.py")
def test_skips_tests(self, tmp_path):
(tmp_path / "test_something.py").write_text("P = '/Users/x/.hermes'\n")
assert scan_directory(str(tmp_path)) == []
def test_skips_pycache(self, tmp_path):
cache = tmp_path / "__pycache__"
cache.mkdir()
(cache / "cached.py").write_text("P = '/Users/x/.hermes'\n")
assert scan_directory(str(tmp_path)) == []

View File

@@ -1,176 +0,0 @@
"""Tests for tools/lightrag_tool.py"""
import json
import sys
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
# LightRAG may not be installed in all test environments
pytest.importorskip("lightrag", reason="lightrag-hku not installed")
from tools.lightrag_tool import (
check_lightrag_requirements,
lightrag_index,
lightrag_query,
_collect_markdown_files,
_read_text_safe,
LIGHTRAG_DIR,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _parse_result(result: str) -> dict:
"""Parse JSON tool result, falling back to error string detection."""
try:
return json.loads(result)
except json.JSONDecodeError:
return {"_error": result}
# ---------------------------------------------------------------------------
# Unit tests
# ---------------------------------------------------------------------------
class TestCollectMarkdownFiles:
def test_collects_md_files(self, tmp_path):
(tmp_path / "a.md").write_text("# A")
(tmp_path / "b.md").write_text("# B")
(tmp_path / "skip.txt").write_text("text")
found = _collect_markdown_files(tmp_path)
assert len(found) == 2
assert all(p.suffix == ".md" for p in found)
def test_skips_hidden_dirs(self, tmp_path):
(tmp_path / ".git").mkdir()
(tmp_path / ".git" / "readme.md").write_text("# git")
(tmp_path / "visible.md").write_text("# visible")
found = _collect_markdown_files(tmp_path)
names = [p.name for p in found]
assert "visible.md" in names
assert "readme.md" not in names
def test_returns_empty_for_missing_dir(self):
assert _collect_markdown_files(Path("/nonexistent")) == []
class TestReadTextSafe:
def test_reads_small_file(self, tmp_path):
p = tmp_path / "test.md"
p.write_text("hello world")
assert _read_text_safe(p) == "hello world"
def test_truncates_large_file(self, tmp_path):
p = tmp_path / "big.md"
p.write_text("x" * 1_000_000)
text = _read_text_safe(p, limit=500_000)
assert len(text) == 500_000
def test_reads_binary_without_crashing(self, tmp_path):
p = tmp_path / "binary.md"
p.write_bytes(b"\x00\x01\x02")
result = _read_text_safe(p)
# Should not crash; control chars 0x00-0x7F are valid UTF-8
assert isinstance(result, str)
class TestCheckRequirements:
@patch("tools.lightrag_tool._ollama_available", return_value=True)
def test_ok_when_ollama_up(self, mock_ollama):
assert check_lightrag_requirements() is True
@patch("tools.lightrag_tool._ollama_available", return_value=False)
def test_false_when_ollama_down(self, mock_ollama):
assert check_lightrag_requirements() is False
@patch.dict(sys.modules, {"lightrag": None}, clear=False)
def test_false_when_lightrag_missing(self):
with patch("tools.lightrag_tool._ollama_available", return_value=True):
# Force ImportError by removing lightrag from sys.modules
# and blocking import
assert check_lightrag_requirements() is False
class TestLightragIndex:
@patch("tools.lightrag_tool._ollama_available", return_value=False)
def test_error_when_ollama_down(self, mock_ollama):
result = lightrag_index()
assert "Ollama is not running" in result
@patch("tools.lightrag_tool._ollama_available", return_value=True)
@patch("tools.lightrag_tool._has_ollama_model", return_value=False)
def test_error_when_model_missing(self, mock_model, mock_ollama):
result = lightrag_index()
assert "not found in Ollama" in result
@patch("tools.lightrag_tool._ollama_available", return_value=True)
@patch("tools.lightrag_tool._has_ollama_model", return_value=True)
@patch("tools.lightrag_tool._get_lightrag")
@patch("tools.lightrag_tool._collect_markdown_files", return_value=[])
def test_warning_when_no_files(self, mock_collect, mock_get_rag, mock_model, mock_ollama):
result = lightrag_index()
data = _parse_result(result)
assert data.get("status") == "warning"
assert "No markdown files found" in data.get("message", "")
@patch("tools.lightrag_tool._ollama_available", return_value=True)
@patch("tools.lightrag_tool._has_ollama_model", return_value=True)
@patch("tools.lightrag_tool._get_lightrag")
@patch("tools.lightrag_tool._collect_markdown_files")
@patch("tools.lightrag_tool._read_text_safe", return_value="# Skill doc\nContent.")
@patch("asyncio.run")
def test_indexes_files(self, mock_asyncio, mock_read, mock_collect, mock_get_rag, mock_model, mock_ollama):
mock_collect.return_value = [Path("/fake/skills/git.md"), Path("/fake/skills/docker.md")]
mock_rag = MagicMock()
mock_get_rag.return_value = mock_rag
result = lightrag_index()
data = _parse_result(result)
assert data.get("status") == "ok"
assert data.get("indexed_files") == 2
assert data.get("errors") == 0
class TestLightragQuery:
@patch("tools.lightrag_tool._ollama_available", return_value=False)
def test_error_when_ollama_down(self, mock_ollama):
result = lightrag_query("test", mode="hybrid")
assert "Ollama is not running" in result
@patch("tools.lightrag_tool._ollama_available", return_value=True)
@patch("tools.lightrag_tool.LIGHTRAG_DIR")
def test_empty_index_message(self, mock_dir, mock_ollama):
mock_dir.exists.return_value = True
mock_dir.iterdir.return_value = iter([])
result = lightrag_query("test", mode="hybrid")
data = _parse_result(result)
assert data.get("status") == "empty"
@patch("tools.lightrag_tool._ollama_available", return_value=True)
@patch("tools.lightrag_tool.LIGHTRAG_DIR")
@patch("tools.lightrag_tool._get_lightrag")
@patch("asyncio.run", return_value="Use git clone for repos.")
def test_query_returns_answer(self, mock_asyncio, mock_get_rag, mock_dir, mock_ollama):
mock_dir.exists.return_value = True
mock_dir.iterdir.return_value = iter([Path("dummy")])
mock_rag = MagicMock()
mock_get_rag.return_value = mock_rag
result = lightrag_query("How do I clone a repo?", mode="hybrid")
data = _parse_result(result)
assert data.get("status") == "ok"
assert data.get("mode") == "hybrid"
assert "clone" in data.get("answer", "").lower()
@patch("tools.lightrag_tool._ollama_available", return_value=True)
def test_rejects_invalid_mode(self, mock_ollama):
result = lightrag_query("test", mode="invalid")
assert "mode must be one of" in result
def test_rejects_empty_query(self):
result = lightrag_query("", mode="hybrid")
assert "Query cannot be empty" in result

View File

@@ -1,405 +0,0 @@
#!/usr/bin/env python3
"""
LightRAG Tool — Graph-based knowledge retrieval for skills and docs.
Indexes markdown files under ~/.hermes/skills/ (and optional extra dirs)
into a LightRAG knowledge graph stored at ~/.hermes/lightrag/.
Requires:
- lightrag-hku (pip install lightrag-hku)
- Ollama running locally with an embedding model (default: nomic-embed-text)
- Ollama running locally with a chat model (default: qwen2.5:7b)
Usage:
lightrag_query("How do I dispatch the burn fleet?", mode="hybrid")
lightrag_index() # re-index skill files
"""
import asyncio
import json
import logging
import os
from pathlib import Path
from typing import Dict, List, Optional
import numpy as np
from hermes_constants import get_hermes_home
from tools.registry import registry, tool_error
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
DEFAULT_EMBED_MODEL = os.environ.get("LIGHTRAG_EMBED_MODEL", "nomic-embed-text")
DEFAULT_LLM_MODEL = os.environ.get("LIGHTRAG_LLM_MODEL", "qwen2.5:7b")
DEFAULT_OLLAMA_HOST = os.environ.get("LIGHTRAG_OLLAMA_HOST", "http://localhost:11434")
LIGHTRAG_DIR = get_hermes_home() / "lightrag"
SKILLS_DIR = get_hermes_home() / "skills"
# ---------------------------------------------------------------------------
# Ollama helpers
# ---------------------------------------------------------------------------
def _ollama_available() -> bool:
"""Check if Ollama server is reachable."""
try:
import urllib.request
req = urllib.request.Request(f"{DEFAULT_OLLAMA_HOST}/api/tags")
with urllib.request.urlopen(req, timeout=3) as resp:
return resp.status == 200
except Exception:
return False
def _has_ollama_model(model_name: str) -> bool:
"""Check if a specific model is pulled in Ollama."""
try:
import urllib.request
req = urllib.request.Request(f"{DEFAULT_OLLAMA_HOST}/api/tags")
with urllib.request.urlopen(req, timeout=3) as resp:
data = json.loads(resp.read())
models = [m["name"] for m in data.get("models", [])]
return any(model_name in m for m in models)
except Exception:
return False
async def _ollama_embedding(texts: list, **kwargs) -> np.ndarray:
"""Call Ollama embeddings API."""
import aiohttp
payload = {
"model": DEFAULT_EMBED_MODEL,
"input": texts,
}
async with aiohttp.ClientSession() as session:
async with session.post(
f"{DEFAULT_OLLAMA_HOST}/api/embed",
json=payload,
timeout=aiohttp.ClientTimeout(total=60),
) as resp:
resp.raise_for_status()
data = await resp.json()
embeddings = data.get("embeddings", [])
if not embeddings:
raise RuntimeError("Ollama returned empty embeddings")
return np.array(embeddings, dtype=np.float32)
async def _ollama_complete(
prompt, system_prompt=None, history_messages=None, **kwargs
) -> str:
"""Call Ollama generate API for LLM completion."""
import aiohttp
messages = []
if system_prompt:
messages.append({"role": "system", "content": system_prompt})
if history_messages:
for msg in history_messages:
role = "user" if msg.get("role") == "user" else "assistant"
messages.append({"role": role, "content": msg.get("content", "")})
messages.append({"role": "user", "content": prompt})
payload = {
"model": DEFAULT_LLM_MODEL,
"messages": messages,
"stream": False,
"options": {"temperature": 0.3, "num_predict": 2048},
}
async with aiohttp.ClientSession() as session:
async with session.post(
f"{DEFAULT_OLLAMA_HOST}/api/chat",
json=payload,
timeout=aiohttp.ClientTimeout(total=120),
) as resp:
resp.raise_for_status()
data = await resp.json()
return data.get("message", {}).get("content", "")
# ---------------------------------------------------------------------------
# LightRAG setup
# ---------------------------------------------------------------------------
_lightrag_instance: Optional[object] = None
def _get_lightrag() -> object:
"""Lazy-initialize LightRAG with Ollama backends."""
global _lightrag_instance
if _lightrag_instance is not None:
return _lightrag_instance
try:
from lightrag import LightRAG, QueryParam
from lightrag.utils import EmbeddingFunc
except ImportError as e:
raise RuntimeError(
"lightrag is not installed. Run: pip install lightrag-hku"
) from e
LIGHTRAG_DIR.mkdir(parents=True, exist_ok=True)
# Wrap Ollama embedding for LightRAG
embed_func = EmbeddingFunc(
embedding_dim=768, # nomic-embed-text dimension
func=_ollama_embedding,
max_token_size=8192,
model_name=DEFAULT_EMBED_MODEL,
)
_lightrag_instance = LightRAG(
working_dir=str(LIGHTRAG_DIR),
embedding_func=embed_func,
llm_model_func=_ollama_complete,
llm_model_name=DEFAULT_LLM_MODEL,
chunk_token_size=1200,
chunk_overlap_token_size=100,
)
return _lightrag_instance
# ---------------------------------------------------------------------------
# Indexing
# ---------------------------------------------------------------------------
def _collect_markdown_files(root: Path) -> List[Path]:
"""Collect all .md files under root, excluding node_modules and .git."""
files = []
if not root.exists():
return files
for path in root.rglob("*.md"):
if any(part.startswith(".") or part == "node_modules" for part in path.parts):
continue
files.append(path)
return sorted(files)
def _read_text_safe(path: Path, limit: int = 500_000) -> str:
"""Read file text with size limit."""
try:
stat = path.stat()
if stat.st_size > limit:
return path.read_text(encoding="utf-8", errors="ignore")[:limit]
return path.read_text(encoding="utf-8", errors="ignore")
except Exception as e:
logger.warning("Failed to read %s: %s", path, e)
return ""
def lightrag_index(directories: Optional[List[str]] = None) -> str:
"""Index markdown files into LightRAG knowledge graph.
Args:
directories: Extra directories to index (in addition to ~/.hermes/skills/).
"""
if not _ollama_available():
return tool_error(
"Ollama is not running. Start it with: ollama serve"
)
if not _has_ollama_model(DEFAULT_EMBED_MODEL):
return tool_error(
f"Embedding model '{DEFAULT_EMBED_MODEL}' not found in Ollama. "
f"Pull it with: ollama pull {DEFAULT_EMBED_MODEL}"
)
if not _has_ollama_model(DEFAULT_LLM_MODEL):
return tool_error(
f"LLM model '{DEFAULT_LLM_MODEL}' not found in Ollama. "
f"Pull it with: ollama pull {DEFAULT_LLM_MODEL}"
)
rag = _get_lightrag()
dirs = [SKILLS_DIR]
if directories:
for d in directories:
p = Path(d).expanduser()
if p.exists():
dirs.append(p)
all_files = []
for d in dirs:
all_files.extend(_collect_markdown_files(d))
if not all_files:
return json.dumps({
"status": "warning",
"message": "No markdown files found to index.",
"directories": [str(d) for d in dirs],
})
# Read and insert files
inserted = 0
errors = 0
for path in all_files:
text = _read_text_safe(path)
if not text.strip():
continue
try:
# LightRAG insert is async; bridge it
asyncio.run(rag.atext(text))
inserted += 1
except Exception as e:
logger.warning("Failed to index %s: %s", path, e)
errors += 1
return json.dumps({
"status": "ok",
"indexed_files": inserted,
"errors": errors,
"total_files": len(all_files),
"storage_dir": str(LIGHTRAG_DIR),
})
# ---------------------------------------------------------------------------
# Query
# ---------------------------------------------------------------------------
def lightrag_query(query: str, mode: str = "hybrid") -> str:
"""Query the LightRAG knowledge graph.
Args:
query: The question or search query.
mode: Search mode — "local" (nearby entities), "global" (graph-wide),
or "hybrid" (both).
"""
if not query or not query.strip():
return tool_error("Query cannot be empty.")
if mode not in {"local", "global", "hybrid"}:
return tool_error("mode must be one of: local, global, hybrid")
if not _ollama_available():
return tool_error(
"Ollama is not running. Start it with: ollama serve"
)
rag = _get_lightrag()
# Check if any data has been indexed
if not LIGHTRAG_DIR.exists() or not any(LIGHTRAG_DIR.iterdir()):
return json.dumps({
"status": "empty",
"message": "LightRAG index is empty. Run lightrag_index() first.",
})
try:
from lightrag import QueryParam
param = QueryParam(mode=mode)
result = asyncio.run(rag.aquery(query, param=param))
return json.dumps({
"status": "ok",
"mode": mode,
"query": query,
"answer": result,
})
except Exception as e:
logger.exception("LightRAG query failed")
return tool_error(f"Query failed: {e}")
# ---------------------------------------------------------------------------
# Tool schemas
# ---------------------------------------------------------------------------
LIGHTRAG_QUERY_SCHEMA = {
"name": "lightrag_query",
"description": (
"Graph-based knowledge retrieval over indexed skills and documentation.\n\n"
"Use this when the user asks about: conventions, workflows, tool usage, "
"project-specific practices, or anything that might be documented in skills.\n\n"
"Modes:\n"
"- local: fast, searches nearby entities in the graph\n"
"- global: thorough, reasons across the entire knowledge graph\n"
"- hybrid: balanced, combines local and global (recommended)\n\n"
"If the index is empty, the tool will report that and you should "
"call lightrag_index() to populate it."
),
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The question or search query.",
},
"mode": {
"type": "string",
"enum": ["local", "global", "hybrid"],
"description": "Search mode. hybrid is recommended.",
},
},
"required": ["query"],
},
}
LIGHTRAG_INDEX_SCHEMA = {
"name": "lightrag_index",
"description": (
"(Re-)build the LightRAG knowledge graph from skill files and docs.\n\n"
"By default indexes ~/.hermes/skills/. Pass extra directories if needed.\n"
"This is a one-time or occasional operation; queries work against the "
"existing index until you re-index."
),
"parameters": {
"type": "object",
"properties": {
"directories": {
"type": "array",
"items": {"type": "string"},
"description": "Optional extra directories to index (in addition to ~/.hermes/skills/).",
},
},
},
}
# ---------------------------------------------------------------------------
# Availability check
# ---------------------------------------------------------------------------
def check_lightrag_requirements() -> bool:
"""Return True if LightRAG and Ollama appear to be available."""
try:
import lightrag # noqa: F401
except ImportError:
return False
return _ollama_available()
# ---------------------------------------------------------------------------
# Registry
# ---------------------------------------------------------------------------
registry.register(
name="lightrag_query",
toolset="rag",
schema=LIGHTRAG_QUERY_SCHEMA,
handler=lambda args, **kw: lightrag_query(
query=args.get("query", ""),
mode=args.get("mode", "hybrid"),
),
check_fn=check_lightrag_requirements,
emoji="🔎",
)
registry.register(
name="lightrag_index",
toolset="rag",
schema=LIGHTRAG_INDEX_SCHEMA,
handler=lambda args, **kw: lightrag_index(
directories=args.get("directories"),
),
check_fn=check_lightrag_requirements,
emoji="📚",
)

165
tools/path_guard.py Normal file
View File

@@ -0,0 +1,165 @@
"""
tools/path_guard.py — Poka-yoke: Prevent hardcoded home-directory paths.
Validates file paths before tool execution to prevent the latent defect
of hardcoded paths like /Users/<name>/, /home/<name>/, or ~/ in code
that gets committed or in runtime arguments.
Usage:
from tools.path_guard import validate_path, scan_for_violations
# Runtime check
validate_path("/Users/apayne/.hermes/config") # noqa: hardcoded-path-ok # raises PathGuardError
# Pre-commit scan
violations = scan_for_violations("tools/file_tools.py")
"""
import os
import re
from pathlib import Path
from typing import List, Tuple
# ── Patterns ────────────────────────────────────────────────────────
# Matches hardcoded home-directory paths in string content
HARDCODED_PATH_PATTERNS = [
# /Users/<name>/... (macOS)
(re.compile(r"""['"]/(Users)/[\w.-]+/"""), "/Users/<name>/"),
# /home/<name>/... (Linux)
(re.compile(r"""['"]/home/[\w.-]+/"""), "/home/<name>/"),
# Bare ~/... (unexpanded tilde in code — NOT in expanduser() calls)
(re.compile(r"""['"]~/[^'"]+['"]"""), "~/..."), # noqa: hardcoded-path-ok
# /root/... (Linux root home)
(re.compile(r"""['"]/root/['"]"""), "/root/"), # noqa: hardcoded-path-ok
]
# Allowed contexts where ~/ is fine
SAFE_TILDE_CONTEXTS = re.compile(
r"""expanduser|display_path|relpath|os\.path|Path\(|str\(.*home|"""
r"""noqa:\s*hardcoded-path-ok|""" # explicit escape hatch
r"""\bprint\(|f['"]|\.format\(|""" # display/formatting contexts
r"""["']~/["']\s*$""", # just displaying ~/ as prefix
re.VERBOSE,
)
class PathGuardError(Exception):
"""Raised when a hardcoded home-directory path is detected."""
def __init__(self, path: str, pattern_name: str, suggestion: str):
self.path = path
self.pattern_name = pattern_name
self.suggestion = suggestion
super().__init__(
f"Hardcoded path detected: {path} matches {pattern_name}. "
f"Suggestion: {suggestion}. "
f"Use get_hermes_home(), os.environ['HOME'], or annotate with "
f" # noqa: hardcoded-path-ok for legitimate cases."
)
# ── Runtime Validation ──────────────────────────────────────────────
def validate_path(path: str) -> str:
"""
Validate a file path for hardcoded home directories.
Returns the path if valid, raises PathGuardError if not.
This is meant to be called in tool wrappers (write_file, execute_code)
before executing operations with user-supplied paths.
Note: At runtime, paths from os.path.expanduser() will resolve to
/Users/<name>/... — this is expected and allowed. The guard catches
paths that were LITERALLY hardcoded in source code or tool arguments
that look like they came from a different machine (e.g., a path
containing a different username than the current user).
"""
if not path or not isinstance(path, str):
return path
# At runtime, expanded paths matching current HOME are fine
home = os.environ.get("HOME", "")
if home and path.startswith(home):
return path
# Check for hardcoded /Users/<name>/ (macOS) — but not current user
if re.match(r"^/Users/[\w.-]+/", path):
raise PathGuardError(
path, "/Users/<name>/",
f"Use $HOME or os.path.expanduser('~') instead. "
f"Got: {path}"
)
# Check for hardcoded /home/<name>/ (Linux)
if re.match(r"^/home/[\w.-]+/", path):
raise PathGuardError(
path, "/home/<name>/",
f"Use $HOME or os.path.expanduser('~') instead. "
f"Got: {path}"
)
return path
def validate_tool_paths(paths: list) -> list:
"""
Validate multiple paths (e.g., from tool arguments).
Returns validated list. Raises PathGuardError on first violation.
"""
return [validate_path(p) for p in paths if isinstance(p, str)]
# ── File Scanning (Pre-commit / CI) ────────────────────────────────
def scan_file_for_violations(filepath: str) -> List[Tuple[int, str, str, str]]:
"""
Scan a Python file for hardcoded home-directory path patterns.
Returns list of (line_number, line_content, pattern_name, suggestion).
"""
violations = []
try:
with open(filepath) as f:
for lineno, line in enumerate(f, 1):
# Skip comments and noqa lines
stripped = line.strip()
if stripped.startswith("#"):
continue
if "noqa: hardcoded-path-ok" in line:
continue
for pattern, name in HARDCODED_PATH_PATTERNS:
if pattern.search(line):
# Special case: ~/ in expanduser/display context is OK
if name == "~/..." and SAFE_TILDE_CONTEXTS.search(line): # noqa: hardcoded-path-ok
continue
violations.append((lineno, line.rstrip(), name,
f"Use get_hermes_home(), os.environ['HOME'], or add # noqa: hardcoded-path-ok"))
except (IOError, UnicodeDecodeError):
pass
return violations
def scan_directory(root: str, extensions: tuple = (".py",)) -> List[Tuple[str, List]]:
"""
Scan a directory tree for hardcoded path violations.
Returns list of (filepath, violations) tuples.
"""
results = []
for dirpath, _, filenames in os.walk(root):
# Skip hidden dirs, __pycache__, venv, test dirs
skip_dirs = {"__pycache__", ".git", "venv", "node_modules", ".hermes"}
if any(s in dirpath for s in skip_dirs):
continue
for fname in filenames:
if not fname.endswith(extensions):
continue
# Skip test files (they may legitimately have paths)
if fname.startswith("test_") or "/tests/" in dirpath:
continue
fpath = os.path.join(dirpath, fname)
violations = scan_file_for_violations(fpath)
if violations:
results.append((fpath, violations))
return results

View File

@@ -167,12 +167,6 @@ TOOLSETS = {
"tools": ["memory"],
"includes": []
},
"rag": {
"description": "Graph-based knowledge retrieval over indexed skills and docs (LightRAG)",
"tools": ["lightrag_query", "lightrag_index"],
"includes": []
},
"session_search": {
"description": "Search and recall past conversations with summarization",