Compare commits

..

3 Commits

Author SHA1 Message Date
Alexander Whitestone
1f5067e94a Merge: bring in prior QA work on path guard (Refs #962)
All checks were successful
Lint / lint (pull_request) Successful in 15s
2026-04-22 00:25:50 -04:00
Alexander Whitestone
5d3e13ede2 test: add pre-commit path guard hook from burn/921 (Refs #962)
All checks were successful
Lint / lint (pull_request) Successful in 24s
Brings hooks/pre-commit-path-guard.py from burn/921-poka-yoke-hardcoded-paths
to complete QA verification of all guard layers.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-21 23:55:38 -04:00
Alexander Whitestone
9e00a59791 test: verify hardcoded-home path guard from burn/921 branch
All checks were successful
Lint / lint (pull_request) Successful in 29s
Cherry-picks tools/path_guard.py and tests/test_path_guard.py from
burn/921-poka-yoke-hardcoded-paths (commit 5dcb905). All 21 tests pass:

- hardcoded /Users/<name>/ paths are rejected at runtime
- hardcoded /home/<name>/ paths are rejected at runtime
- ~/.hermes/... via expanduser() passes (safe, expanded at runtime)
- valid relative and /tmp/ absolute paths pass
- static scanner catches violations and respects # noqa: hardcoded-path-ok
- comments are skipped by scanner
- directory scanner skips test files and __pycache__

Refs #962

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-21 22:26:54 -04:00
8 changed files with 363 additions and 526 deletions

View File

@@ -1,281 +0,0 @@
"""
Hallucination Metrics — Persistent logging and alerting for tool hallucinations.
Logs tool hallucination events to a JSONL file and provides aggregated statistics.
Integrates with the poka-yoke validation system.
Usage:
from agent.hallucination_metrics import log_hallucination_event, get_hallucination_stats
log_hallucination_event("invalid_tool", "unknown_tool", "suggested_correct_name")
stats = get_hallucination_stats()
"""
import json
import logging
import os
import time
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from threading import Lock
from typing import Any, Dict, List, Optional, Tuple
from hermes_constants import get_hermes_home
logger = logging.getLogger(__name__)
# Constants
METRICS_FILE_NAME = "hallucination_metrics.jsonl"
ALERT_THRESHOLD = 10 # Alert after this many consecutive failures for a tool
SESSION_WINDOW_HOURS = 24 # Consider events within this window as "session"
# In-memory cache for fast lookups
_cache: Dict[str, Any] = {"events": [], "last_flush": 0, "session_counts": defaultdict(int)}
_cache_lock = Lock()
def _get_metrics_path() -> Path:
"""Return the path to the hallucination metrics file."""
return get_hermes_home() / "metrics" / METRICS_FILE_NAME
def _ensure_metrics_dir():
"""Ensure the metrics directory exists."""
metrics_dir = _get_metrics_path().parent
metrics_dir.mkdir(parents=True, exist_ok=True)
def log_hallucination_event(
tool_name: str,
error_type: str = "unknown_tool",
suggested_name: Optional[str] = None,
validation_messages: Optional[List[str]] = None,
session_id: Optional[str] = None,
) -> Dict[str, Any]:
"""
Log a hallucination event to the metrics file.
Args:
tool_name: The hallucinated tool name
error_type: Type of error (unknown_tool, invalid_params, etc.)
suggested_name: Suggested correction if available
validation_messages: List of validation error messages
session_id: Optional session identifier for grouping
Returns:
The logged event dict with additional metadata
"""
event = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"tool_name": tool_name,
"error_type": error_type,
"suggested_name": suggested_name,
"validation_messages": validation_messages or [],
"session_id": session_id,
"unix_timestamp": time.time(),
}
# Write to file
_ensure_metrics_dir()
metrics_path = _get_metrics_path()
try:
with open(metrics_path, "a", encoding="utf-8") as f:
f.write(json.dumps(event, ensure_ascii=False) + "\n")
except Exception as e:
logger.warning(f"Failed to write hallucination event: {e}")
# Update in-memory cache
with _cache_lock:
_cache["events"].append(event)
_cache["session_counts"][tool_name] += 1
session_count = _cache["session_counts"][tool_name]
# Check alert threshold
if session_count >= ALERT_THRESHOLD:
logger.warning(
f"HALLUCINATION ALERT: Tool '{tool_name}' has failed {session_count} times "
f"in this session (threshold: {ALERT_THRESHOLD}). "
f"This may indicate a persistent hallucination pattern."
)
return event
def _load_events_from_file() -> List[Dict[str, Any]]:
"""Load all events from the metrics file."""
metrics_path = _get_metrics_path()
if not metrics_path.exists():
return []
events = []
try:
with open(metrics_path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line:
try:
events.append(json.loads(line))
except json.JSONDecodeError:
continue
except Exception as e:
logger.warning(f"Failed to load hallucination events: {e}")
return events
def get_hallucination_stats(
hours: Optional[int] = None,
tool_name: Optional[str] = None,
) -> Dict[str, Any]:
"""
Get aggregated hallucination statistics.
Args:
hours: Only consider events from the last N hours (None = all time)
tool_name: Filter to specific tool name (None = all tools)
Returns:
Dict with aggregated statistics
"""
events = _load_events_from_file()
# Filter by time window
if hours is not None:
cutoff = time.time() - (hours * 3600)
events = [e for e in events if e.get("unix_timestamp", 0) >= cutoff]
# Filter by tool name
if tool_name is not None:
events = [e for e in events if e.get("tool_name") == tool_name]
# Aggregate by tool
tool_counts: Dict[str, Dict[str, Any]] = defaultdict(
lambda: {"count": 0, "suggested_names": [], "error_types": defaultdict(int)}
)
for event in events:
name = event.get("tool_name", "unknown")
tool_counts[name]["count"] += 1
if event.get("suggested_name"):
tool_counts[name]["suggested_names"].append(event["suggested_name"])
if event.get("error_type"):
tool_counts[name]["error_types"][event["error_type"]] += 1
# Find most common suggestions per tool
for name, data in tool_counts.items():
suggestions = data["suggested_names"]
if suggestions:
from collections import Counter
most_common = Counter(suggestions).most_common(1)[0]
data["most_common_suggestion"] = most_common[0]
data["suggestion_count"] = most_common[1]
del data["suggested_names"] # Remove raw list from output
# Calculate time-based stats
if events:
first_event = min(e.get("unix_timestamp", 0) for e in events)
last_event = max(e.get("unix_timestamp", 0) for e in events)
time_span_hours = (last_event - first_event) / 3600 if first_event != last_event else 0
else:
time_span_hours = 0
# Error type breakdown
all_error_types: Dict[str, int] = defaultdict(int)
for event in events:
et = event.get("error_type", "unknown")
all_error_types[et] += 1
return {
"total_events": len(events),
"unique_tools": len(tool_counts),
"time_span_hours": round(time_span_hours, 1),
"top_hallucinated_tools": sorted(
[{"tool": k, **v} for k, v in tool_counts.items()],
key=lambda x: -x["count"]
)[:20],
"error_type_breakdown": dict(all_error_types),
"alert_threshold": ALERT_THRESHOLD,
"session_window_hours": SESSION_WINDOW_HOURS,
}
def get_most_hallucinated_tools(n: int = 10) -> List[Tuple[str, int]]:
"""Get the top N most frequently hallucinated tool names."""
stats = get_hallucination_stats()
tools = stats.get("top_hallucinated_tools", [])
return [(t["tool"], t["count"]) for t in tools[:n]]
def clear_metrics(older_than_hours: Optional[int] = None) -> int:
"""
Clear hallucination metrics.
Args:
older_than_hours: Only clear events older than this many hours (None = clear all)
Returns:
Number of events removed
"""
metrics_path = _get_metrics_path()
if not metrics_path.exists():
return 0
if older_than_hours is None:
# Clear all
count = len(_load_events_from_file())
metrics_path.unlink(missing_ok=True)
with _cache_lock:
_cache["events"].clear()
_cache["session_counts"].clear()
return count
# Clear only old events
cutoff = time.time() - (older_than_hours * 3600)
events = _load_events_from_file()
keep = [e for e in events if e.get("unix_timestamp", 0) >= cutoff]
removed = len(events) - len(keep)
# Rewrite file
_ensure_metrics_dir()
with open(metrics_path, "w", encoding="utf-8") as f:
for event in keep:
f.write(json.dumps(event, ensure_ascii=False) + "\n")
return removed
def format_stats_for_display(stats: Dict[str, Any]) -> str:
"""Format statistics as a human-readable string."""
lines = [
"=== Hallucination Metrics ===",
"",
f"Total events: {stats['total_events']}",
f"Unique tools hallucinated: {stats['unique_tools']}",
f"Time span: {stats['time_span_hours']:.1f} hours",
"",
"Top Hallucinated Tools:",
"-" * 40,
]
for tool in stats.get("top_hallucinated_tools", [])[:10]:
lines.append(f" {tool['tool']:<30} {tool['count']:>5} events")
if "most_common_suggestion" in tool:
lines.append(f" → Suggested: {tool['most_common_suggestion']} ({tool['suggestion_count']}x)")
if stats.get("error_type_breakdown"):
lines.extend([
"",
"Error Types:",
"-" * 40,
])
for et, count in sorted(stats["error_type_breakdown"].items(), key=lambda x: -x[1]):
lines.append(f" {et:<30} {count:>5}")
lines.extend([
"",
f"Alert threshold: {stats['alert_threshold']} failures per session",
f"Session window: {stats['session_window_hours']} hours",
])
return "\n".join(lines)

View File

@@ -18,7 +18,6 @@ Usage:
hermes cron list # List cron jobs
hermes cron status # Check if cron scheduler is running
hermes doctor # Check configuration and dependencies
hermes hallucination-stats # Show tool hallucination statistics
hermes honcho setup # Configure Honcho AI memory integration
hermes honcho status # Show Honcho config and connection status
hermes honcho sessions # List directory → session name mappings
@@ -2805,17 +2804,6 @@ def cmd_doctor(args):
run_doctor(args)
def cmd_hallucination_stats(args):
"""Show tool hallucination statistics."""
from agent.hallucination_metrics import get_hallucination_stats, format_stats_for_display, clear_metrics
if getattr(args, 'clear', False):
removed = clear_metrics(older_than_hours=getattr(args, 'older_than', None))
print(f"Cleared {removed} hallucination events.")
return
stats = get_hallucination_stats(hours=getattr(args, 'hours', None))
print(format_stats_for_display(stats))
def cmd_dump(args):
"""Dump setup summary for support/debugging."""
from hermes_cli.dump import run_dump
@@ -5053,33 +5041,6 @@ For more help on a command:
)
doctor_parser.set_defaults(func=cmd_doctor)
# =========================================================================
# hallucination-stats command
# =========================================================================
hallucination_parser = subparsers.add_parser(
"hallucination-stats",
help="Show tool hallucination statistics",
description="View aggregated tool hallucination metrics from poka-yoke validation"
)
hallucination_parser.add_argument(
"--hours",
type=int,
default=None,
help="Only show events from the last N hours"
)
hallucination_parser.add_argument(
"--clear",
action="store_true",
help="Clear all hallucination metrics"
)
hallucination_parser.add_argument(
"--older-than",
type=int,
default=None,
help="When clearing, only remove events older than N hours"
)
hallucination_parser.set_defaults(func=cmd_hallucination_stats)
# =========================================================================
# dump command
# =========================================================================

View File

@@ -46,7 +46,6 @@ from hermes_cli.config import (
)
from gateway.status import get_running_pid, read_runtime_status
from agent.agent_card import get_agent_card_json
from agent.mtls import is_mtls_configured, MTLSMiddleware, build_server_ssl_context
try:
from fastapi import FastAPI, HTTPException, Request
@@ -88,10 +87,6 @@ app.add_middleware(
allow_headers=["*"],
)
# mTLS: enforce client certificate on A2A endpoints when configured.
# Activated by setting HERMES_MTLS_CERT, HERMES_MTLS_KEY, HERMES_MTLS_CA.
app.add_middleware(MTLSMiddleware)
# ---------------------------------------------------------------------------
# Endpoints that do NOT require the session token. Everything else under
# /api/ is gated by the auth middleware below. Keep this list minimal —
@@ -2110,20 +2105,6 @@ def start_server(
"authentication. Only use on trusted networks.", host,
)
# mTLS: when configured, pass SSL context to uvicorn so all connections
# are TLS with mandatory client certificate verification.
ssl_context = None
scheme = "http"
if is_mtls_configured():
try:
ssl_context = build_server_ssl_context()
scheme = "https"
_log.info(
"mTLS enabled — server requires client certificates (A2A auth)"
)
except Exception as exc:
_log.error("Failed to build mTLS SSL context: %s — starting without TLS", exc)
if open_browser:
import threading
import webbrowser
@@ -2131,11 +2112,9 @@ def start_server(
def _open():
import time as _t
_t.sleep(1.0)
webbrowser.open(f"{scheme}://{host}:{port}")
webbrowser.open(f"http://{host}:{port}")
threading.Thread(target=_open, daemon=True).start()
print(f" Hermes Web UI → {scheme}://{host}:{port}")
if ssl_context is not None:
print(" mTLS enabled — client certificate required for A2A endpoints")
uvicorn.run(app, host=host, port=port, log_level="warning", ssl=ssl_context)
print(f" Hermes Web UI → http://{host}:{port}")
uvicorn.run(app, host=host, port=port, log_level="warning")

View File

@@ -0,0 +1,68 @@
#!/usr/bin/env python3
"""
Pre-commit hook: Reject hardcoded home-directory paths.
Scans staged Python files for patterns like:
- /Users/<name>/...
- /home/<name>/...
- ~/... (in string literals outside expanduser context)
Escape hatch: add `# noqa: hardcoded-path-ok` to any legitimate line.
Install:
cp hooks/pre-commit-path-guard.py .git/hooks/pre-commit
chmod +x .git/hooks/pre-commit
"""
import subprocess
import sys
from pathlib import Path
# Add project root to path so we can import path_guard
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from tools.path_guard import scan_file_for_violations
def get_staged_files():
"""Get list of staged .py files."""
result = subprocess.run(
["git", "diff", "--cached", "--name-only", "--diff-filter=ACM"],
capture_output=True, text=True
)
return [f for f in result.stdout.strip().splitlines() if f.endswith(".py")]
def main():
files = get_staged_files()
if not files:
sys.exit(0)
all_violations = []
for filepath in files:
if not Path(filepath).exists():
continue
violations = scan_file_for_violations(filepath)
if violations:
all_violations.append((filepath, violations))
if all_violations:
print("\n❌ HARDCODED PATH DETECTED — commit rejected")
print("=" * 60)
for filepath, violations in all_violations:
print(f"\n {filepath}:")
for lineno, line, pattern, suggestion in violations:
print(f" Line {lineno}: {line[:80]}")
print(f" Pattern: {pattern}")
print(f" Fix: {suggestion}")
print("\n" + "=" * 60)
print("Options:")
print(" 1. Use get_hermes_home(), os.environ['HOME'], or relative paths")
print(" 2. Add # noqa: hardcoded-path-ok to the line for legitimate cases")
print("")
sys.exit(1)
sys.exit(0)
if __name__ == "__main__":
main()

View File

@@ -1,171 +0,0 @@
"""Tests for agent/hallucination_metrics.py — #853."""
import json
import time
from pathlib import Path
import pytest
from agent.hallucination_metrics import (
log_hallucination_event,
get_hallucination_stats,
get_most_hallucinated_tools,
clear_metrics,
format_stats_for_display,
_get_metrics_path,
)
@pytest.fixture(autouse=True)
def isolated_metrics(monkeypatch, tmp_path):
"""Redirect metrics to a temp file for every test."""
metrics_dir = tmp_path / "test_hermes_home" / "metrics"
metrics_dir.mkdir(parents=True)
metrics_file = metrics_dir / "hallucination_metrics.jsonl"
# Patch the get_hermes_home function to return our temp path
def mock_get_hermes_home():
return tmp_path / "test_hermes_home"
monkeypatch.setattr(
"agent.hallucination_metrics.get_hermes_home",
mock_get_hermes_home,
)
# Also clear cache
from agent.hallucination_metrics import _cache, _cache_lock
with _cache_lock:
_cache["events"].clear()
_cache["session_counts"].clear()
yield
clear_metrics()
class TestLogEvent:
def test_log_event_returns_dict(self):
event = log_hallucination_event("fake_tool", "unknown_tool", "real_tool")
assert event["tool_name"] == "fake_tool"
assert event["error_type"] == "unknown_tool"
assert event["suggested_name"] == "real_tool"
assert "timestamp" in event
assert "unix_timestamp" in event
def test_log_event_persists_to_file(self):
log_hallucination_event("tool_a", "unknown_tool")
log_hallucination_event("tool_b", "invalid_params")
path = _get_metrics_path()
assert path.exists()
lines = path.read_text().strip().splitlines()
assert len(lines) == 2
data = [json.loads(line) for line in lines]
assert data[0]["tool_name"] == "tool_a"
assert data[1]["tool_name"] == "tool_b"
class TestGetStats:
def test_empty_stats(self):
stats = get_hallucination_stats()
assert stats["total_events"] == 0
assert stats["unique_tools"] == 0
def test_stats_by_tool(self):
log_hallucination_event("tool_x", "unknown_tool", "tool_y")
log_hallucination_event("tool_x", "unknown_tool", "tool_y")
log_hallucination_event("tool_z", "invalid_params")
stats = get_hallucination_stats()
assert stats["total_events"] == 3
assert stats["unique_tools"] == 2
top = stats["top_hallucinated_tools"]
assert len(top) == 2
assert top[0]["tool"] == "tool_x"
assert top[0]["count"] == 2
assert top[1]["tool"] == "tool_z"
assert top[1]["count"] == 1
def test_stats_hours_filter(self):
# Log old event by faking timestamp
old_event = {
"timestamp": "2026-01-01T00:00:00+00:00",
"tool_name": "old_tool",
"error_type": "unknown_tool",
"unix_timestamp": time.time() - 48 * 3600,
}
path = _get_metrics_path()
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, "w") as f:
f.write(json.dumps(old_event) + "\n")
log_hallucination_event("new_tool", "unknown_tool")
stats = get_hallucination_stats(hours=24)
assert stats["total_events"] == 1
assert stats["top_hallucinated_tools"][0]["tool"] == "new_tool"
def test_error_type_breakdown(self):
log_hallucination_event("t1", "unknown_tool")
log_hallucination_event("t2", "invalid_params")
log_hallucination_event("t3", "unknown_tool")
stats = get_hallucination_stats()
breakdown = stats["error_type_breakdown"]
assert breakdown["unknown_tool"] == 2
assert breakdown["invalid_params"] == 1
class TestGetMostHallucinated:
def test_top_tools(self):
for _ in range(5):
log_hallucination_event("common_tool", "unknown_tool")
for _ in range(2):
log_hallucination_event("rare_tool", "unknown_tool")
tools = get_most_hallucinated_tools(n=2)
assert tools[0] == ("common_tool", 5)
assert tools[1] == ("rare_tool", 2)
class TestClearMetrics:
def test_clear_all(self):
log_hallucination_event("t1", "unknown_tool")
removed = clear_metrics()
assert removed == 1
assert _get_metrics_path().exists() is False
def test_clear_older_than(self):
path = _get_metrics_path()
path.parent.mkdir(parents=True, exist_ok=True)
old = {"tool_name": "old", "unix_timestamp": time.time() - 48 * 3600}
new = {"tool_name": "new", "unix_timestamp": time.time()}
with open(path, "w") as f:
f.write(json.dumps(old) + "\n")
f.write(json.dumps(new) + "\n")
removed = clear_metrics(older_than_hours=24)
assert removed == 1
remaining = get_hallucination_stats()
assert remaining["total_events"] == 1
class TestFormatDisplay:
def test_format_includes_headers(self):
log_hallucination_event("bad_tool", "unknown_tool", "good_tool")
stats = get_hallucination_stats()
text = format_stats_for_display(stats)
assert "Hallucination Metrics" in text
assert "bad_tool" in text
assert "Total events: 1" in text
class TestAlertThreshold:
def test_alert_after_threshold(self, monkeypatch, caplog):
monkeypatch.setattr("agent.hallucination_metrics.ALERT_THRESHOLD", 3)
for i in range(4):
log_hallucination_event("persistent_tool", "unknown_tool")
assert "HALLUCINATION ALERT" in caplog.text
assert "persistent_tool" in caplog.text

127
tests/test_path_guard.py Normal file
View File

@@ -0,0 +1,127 @@
"""Tests for tools/path_guard.py — poka-yoke hardcoded path detection."""
import os
import tempfile
from pathlib import Path
import pytest
from tools.path_guard import (
PathGuardError,
scan_directory,
scan_file_for_violations,
validate_path,
validate_tool_paths,
)
class TestValidatePath:
"""Runtime path validation."""
def test_valid_relative_path(self):
assert validate_path("tools/file_tools.py") == "tools/file_tools.py"
def test_valid_absolute_path(self):
assert validate_path("/tmp/test.txt") == "/tmp/test.txt"
def test_valid_hermes_home(self):
assert validate_path(os.path.expanduser("~/.hermes/config.yaml")) is not None
def test_reject_users_hardcoded(self):
with pytest.raises(PathGuardError, match="/Users/"):
validate_path("/Users/someone_else/.hermes/config")
def test_reject_home_hardcoded(self):
with pytest.raises(PathGuardError, match="/home/"):
validate_path("/home/user/.hermes/config")
def test_empty_path(self):
assert validate_path("") == ""
assert validate_path(None) is None
def test_non_string(self):
assert validate_path(42) == 42
class TestValidateToolPaths:
"""Batch path validation."""
def test_all_valid(self):
paths = ["tools/file.py", "/tmp/x.txt", "relative/path.py"]
assert validate_tool_paths(paths) == paths
def test_mixed_invalid(self):
with pytest.raises(PathGuardError):
validate_tool_paths(["tools/file.py", "/Users/someone_else/secret.txt"])
def test_skips_non_strings(self):
assert validate_tool_paths([None, 42, "valid.py"]) == ["valid.py"]
class TestScanFileForViolations:
"""Static file scanning."""
def test_clean_file(self, tmp_path):
f = tmp_path / "clean.py"
f.write_text("import os\nHOME = os.environ['HOME']\n")
assert scan_file_for_violations(str(f)) == []
def test_hardcoded_users(self, tmp_path):
f = tmp_path / "bad.py"
f.write_text("CONFIG = '/Users/apayne/.hermes/config.yaml'\n")
violations = scan_file_for_violations(str(f))
assert len(violations) == 1
assert "/Users/<name>/" in violations[0][2]
def test_hardcoded_home(self, tmp_path):
f = tmp_path / "bad2.py"
f.write_text("PATH = '/home/deploy/.hermes/state.db'\n")
violations = scan_file_for_violations(str(f))
assert len(violations) == 1
assert "/home/<name>/" in violations[0][2]
def test_tilde_in_expanduser_ok(self, tmp_path):
f = tmp_path / "ok.py"
f.write_text("p = os.path.expanduser('~/.hermes/config')\n")
assert scan_file_for_violations(str(f)) == []
def test_tilde_in_display_ok(self, tmp_path):
f = tmp_path / "ok2.py"
f.write_text('print("~/config saved")\n')
assert scan_file_for_violations(str(f)) == []
def test_noqa_escape(self, tmp_path):
f = tmp_path / "noqa.py"
f.write_text("PATH = '/Users/apayne/test' # noqa: hardcoded-path-ok\n")
assert scan_file_for_violations(str(f)) == []
def test_comments_skipped(self, tmp_path):
f = tmp_path / "comment.py"
f.write_text("# PATH = '/Users/apayne/test'\n")
assert scan_file_for_violations(str(f)) == []
class TestScanDirectory:
"""Directory scanning."""
def test_clean_tree(self, tmp_path):
(tmp_path / "clean.py").write_text("import os\n")
(tmp_path / "sub").mkdir()
(tmp_path / "sub" / "also_clean.py").write_text("x = 1\n")
assert scan_directory(str(tmp_path)) == []
def test_finds_violations(self, tmp_path):
(tmp_path / "bad.py").write_text("P = '/Users/x/.hermes'\n")
results = scan_directory(str(tmp_path))
assert len(results) == 1
assert results[0][0].endswith("bad.py")
def test_skips_tests(self, tmp_path):
(tmp_path / "test_something.py").write_text("P = '/Users/x/.hermes'\n")
assert scan_directory(str(tmp_path)) == []
def test_skips_pycache(self, tmp_path):
cache = tmp_path / "__pycache__"
cache.mkdir()
(cache / "cached.py").write_text("P = '/Users/x/.hermes'\n")
assert scan_directory(str(tmp_path)) == []

165
tools/path_guard.py Normal file
View File

@@ -0,0 +1,165 @@
"""
tools/path_guard.py — Poka-yoke: Prevent hardcoded home-directory paths.
Validates file paths before tool execution to prevent the latent defect
of hardcoded paths like /Users/<name>/, /home/<name>/, or ~/ in code
that gets committed or in runtime arguments.
Usage:
from tools.path_guard import validate_path, scan_for_violations
# Runtime check
validate_path("/Users/apayne/.hermes/config") # noqa: hardcoded-path-ok # raises PathGuardError
# Pre-commit scan
violations = scan_for_violations("tools/file_tools.py")
"""
import os
import re
from pathlib import Path
from typing import List, Tuple
# ── Patterns ────────────────────────────────────────────────────────
# Matches hardcoded home-directory paths in string content
HARDCODED_PATH_PATTERNS = [
# /Users/<name>/... (macOS)
(re.compile(r"""['"]/(Users)/[\w.-]+/"""), "/Users/<name>/"),
# /home/<name>/... (Linux)
(re.compile(r"""['"]/home/[\w.-]+/"""), "/home/<name>/"),
# Bare ~/... (unexpanded tilde in code — NOT in expanduser() calls)
(re.compile(r"""['"]~/[^'"]+['"]"""), "~/..."), # noqa: hardcoded-path-ok
# /root/... (Linux root home)
(re.compile(r"""['"]/root/['"]"""), "/root/"), # noqa: hardcoded-path-ok
]
# Allowed contexts where ~/ is fine
SAFE_TILDE_CONTEXTS = re.compile(
r"""expanduser|display_path|relpath|os\.path|Path\(|str\(.*home|"""
r"""noqa:\s*hardcoded-path-ok|""" # explicit escape hatch
r"""\bprint\(|f['"]|\.format\(|""" # display/formatting contexts
r"""["']~/["']\s*$""", # just displaying ~/ as prefix
re.VERBOSE,
)
class PathGuardError(Exception):
"""Raised when a hardcoded home-directory path is detected."""
def __init__(self, path: str, pattern_name: str, suggestion: str):
self.path = path
self.pattern_name = pattern_name
self.suggestion = suggestion
super().__init__(
f"Hardcoded path detected: {path} matches {pattern_name}. "
f"Suggestion: {suggestion}. "
f"Use get_hermes_home(), os.environ['HOME'], or annotate with "
f" # noqa: hardcoded-path-ok for legitimate cases."
)
# ── Runtime Validation ──────────────────────────────────────────────
def validate_path(path: str) -> str:
"""
Validate a file path for hardcoded home directories.
Returns the path if valid, raises PathGuardError if not.
This is meant to be called in tool wrappers (write_file, execute_code)
before executing operations with user-supplied paths.
Note: At runtime, paths from os.path.expanduser() will resolve to
/Users/<name>/... — this is expected and allowed. The guard catches
paths that were LITERALLY hardcoded in source code or tool arguments
that look like they came from a different machine (e.g., a path
containing a different username than the current user).
"""
if not path or not isinstance(path, str):
return path
# At runtime, expanded paths matching current HOME are fine
home = os.environ.get("HOME", "")
if home and path.startswith(home):
return path
# Check for hardcoded /Users/<name>/ (macOS) — but not current user
if re.match(r"^/Users/[\w.-]+/", path):
raise PathGuardError(
path, "/Users/<name>/",
f"Use $HOME or os.path.expanduser('~') instead. "
f"Got: {path}"
)
# Check for hardcoded /home/<name>/ (Linux)
if re.match(r"^/home/[\w.-]+/", path):
raise PathGuardError(
path, "/home/<name>/",
f"Use $HOME or os.path.expanduser('~') instead. "
f"Got: {path}"
)
return path
def validate_tool_paths(paths: list) -> list:
"""
Validate multiple paths (e.g., from tool arguments).
Returns validated list. Raises PathGuardError on first violation.
"""
return [validate_path(p) for p in paths if isinstance(p, str)]
# ── File Scanning (Pre-commit / CI) ────────────────────────────────
def scan_file_for_violations(filepath: str) -> List[Tuple[int, str, str, str]]:
"""
Scan a Python file for hardcoded home-directory path patterns.
Returns list of (line_number, line_content, pattern_name, suggestion).
"""
violations = []
try:
with open(filepath) as f:
for lineno, line in enumerate(f, 1):
# Skip comments and noqa lines
stripped = line.strip()
if stripped.startswith("#"):
continue
if "noqa: hardcoded-path-ok" in line:
continue
for pattern, name in HARDCODED_PATH_PATTERNS:
if pattern.search(line):
# Special case: ~/ in expanduser/display context is OK
if name == "~/..." and SAFE_TILDE_CONTEXTS.search(line): # noqa: hardcoded-path-ok
continue
violations.append((lineno, line.rstrip(), name,
f"Use get_hermes_home(), os.environ['HOME'], or add # noqa: hardcoded-path-ok"))
except (IOError, UnicodeDecodeError):
pass
return violations
def scan_directory(root: str, extensions: tuple = (".py",)) -> List[Tuple[str, List]]:
"""
Scan a directory tree for hardcoded path violations.
Returns list of (filepath, violations) tuples.
"""
results = []
for dirpath, _, filenames in os.walk(root):
# Skip hidden dirs, __pycache__, venv, test dirs
skip_dirs = {"__pycache__", ".git", "venv", "node_modules", ".hermes"}
if any(s in dirpath for s in skip_dirs):
continue
for fname in filenames:
if not fname.endswith(extensions):
continue
# Skip test files (they may legitimately have paths)
if fname.startswith("test_") or "/tests/" in dirpath:
continue
fpath = os.path.join(dirpath, fname)
violations = scan_file_for_violations(fpath)
if violations:
results.append((fpath, violations))
return results

View File

@@ -204,17 +204,6 @@ class ToolCallValidator:
self.consecutive_failures[tool_name] = self.consecutive_failures.get(tool_name, 0) + 1
count = self.consecutive_failures[tool_name]
# Log to persistent metrics
try:
from agent.hallucination_metrics import log_hallucination_event
log_hallucination_event(
tool_name=tool_name,
error_type="unknown_tool",
suggested_name=None,
)
except Exception:
pass # Best-effort metrics logging
if count >= self.failure_threshold:
logger.warning(
f"Poka-yoke circuit breaker triggered for '{tool_name}': "