Compare commits
1 Commits
claude/iss
...
fix/982
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a61761d321 |
@@ -50,6 +50,78 @@ def sanitize_context(text: str) -> str:
|
||||
return _FENCE_TAG_RE.sub('', text)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Prefetch filtering helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Meta-instruction debris that memory providers sometimes echo back.
|
||||
# These are prompts/instructions, not user-generated content.
|
||||
_META_INSTRUCTION_PATTERNS = [
|
||||
re.compile(r"^\s*[\-\*]?\s*>?\s*Focus on:\s*", re.IGNORECASE),
|
||||
re.compile(r"^\s*[\-\*]?\s*>?\s*Note:\s*", re.IGNORECASE),
|
||||
re.compile(r"^\s*[\-\*]?\s*>?\s*System\s+(note|prompt|instruction):", re.IGNORECASE),
|
||||
re.compile(r"^\s*[\-\*]?\s*>?\s*You are\s+", re.IGNORECASE),
|
||||
re.compile(r"^\s*[\-\*]?\s*>?\s*Please\s+(provide|respond|answer|write)", re.IGNORECASE),
|
||||
re.compile(r"^\s*[\-\*]?\s*>?\s*Do not\s+", re.IGNORECASE),
|
||||
re.compile(r"^\s*[\-\*]?\s*>?\s*Always\s+", re.IGNORECASE),
|
||||
re.compile(r"^\s*[\-\*]?\s*>?\s*Consider\s+(the following|these|this)\b", re.IGNORECASE),
|
||||
re.compile(r"^\s*[\-\*]?\s*>?\s*Here\s+(is|are)\s+(some|the|a few)\b", re.IGNORECASE),
|
||||
]
|
||||
|
||||
|
||||
def _is_meta_instruction_line(line: str) -> bool:
|
||||
"""Return True if the line looks like a prompt/template instruction, not memory content."""
|
||||
for pat in _META_INSTRUCTION_PATTERNS:
|
||||
if pat.search(line):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def _is_low_signal_line(line: str) -> bool:
|
||||
"""Return True for very short or content-free lines."""
|
||||
stripped = line.strip()
|
||||
# Empty or just punctuation/list marker
|
||||
if not stripped or stripped in {"-", "*", ">", "•", "—", "--"}:
|
||||
return True
|
||||
# Too short to be meaningful (< 15 chars after stripping markers)
|
||||
cleaned = re.sub(r"^[\-\*•>\s]+", "", stripped)
|
||||
if len(cleaned) < 15:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def _filter_prefetch_lines(text: str) -> str:
|
||||
"""Filter and deduplicate prefetch result lines.
|
||||
|
||||
Removes:
|
||||
- exact duplicate lines
|
||||
- meta-instruction debris (prompts, templates)
|
||||
- very short / content-free lines
|
||||
|
||||
Returns cleaned text, preserving original line grouping.
|
||||
"""
|
||||
if not text or not text.strip():
|
||||
return ""
|
||||
|
||||
seen: set = set()
|
||||
kept: list = []
|
||||
for line in text.splitlines(keepends=False):
|
||||
stripped = line.strip()
|
||||
# Deduplicate exact lines
|
||||
if stripped in seen:
|
||||
continue
|
||||
# Skip meta-instructions
|
||||
if _is_meta_instruction_line(line):
|
||||
continue
|
||||
# Skip low-signal lines
|
||||
if _is_low_signal_line(line):
|
||||
continue
|
||||
seen.add(stripped)
|
||||
kept.append(line)
|
||||
|
||||
return "\n".join(kept)
|
||||
|
||||
|
||||
def build_memory_context_block(raw_context: str) -> str:
|
||||
"""Wrap prefetched memory in a fenced block with system note.
|
||||
|
||||
@@ -180,7 +252,14 @@ class MemoryManager:
|
||||
"Memory provider '%s' prefetch failed (non-fatal): %s",
|
||||
provider.name, e,
|
||||
)
|
||||
return "\n\n".join(parts)
|
||||
raw = "\n\n".join(parts)
|
||||
if not raw:
|
||||
return ""
|
||||
# Apply line-level filtering: dedupe, strip meta-instructions,
|
||||
# remove very short fragments. This prevents noisy providers
|
||||
# (e.g. MemPalace transcript recall) from bloating context.
|
||||
filtered = _filter_prefetch_lines(raw)
|
||||
return filtered
|
||||
|
||||
def queue_prefetch_all(self, query: str, *, session_id: str = "") -> None:
|
||||
"""Queue background prefetch on all providers for the next turn."""
|
||||
|
||||
@@ -46,6 +46,7 @@ from hermes_cli.config import (
|
||||
)
|
||||
from gateway.status import get_running_pid, read_runtime_status
|
||||
from agent.agent_card import get_agent_card_json
|
||||
from agent.mtls import is_mtls_configured, MTLSMiddleware, build_server_ssl_context
|
||||
|
||||
try:
|
||||
from fastapi import FastAPI, HTTPException, Request
|
||||
@@ -87,6 +88,10 @@ app.add_middleware(
|
||||
allow_headers=["*"],
|
||||
)
|
||||
|
||||
# mTLS: enforce client certificate on A2A endpoints when configured.
|
||||
# Activated by setting HERMES_MTLS_CERT, HERMES_MTLS_KEY, HERMES_MTLS_CA.
|
||||
app.add_middleware(MTLSMiddleware)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Endpoints that do NOT require the session token. Everything else under
|
||||
# /api/ is gated by the auth middleware below. Keep this list minimal —
|
||||
@@ -2105,6 +2110,20 @@ def start_server(
|
||||
"authentication. Only use on trusted networks.", host,
|
||||
)
|
||||
|
||||
# mTLS: when configured, pass SSL context to uvicorn so all connections
|
||||
# are TLS with mandatory client certificate verification.
|
||||
ssl_context = None
|
||||
scheme = "http"
|
||||
if is_mtls_configured():
|
||||
try:
|
||||
ssl_context = build_server_ssl_context()
|
||||
scheme = "https"
|
||||
_log.info(
|
||||
"mTLS enabled — server requires client certificates (A2A auth)"
|
||||
)
|
||||
except Exception as exc:
|
||||
_log.error("Failed to build mTLS SSL context: %s — starting without TLS", exc)
|
||||
|
||||
if open_browser:
|
||||
import threading
|
||||
import webbrowser
|
||||
@@ -2112,9 +2131,11 @@ def start_server(
|
||||
def _open():
|
||||
import time as _t
|
||||
_t.sleep(1.0)
|
||||
webbrowser.open(f"http://{host}:{port}")
|
||||
webbrowser.open(f"{scheme}://{host}:{port}")
|
||||
|
||||
threading.Thread(target=_open, daemon=True).start()
|
||||
|
||||
print(f" Hermes Web UI → http://{host}:{port}")
|
||||
uvicorn.run(app, host=host, port=port, log_level="warning")
|
||||
print(f" Hermes Web UI → {scheme}://{host}:{port}")
|
||||
if ssl_context is not None:
|
||||
print(" mTLS enabled — client certificate required for A2A endpoints")
|
||||
uvicorn.run(app, host=host, port=port, log_level="warning", ssl=ssl_context)
|
||||
|
||||
@@ -1,68 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Pre-commit hook: Reject hardcoded home-directory paths.
|
||||
|
||||
Scans staged Python files for patterns like:
|
||||
- /Users/<name>/...
|
||||
- /home/<name>/...
|
||||
- ~/... (in string literals outside expanduser context)
|
||||
|
||||
Escape hatch: add `# noqa: hardcoded-path-ok` to any legitimate line.
|
||||
|
||||
Install:
|
||||
cp hooks/pre-commit-path-guard.py .git/hooks/pre-commit
|
||||
chmod +x .git/hooks/pre-commit
|
||||
"""
|
||||
|
||||
import subprocess
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
# Add project root to path so we can import path_guard
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
||||
from tools.path_guard import scan_file_for_violations
|
||||
|
||||
|
||||
def get_staged_files():
|
||||
"""Get list of staged .py files."""
|
||||
result = subprocess.run(
|
||||
["git", "diff", "--cached", "--name-only", "--diff-filter=ACM"],
|
||||
capture_output=True, text=True
|
||||
)
|
||||
return [f for f in result.stdout.strip().splitlines() if f.endswith(".py")]
|
||||
|
||||
|
||||
def main():
|
||||
files = get_staged_files()
|
||||
if not files:
|
||||
sys.exit(0)
|
||||
|
||||
all_violations = []
|
||||
for filepath in files:
|
||||
if not Path(filepath).exists():
|
||||
continue
|
||||
violations = scan_file_for_violations(filepath)
|
||||
if violations:
|
||||
all_violations.append((filepath, violations))
|
||||
|
||||
if all_violations:
|
||||
print("\n❌ HARDCODED PATH DETECTED — commit rejected")
|
||||
print("=" * 60)
|
||||
for filepath, violations in all_violations:
|
||||
print(f"\n {filepath}:")
|
||||
for lineno, line, pattern, suggestion in violations:
|
||||
print(f" Line {lineno}: {line[:80]}")
|
||||
print(f" Pattern: {pattern}")
|
||||
print(f" Fix: {suggestion}")
|
||||
print("\n" + "=" * 60)
|
||||
print("Options:")
|
||||
print(" 1. Use get_hermes_home(), os.environ['HOME'], or relative paths")
|
||||
print(" 2. Add # noqa: hardcoded-path-ok to the line for legitimate cases")
|
||||
print("")
|
||||
sys.exit(1)
|
||||
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -198,14 +198,14 @@ class TestMemoryManager:
|
||||
def test_prefetch_skips_empty(self):
|
||||
mgr = MemoryManager()
|
||||
p1 = FakeMemoryProvider("builtin")
|
||||
p1._prefetch_result = "Has memories"
|
||||
p1._prefetch_result = "This provider has meaningful memories with enough length"
|
||||
p2 = FakeMemoryProvider("external")
|
||||
p2._prefetch_result = ""
|
||||
mgr.add_provider(p1)
|
||||
mgr.add_provider(p2)
|
||||
|
||||
result = mgr.prefetch_all("query")
|
||||
assert result == "Has memories"
|
||||
assert result == "This provider has meaningful memories with enough length"
|
||||
|
||||
def test_queue_prefetch_all(self):
|
||||
mgr = MemoryManager()
|
||||
@@ -695,3 +695,92 @@ class TestMemoryContextFencing:
|
||||
fence_end = combined.index("</memory-context>")
|
||||
assert "Alice" in combined[fence_start:fence_end]
|
||||
assert combined.index("weather") < fence_start
|
||||
|
||||
|
||||
class TestPrefetchFiltering:
|
||||
"""Tests for _filter_prefetch_lines and related helpers."""
|
||||
|
||||
def test_deduplicates_exact_lines(self):
|
||||
from agent.memory_manager import _filter_prefetch_lines
|
||||
raw = "- This is line one with enough characters\n- This is line two with enough characters\n- This is line one with enough characters\n- This is line three with enough characters"
|
||||
result = _filter_prefetch_lines(raw)
|
||||
lines = [l for l in result.splitlines() if l.strip()]
|
||||
assert len(lines) == 3
|
||||
assert "- This is line one with enough characters" in result
|
||||
assert "- This is line two with enough characters" in result
|
||||
assert "- This is line three with enough characters" in result
|
||||
|
||||
def test_removes_meta_instruction_debris(self):
|
||||
from agent.memory_manager import _filter_prefetch_lines
|
||||
raw = (
|
||||
"## Fleet Memories\n"
|
||||
"- > Focus on: was a non-trivial approach used\n"
|
||||
"- > Focus on: was a non-trivial approach used\n"
|
||||
"- Actual memory content about fleet ops\n"
|
||||
"- Note: this is just a note\n"
|
||||
)
|
||||
result = _filter_prefetch_lines(raw)
|
||||
assert "Focus on" not in result
|
||||
assert "Note:" not in result
|
||||
assert "Actual memory content about fleet ops" in result
|
||||
assert "Fleet Memories" in result
|
||||
|
||||
def test_removes_low_signal_short_lines(self):
|
||||
from agent.memory_manager import _filter_prefetch_lines
|
||||
raw = (
|
||||
"- \n"
|
||||
"- x\n"
|
||||
"- This is a meaningful memory entry with enough length\n"
|
||||
)
|
||||
result = _filter_prefetch_lines(raw)
|
||||
assert "- x" not in result
|
||||
assert "meaningful memory entry" in result
|
||||
|
||||
def test_preserves_structured_facts(self):
|
||||
from agent.memory_manager import _filter_prefetch_lines
|
||||
raw = (
|
||||
"## Local Facts (Hologram)\n"
|
||||
"- ALEXANDER: Prefers Gitea for reports and deliverables.\n"
|
||||
"- Telegram home channel is Timmy Time.\n"
|
||||
)
|
||||
result = _filter_prefetch_lines(raw)
|
||||
assert "ALEXANDER" in result
|
||||
assert "Gitea" in result
|
||||
assert "Telegram" in result
|
||||
|
||||
def test_is_meta_instruction_line(self):
|
||||
from agent.memory_manager import _is_meta_instruction_line
|
||||
assert _is_meta_instruction_line("- > Focus on: something") is True
|
||||
assert _is_meta_instruction_line("- Focus on: something") is True
|
||||
assert _is_meta_instruction_line("* Focus on: something") is True
|
||||
assert _is_meta_instruction_line("- Actual user memory content") is False
|
||||
assert _is_meta_instruction_line("ALEXANDER: Prefers Gitea") is False
|
||||
|
||||
def test_is_low_signal_line(self):
|
||||
from agent.memory_manager import _is_low_signal_line
|
||||
assert _is_low_signal_line("- ") is True
|
||||
assert _is_low_signal_line("*") is True
|
||||
assert _is_low_signal_line("- x") is True
|
||||
assert _is_low_signal_line("- Short line") is True
|
||||
assert _is_low_signal_line("- This is a long meaningful memory entry") is False
|
||||
|
||||
def test_prefetch_all_applies_filtering(self):
|
||||
from agent.memory_manager import MemoryManager
|
||||
mgr = MemoryManager()
|
||||
fake = FakeMemoryProvider(name="test")
|
||||
fake._prefetch_result = (
|
||||
"- > Focus on: was a non-trivial approach\n"
|
||||
"- > Focus on: was a non-trivial approach\n"
|
||||
"- Real memory fact\n"
|
||||
)
|
||||
mgr.add_provider(fake)
|
||||
result = mgr.prefetch_all("query")
|
||||
assert "Focus on" not in result
|
||||
assert "Real memory fact" in result
|
||||
assert result.count("Real memory fact") == 1
|
||||
|
||||
def test_empty_prefetch_returns_empty(self):
|
||||
from agent.memory_manager import _filter_prefetch_lines
|
||||
assert _filter_prefetch_lines("") == ""
|
||||
assert _filter_prefetch_lines(" ") == ""
|
||||
assert _filter_prefetch_lines("\n\n") == ""
|
||||
|
||||
@@ -1,127 +0,0 @@
|
||||
"""Tests for tools/path_guard.py — poka-yoke hardcoded path detection."""
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from tools.path_guard import (
|
||||
PathGuardError,
|
||||
scan_directory,
|
||||
scan_file_for_violations,
|
||||
validate_path,
|
||||
validate_tool_paths,
|
||||
)
|
||||
|
||||
|
||||
class TestValidatePath:
|
||||
"""Runtime path validation."""
|
||||
|
||||
def test_valid_relative_path(self):
|
||||
assert validate_path("tools/file_tools.py") == "tools/file_tools.py"
|
||||
|
||||
def test_valid_absolute_path(self):
|
||||
assert validate_path("/tmp/test.txt") == "/tmp/test.txt"
|
||||
|
||||
def test_valid_hermes_home(self):
|
||||
assert validate_path(os.path.expanduser("~/.hermes/config.yaml")) is not None
|
||||
|
||||
def test_reject_users_hardcoded(self):
|
||||
with pytest.raises(PathGuardError, match="/Users/"):
|
||||
validate_path("/Users/someone_else/.hermes/config")
|
||||
|
||||
def test_reject_home_hardcoded(self):
|
||||
with pytest.raises(PathGuardError, match="/home/"):
|
||||
validate_path("/home/user/.hermes/config")
|
||||
|
||||
def test_empty_path(self):
|
||||
assert validate_path("") == ""
|
||||
assert validate_path(None) is None
|
||||
|
||||
def test_non_string(self):
|
||||
assert validate_path(42) == 42
|
||||
|
||||
|
||||
class TestValidateToolPaths:
|
||||
"""Batch path validation."""
|
||||
|
||||
def test_all_valid(self):
|
||||
paths = ["tools/file.py", "/tmp/x.txt", "relative/path.py"]
|
||||
assert validate_tool_paths(paths) == paths
|
||||
|
||||
def test_mixed_invalid(self):
|
||||
with pytest.raises(PathGuardError):
|
||||
validate_tool_paths(["tools/file.py", "/Users/someone_else/secret.txt"])
|
||||
|
||||
def test_skips_non_strings(self):
|
||||
assert validate_tool_paths([None, 42, "valid.py"]) == ["valid.py"]
|
||||
|
||||
|
||||
class TestScanFileForViolations:
|
||||
"""Static file scanning."""
|
||||
|
||||
def test_clean_file(self, tmp_path):
|
||||
f = tmp_path / "clean.py"
|
||||
f.write_text("import os\nHOME = os.environ['HOME']\n")
|
||||
assert scan_file_for_violations(str(f)) == []
|
||||
|
||||
def test_hardcoded_users(self, tmp_path):
|
||||
f = tmp_path / "bad.py"
|
||||
f.write_text("CONFIG = '/Users/apayne/.hermes/config.yaml'\n")
|
||||
violations = scan_file_for_violations(str(f))
|
||||
assert len(violations) == 1
|
||||
assert "/Users/<name>/" in violations[0][2]
|
||||
|
||||
def test_hardcoded_home(self, tmp_path):
|
||||
f = tmp_path / "bad2.py"
|
||||
f.write_text("PATH = '/home/deploy/.hermes/state.db'\n")
|
||||
violations = scan_file_for_violations(str(f))
|
||||
assert len(violations) == 1
|
||||
assert "/home/<name>/" in violations[0][2]
|
||||
|
||||
def test_tilde_in_expanduser_ok(self, tmp_path):
|
||||
f = tmp_path / "ok.py"
|
||||
f.write_text("p = os.path.expanduser('~/.hermes/config')\n")
|
||||
assert scan_file_for_violations(str(f)) == []
|
||||
|
||||
def test_tilde_in_display_ok(self, tmp_path):
|
||||
f = tmp_path / "ok2.py"
|
||||
f.write_text('print("~/config saved")\n')
|
||||
assert scan_file_for_violations(str(f)) == []
|
||||
|
||||
def test_noqa_escape(self, tmp_path):
|
||||
f = tmp_path / "noqa.py"
|
||||
f.write_text("PATH = '/Users/apayne/test' # noqa: hardcoded-path-ok\n")
|
||||
assert scan_file_for_violations(str(f)) == []
|
||||
|
||||
def test_comments_skipped(self, tmp_path):
|
||||
f = tmp_path / "comment.py"
|
||||
f.write_text("# PATH = '/Users/apayne/test'\n")
|
||||
assert scan_file_for_violations(str(f)) == []
|
||||
|
||||
|
||||
class TestScanDirectory:
|
||||
"""Directory scanning."""
|
||||
|
||||
def test_clean_tree(self, tmp_path):
|
||||
(tmp_path / "clean.py").write_text("import os\n")
|
||||
(tmp_path / "sub").mkdir()
|
||||
(tmp_path / "sub" / "also_clean.py").write_text("x = 1\n")
|
||||
assert scan_directory(str(tmp_path)) == []
|
||||
|
||||
def test_finds_violations(self, tmp_path):
|
||||
(tmp_path / "bad.py").write_text("P = '/Users/x/.hermes'\n")
|
||||
results = scan_directory(str(tmp_path))
|
||||
assert len(results) == 1
|
||||
assert results[0][0].endswith("bad.py")
|
||||
|
||||
def test_skips_tests(self, tmp_path):
|
||||
(tmp_path / "test_something.py").write_text("P = '/Users/x/.hermes'\n")
|
||||
assert scan_directory(str(tmp_path)) == []
|
||||
|
||||
def test_skips_pycache(self, tmp_path):
|
||||
cache = tmp_path / "__pycache__"
|
||||
cache.mkdir()
|
||||
(cache / "cached.py").write_text("P = '/Users/x/.hermes'\n")
|
||||
assert scan_directory(str(tmp_path)) == []
|
||||
@@ -1,165 +0,0 @@
|
||||
"""
|
||||
tools/path_guard.py — Poka-yoke: Prevent hardcoded home-directory paths.
|
||||
|
||||
Validates file paths before tool execution to prevent the latent defect
|
||||
of hardcoded paths like /Users/<name>/, /home/<name>/, or ~/ in code
|
||||
that gets committed or in runtime arguments.
|
||||
|
||||
Usage:
|
||||
from tools.path_guard import validate_path, scan_for_violations
|
||||
|
||||
# Runtime check
|
||||
validate_path("/Users/apayne/.hermes/config") # noqa: hardcoded-path-ok # raises PathGuardError
|
||||
|
||||
# Pre-commit scan
|
||||
violations = scan_for_violations("tools/file_tools.py")
|
||||
"""
|
||||
|
||||
import os
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import List, Tuple
|
||||
|
||||
# ── Patterns ────────────────────────────────────────────────────────
|
||||
|
||||
# Matches hardcoded home-directory paths in string content
|
||||
HARDCODED_PATH_PATTERNS = [
|
||||
# /Users/<name>/... (macOS)
|
||||
(re.compile(r"""['"]/(Users)/[\w.-]+/"""), "/Users/<name>/"),
|
||||
# /home/<name>/... (Linux)
|
||||
(re.compile(r"""['"]/home/[\w.-]+/"""), "/home/<name>/"),
|
||||
# Bare ~/... (unexpanded tilde in code — NOT in expanduser() calls)
|
||||
(re.compile(r"""['"]~/[^'"]+['"]"""), "~/..."), # noqa: hardcoded-path-ok
|
||||
# /root/... (Linux root home)
|
||||
(re.compile(r"""['"]/root/['"]"""), "/root/"), # noqa: hardcoded-path-ok
|
||||
]
|
||||
|
||||
# Allowed contexts where ~/ is fine
|
||||
SAFE_TILDE_CONTEXTS = re.compile(
|
||||
r"""expanduser|display_path|relpath|os\.path|Path\(|str\(.*home|"""
|
||||
r"""noqa:\s*hardcoded-path-ok|""" # explicit escape hatch
|
||||
r"""\bprint\(|f['"]|\.format\(|""" # display/formatting contexts
|
||||
r"""["']~/["']\s*$""", # just displaying ~/ as prefix
|
||||
re.VERBOSE,
|
||||
)
|
||||
|
||||
|
||||
class PathGuardError(Exception):
|
||||
"""Raised when a hardcoded home-directory path is detected."""
|
||||
|
||||
def __init__(self, path: str, pattern_name: str, suggestion: str):
|
||||
self.path = path
|
||||
self.pattern_name = pattern_name
|
||||
self.suggestion = suggestion
|
||||
super().__init__(
|
||||
f"Hardcoded path detected: {path} matches {pattern_name}. "
|
||||
f"Suggestion: {suggestion}. "
|
||||
f"Use get_hermes_home(), os.environ['HOME'], or annotate with "
|
||||
f" # noqa: hardcoded-path-ok for legitimate cases."
|
||||
)
|
||||
|
||||
|
||||
# ── Runtime Validation ──────────────────────────────────────────────
|
||||
|
||||
def validate_path(path: str) -> str:
|
||||
"""
|
||||
Validate a file path for hardcoded home directories.
|
||||
Returns the path if valid, raises PathGuardError if not.
|
||||
|
||||
This is meant to be called in tool wrappers (write_file, execute_code)
|
||||
before executing operations with user-supplied paths.
|
||||
|
||||
Note: At runtime, paths from os.path.expanduser() will resolve to
|
||||
/Users/<name>/... — this is expected and allowed. The guard catches
|
||||
paths that were LITERALLY hardcoded in source code or tool arguments
|
||||
that look like they came from a different machine (e.g., a path
|
||||
containing a different username than the current user).
|
||||
"""
|
||||
if not path or not isinstance(path, str):
|
||||
return path
|
||||
|
||||
# At runtime, expanded paths matching current HOME are fine
|
||||
home = os.environ.get("HOME", "")
|
||||
if home and path.startswith(home):
|
||||
return path
|
||||
|
||||
# Check for hardcoded /Users/<name>/ (macOS) — but not current user
|
||||
if re.match(r"^/Users/[\w.-]+/", path):
|
||||
raise PathGuardError(
|
||||
path, "/Users/<name>/",
|
||||
f"Use $HOME or os.path.expanduser('~') instead. "
|
||||
f"Got: {path}"
|
||||
)
|
||||
|
||||
# Check for hardcoded /home/<name>/ (Linux)
|
||||
if re.match(r"^/home/[\w.-]+/", path):
|
||||
raise PathGuardError(
|
||||
path, "/home/<name>/",
|
||||
f"Use $HOME or os.path.expanduser('~') instead. "
|
||||
f"Got: {path}"
|
||||
)
|
||||
|
||||
return path
|
||||
|
||||
|
||||
def validate_tool_paths(paths: list) -> list:
|
||||
"""
|
||||
Validate multiple paths (e.g., from tool arguments).
|
||||
Returns validated list. Raises PathGuardError on first violation.
|
||||
"""
|
||||
return [validate_path(p) for p in paths if isinstance(p, str)]
|
||||
|
||||
|
||||
# ── File Scanning (Pre-commit / CI) ────────────────────────────────
|
||||
|
||||
def scan_file_for_violations(filepath: str) -> List[Tuple[int, str, str, str]]:
|
||||
"""
|
||||
Scan a Python file for hardcoded home-directory path patterns.
|
||||
Returns list of (line_number, line_content, pattern_name, suggestion).
|
||||
"""
|
||||
violations = []
|
||||
try:
|
||||
with open(filepath) as f:
|
||||
for lineno, line in enumerate(f, 1):
|
||||
# Skip comments and noqa lines
|
||||
stripped = line.strip()
|
||||
if stripped.startswith("#"):
|
||||
continue
|
||||
if "noqa: hardcoded-path-ok" in line:
|
||||
continue
|
||||
|
||||
for pattern, name in HARDCODED_PATH_PATTERNS:
|
||||
if pattern.search(line):
|
||||
# Special case: ~/ in expanduser/display context is OK
|
||||
if name == "~/..." and SAFE_TILDE_CONTEXTS.search(line): # noqa: hardcoded-path-ok
|
||||
continue
|
||||
violations.append((lineno, line.rstrip(), name,
|
||||
f"Use get_hermes_home(), os.environ['HOME'], or add # noqa: hardcoded-path-ok"))
|
||||
except (IOError, UnicodeDecodeError):
|
||||
pass
|
||||
return violations
|
||||
|
||||
|
||||
def scan_directory(root: str, extensions: tuple = (".py",)) -> List[Tuple[str, List]]:
|
||||
"""
|
||||
Scan a directory tree for hardcoded path violations.
|
||||
Returns list of (filepath, violations) tuples.
|
||||
"""
|
||||
results = []
|
||||
for dirpath, _, filenames in os.walk(root):
|
||||
# Skip hidden dirs, __pycache__, venv, test dirs
|
||||
skip_dirs = {"__pycache__", ".git", "venv", "node_modules", ".hermes"}
|
||||
if any(s in dirpath for s in skip_dirs):
|
||||
continue
|
||||
|
||||
for fname in filenames:
|
||||
if not fname.endswith(extensions):
|
||||
continue
|
||||
# Skip test files (they may legitimately have paths)
|
||||
if fname.startswith("test_") or "/tests/" in dirpath:
|
||||
continue
|
||||
fpath = os.path.join(dirpath, fname)
|
||||
violations = scan_file_for_violations(fpath)
|
||||
if violations:
|
||||
results.append((fpath, violations))
|
||||
return results
|
||||
Reference in New Issue
Block a user