Compare commits

..

3 Commits

Author SHA1 Message Date
4cdda8701d feat: integrate hardcoded path guard into tool dispatch
Some checks failed
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Contributor Attribution Check / check-attribution (pull_request) Failing after 32s
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 32s
Tests / e2e (pull_request) Successful in 2m56s
Tests / test (pull_request) Failing after 1h1m7s
2026-04-21 00:31:01 +00:00
a80d30b342 feat: add pre-commit hook for hardcoded path detection 2026-04-21 00:29:33 +00:00
f098cf8c4a feat: add hardcoded path guard module (#921)
- Detects /Users/, /home/, ~/ in tool arguments
- Source code scanner for CI/pre-commit
- Runtime guard for tool dispatch
- noqa: hardcoded-path-ok escape hatch

Closes #921
2026-04-21 00:29:12 +00:00
5 changed files with 198 additions and 370 deletions

View File

@@ -0,0 +1,78 @@
#!/usr/bin/env python3
"""
Pre-commit hook: Reject hardcoded home-directory paths.
Install:
cp pre-commit-hardcoded-path.py .git/hooks/pre-commit-hardcoded-path
chmod +x .git/hooks/pre-commit-hardcoded-path
Or add to .pre-commit-config.yaml
"""
import sys
import subprocess
import re
PATTERNS = [
(r"/Users/[\w.\-]+/", "macOS home directory"),
(r"/home/[\w.\-]+/", "Linux home directory"),
(r"(?<![\w/])~/", "unexpanded tilde"),
]
NOQA = re.compile(r"#\s*noqa:?\s*hardcoded-path-ok")
def get_staged_files():
result = subprocess.run(
["git", "diff", "--cached", "--name-only", "--diff-filter=ACM"],
capture_output=True, text=True
)
return [f for f in result.stdout.strip().split("\n") if f.endswith(".py")]
def check_file(filepath):
try:
result = subprocess.run(
["git", "show", f":{filepath}"],
capture_output=True, text=True
)
content = result.stdout
except Exception:
return []
violations = []
for i, line in enumerate(content.split("\n"), 1):
if line.strip().startswith("#"):
continue
if line.strip().startswith(("import ", "from ")):
continue
if NOQA.search(line):
continue
for pattern, desc in PATTERNS:
if re.search(pattern, line):
violations.append((filepath, i, line.strip(), desc))
break
return violations
def main():
files = get_staged_files()
if not files:
sys.exit(0)
all_violations = []
for f in files:
all_violations.extend(check_file(f))
if all_violations:
print("ERROR: Hardcoded home directory paths detected:")
print()
for filepath, line_no, line, desc in all_violations:
print(f" {filepath}:{line_no}: {desc}")
print(f" {line[:100]}")
print()
print("Fix: Use $HOME, relative paths, or get_hermes_home().")
print("Override: Add '# noqa: hardcoded-path-ok' to the line.")
sys.exit(1)
sys.exit(0)
if __name__ == "__main__":
main()

View File

@@ -1,273 +0,0 @@
"""
Circuit Breaker for Error Cascading — #885
P(error | prev was error) = 58.6% vs P(error | prev was success) = 25.2%.
That's a 2.33x cascade factor. After 3 consecutive errors, the circuit
opens and the agent must take corrective action.
States:
- CLOSED: Normal operation, errors are counted
- OPEN: Too many consecutive errors, corrective action required
- HALF_OPEN: Testing if errors have cleared
Usage:
from agent.circuit_breaker import CircuitBreaker, ToolCircuitBreaker
cb = ToolCircuitBreaker()
# After each tool call
if not cb.record_result(success=True):
# Circuit is open — take corrective action
cb.get_recovery_action()
"""
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Too many errors, block execution
HALF_OPEN = "half_open" # Testing recovery
@dataclass
class CircuitBreaker:
"""
Generic circuit breaker with configurable thresholds.
Tracks consecutive errors and opens the circuit when the
error streak exceeds the threshold.
"""
failure_threshold: int = 3
recovery_timeout: float = 30.0 # seconds before trying half-open
success_threshold: int = 2 # successes needed to close from half-open
state: CircuitState = field(default=CircuitState.CLOSED, init=False)
consecutive_failures: int = field(default=0, init=False)
consecutive_successes: int = field(default=0, init=False)
last_failure_time: Optional[float] = field(default=None, init=False)
total_trips: int = field(default=0, init=False)
error_streaks: List[int] = field(default_factory=list, init=False)
def record_result(self, success: bool) -> bool:
"""
Record a tool call result. Returns True if circuit allows execution.
Returns:
True if circuit is CLOSED or HALF_OPEN (execution allowed)
False if circuit is OPEN (execution blocked)
"""
now = time.time()
if self.state == CircuitState.OPEN:
# Check if recovery timeout has passed
if self.last_failure_time and (now - self.last_failure_time) >= self.recovery_timeout:
self.state = CircuitState.HALF_OPEN
self.consecutive_successes = 0
return True # Allow one test execution
return False # Still open
if success:
self.consecutive_failures = 0
self.consecutive_successes += 1
if self.state == CircuitState.HALF_OPEN:
if self.consecutive_successes >= self.success_threshold:
self.state = CircuitState.CLOSED
self.consecutive_successes = 0
return True
else:
self.consecutive_successes = 0
self.consecutive_failures += 1
self.last_failure_time = now
if self.state == CircuitState.HALF_OPEN:
# Failed during recovery — reopen immediately
self.state = CircuitState.OPEN
self.total_trips += 1
return False
if self.consecutive_failures >= self.failure_threshold:
self.state = CircuitState.OPEN
self.total_trips += 1
self.error_streaks.append(self.consecutive_failures)
return False
return True
def can_execute(self) -> bool:
"""Check if execution is allowed."""
if self.state == CircuitState.OPEN:
if self.last_failure_time:
now = time.time()
if (now - self.last_failure_time) >= self.recovery_timeout:
self.state = CircuitState.HALF_OPEN
self.consecutive_successes = 0
return True
return False
return True
def get_state(self) -> Dict[str, Any]:
"""Get current circuit state."""
return {
"state": self.state.value,
"consecutive_failures": self.consecutive_failures,
"consecutive_successes": self.consecutive_successes,
"total_trips": self.total_trips,
"max_streak": max(self.error_streaks) if self.error_streaks else 0,
"can_execute": self.can_execute(),
}
def reset(self):
"""Reset the circuit breaker."""
self.state = CircuitState.CLOSED
self.consecutive_failures = 0
self.consecutive_successes = 0
self.last_failure_time = None
class ToolCircuitBreaker(CircuitBreaker):
"""
Circuit breaker specifically for tool call error cascading.
Provides recovery actions when the circuit opens.
"""
# Tools that are most effective at recovery (from audit data)
RECOVERY_TOOLS = [
"terminal", # Most effective — 2300 recoveries
"read_file", # Reset context by reading something
"search_files", # Find what went wrong
]
def get_recovery_action(self) -> Dict[str, Any]:
"""
Get the recommended recovery action when circuit is open.
Returns dict with action type and details.
"""
streak = self.consecutive_failures
if streak >= 9:
# After 9 errors: 41/46 recoveries via terminal
return {
"action": "terminal_only",
"reason": f"Error streak of {streak} — terminal is the only reliable recovery",
"suggested_tool": "terminal",
"suggested_command": "echo 'Resetting context'",
"severity": "critical",
}
elif streak >= 5:
return {
"action": "switch_tool_type",
"reason": f"Error streak of {streak} — switch to a different tool category",
"suggested_tools": ["read_file", "search_files", "terminal"],
"severity": "high",
}
elif streak >= self.failure_threshold:
return {
"action": "ask_user",
"reason": f"{streak} consecutive errors — ask user for guidance",
"suggested_response": "I'm encountering repeated errors. Would you like me to try a different approach?",
"severity": "medium",
}
else:
return {
"action": "continue",
"reason": f"Error streak of {streak} — within tolerance",
"severity": "low",
}
def should_compress_context(self) -> bool:
"""Determine if context compression would help recovery."""
return self.consecutive_failures >= 5
def get_blocked_tool(self) -> Optional[str]:
"""Get the tool that should be blocked (if any)."""
if self.state == CircuitState.OPEN:
return "last_failed_tool"
return None
class MultiToolCircuitBreaker:
"""
Manages per-tool circuit breakers and cross-tool cascade detection.
When one tool trips its breaker, related tools are also warned.
"""
def __init__(self):
self.breakers: Dict[str, ToolCircuitBreaker] = {}
self.global_streak: int = 0
self.last_tool: Optional[str] = None
self.last_success: bool = True
def get_breaker(self, tool_name: str) -> ToolCircuitBreaker:
"""Get or create a circuit breaker for a tool."""
if tool_name not in self.breakers:
self.breakers[tool_name] = ToolCircuitBreaker()
return self.breakers[tool_name]
def record_result(self, tool_name: str, success: bool) -> bool:
"""
Record a tool call result. Returns True if execution should continue.
"""
breaker = self.get_breaker(tool_name)
allowed = breaker.record_result(success)
# Track global streak
if success:
self.global_streak = 0
self.last_success = True
else:
self.global_streak += 1
self.last_success = False
self.last_tool = tool_name
return allowed
def can_execute(self, tool_name: str) -> bool:
"""Check if a specific tool can execute."""
breaker = self.get_breaker(tool_name)
return breaker.can_execute()
def get_global_state(self) -> Dict[str, Any]:
"""Get overall circuit breaker state."""
return {
"global_streak": self.global_streak,
"last_tool": self.last_tool,
"last_success": self.last_success,
"tool_states": {
name: breaker.get_state()
for name, breaker in self.breakers.items()
if breaker.consecutive_failures > 0 or breaker.total_trips > 0
},
"any_open": any(b.state == CircuitState.OPEN for b in self.breakers.values()),
}
def get_recovery_action(self) -> Dict[str, Any]:
"""Get recovery action based on global state."""
if self.global_streak == 0:
return {"action": "continue", "reason": "No errors"}
# Find the breaker with the worst streak
worst = max(self.breakers.values(), key=lambda b: b.consecutive_failures, default=None)
if worst and worst.consecutive_failures > 0:
return worst.get_recovery_action()
return {
"action": "continue",
"reason": f"Global streak: {self.global_streak}",
"severity": "low",
}
def reset_all(self):
"""Reset all circuit breakers."""
for breaker in self.breakers.values():
breaker.reset()
self.global_streak = 0
self.last_success = True

View File

@@ -28,6 +28,7 @@ from typing import Dict, Any, List, Optional, Tuple
from tools.registry import discover_builtin_tools, registry
from tools.tool_pokayoke import validate_tool_call, reset_circuit_breaker, get_hallucination_stats
from tools.hardcoded_path_guard import guard_tool_dispatch as _guard_hardcoded_paths
from toolsets import resolve_toolset, validate_toolset
from agent.tool_orchestrator import orchestrator
@@ -501,6 +502,12 @@ def handle_function_call(
# Prefer the caller-provided list so subagents can't overwrite
# the parent's tool set via the process-global.
sandbox_enabled = enabled_tools if enabled_tools is not None else _last_resolved_tool_names
# Poka-yoke #921: guard against hardcoded home-directory paths
_hardcoded_err = _guard_hardcoded_paths(function_name, function_args)
if _hardcoded_err:
logger.warning(f"Hardcoded path blocked: {function_name}")
return _hardcoded_err
# Poka-yoke: validate tool call before dispatch
is_valid, corrected_name, corrected_params, pokayoke_messages = validate_tool_call(function_name, function_args)
if not is_valid:

View File

@@ -1,97 +0,0 @@
"""Tests for circuit breaker (#885)."""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from agent.circuit_breaker import CircuitBreaker, ToolCircuitBreaker, MultiToolCircuitBreaker, CircuitState
def test_closed_allows_execution():
cb = CircuitBreaker(failure_threshold=3)
assert cb.can_execute()
def test_opens_after_threshold():
cb = CircuitBreaker(failure_threshold=3)
cb.record_result(False)
cb.record_result(False)
assert cb.can_execute() # Still closed at 2
cb.record_result(False)
assert not cb.can_execute() # Open at 3
def test_closes_on_success():
cb = CircuitBreaker(failure_threshold=3)
cb.record_result(False)
cb.record_result(True)
assert cb.consecutive_failures == 0
def test_half_open_recovery():
cb = CircuitBreaker(failure_threshold=2, recovery_timeout=0.1, success_threshold=1)
cb.record_result(False)
cb.record_result(False)
assert cb.state == CircuitState.OPEN
import time
time.sleep(0.15)
assert cb.can_execute() # Moved to half-open
cb.record_result(True)
assert cb.state == CircuitState.CLOSED
def test_recovery_action_streak():
cb = ToolCircuitBreaker(failure_threshold=3)
for _ in range(5):
cb.record_result(False)
action = cb.get_recovery_action()
assert action["action"] == "switch_tool_type"
def test_recovery_action_critical():
cb = ToolCircuitBreaker(failure_threshold=3)
for _ in range(10):
cb.record_result(False)
action = cb.get_recovery_action()
assert action["action"] == "terminal_only"
assert action["severity"] == "critical"
def test_multi_tool_breaker():
mcb = MultiToolCircuitBreaker()
mcb.record_result("read_file", False)
mcb.record_result("read_file", False)
mcb.record_result("read_file", False)
assert not mcb.can_execute("read_file")
assert mcb.can_execute("terminal") # Different tool unaffected
def test_global_state():
mcb = MultiToolCircuitBreaker()
mcb.record_result("tool_a", False)
mcb.record_result("tool_b", False)
state = mcb.get_global_state()
assert state["global_streak"] == 2
def test_reset():
cb = CircuitBreaker(failure_threshold=2)
cb.record_result(False)
cb.record_result(False)
assert cb.state == CircuitState.OPEN
cb.reset()
assert cb.state == CircuitState.CLOSED
if __name__ == "__main__":
tests = [test_closed_allows_execution, test_opens_after_threshold,
test_closes_on_success, test_half_open_recovery,
test_recovery_action_streak, test_recovery_action_critical,
test_multi_tool_breaker, test_global_state, test_reset]
for t in tests:
print(f"Running {t.__name__}...")
t()
print(" PASS")
print("\nAll tests passed.")

View File

@@ -0,0 +1,113 @@
#!/usr/bin/env python3
"""
Hardcoded Path Guard — Poka-Yoke #921
Detects and blocks hardcoded home-directory paths in tool arguments.
These paths work on one machine but break on others, VPS deployments,
or when HOME changes.
Usage:
from tools.hardcoded_path_guard import check_path, validate_tool_args
# Check a single path
err = check_path("/Users/apayne/.hermes/config.yaml")
# Validate all path-like args in a tool call
clean_args, warnings = validate_tool_args("read_file", {"path": "/home/user/file.txt"})
"""
import os
import re
import json as _json
from typing import Dict, List, Optional, Tuple, Any
# Patterns that indicate hardcoded home directories
HARDCODED_PATTERNS = [
(r"/Users/[\w.\-]+/", "macOS home directory (/Users/...)"),
(r"/home/[\w.\-]+/", "Linux home directory (/home/...)"),
(r"(?<![\w/])~/", "unexpanded tilde (~/)"),
(r"/root/", "root home directory (/root/)"),
]
_COMPILED_PATTERNS = [(re.compile(p), desc) for p, desc in HARDCODED_PATTERNS]
_NOQA_PATTERN = re.compile(r"#\s*noqa:?\s*hardcoded-path-ok")
_PATH_ARG_NAMES = frozenset({
"path", "file_path", "filepath", "dir", "directory", "dest", "source",
"input", "output", "src", "dst", "target", "location", "file",
"image_path", "script", "config", "log_file",
})
def has_hardcoded_path(text: str) -> Optional[str]:
if _NOQA_PATTERN.search(text):
return None
for pattern, desc in _COMPILED_PATTERNS:
if pattern.search(text):
return desc
return None
def check_path(path_value: str) -> Optional[str]:
if not isinstance(path_value, str):
return None
match_desc = has_hardcoded_path(path_value)
if match_desc:
return (
f"Path contains hardcoded home directory ({match_desc}): '{path_value}'. "
f"Use $HOME, relative paths, or get_hermes_home(). "
f"Add '# noqa: hardcoded-path-ok' if intentional."
)
return None
def validate_tool_args(tool_name: str, args: Dict[str, Any]) -> Tuple[Dict[str, Any], List[str]]:
warnings = []
for key, value in args.items():
if key.lower() not in _PATH_ARG_NAMES:
continue
if isinstance(value, str):
err = check_path(value)
if err:
warnings.append(err)
elif isinstance(value, list):
for item in value:
if isinstance(item, str):
err = check_path(item)
if err:
warnings.append(err)
return args, warnings
def scan_source_for_violations(source_code: str, filename: str = "") -> List[Tuple[int, str, str]]:
violations = []
lines = source_code.split("\n")
for i, line in enumerate(lines, 1):
stripped = line.strip()
if stripped.startswith("#"):
if _NOQA_PATTERN.search(line):
continue
continue
if stripped.startswith("import ") or stripped.startswith("from "):
continue
for pattern, desc in _COMPILED_PATTERNS:
match = pattern.search(line)
if match:
if _NOQA_PATTERN.search(line):
continue
violations.append((i, line.strip(), desc))
break
return violations
def guard_tool_dispatch(tool_name: str, args: Dict[str, Any]) -> Optional[str]:
_, warnings = validate_tool_args(tool_name, args)
if warnings:
return _json.dumps({
"error": "Hardcoded home directory path detected",
"details": warnings,
"suggestion": "Use $HOME, relative paths, or get_hermes_home() instead of hardcoded paths.",
"pokayoke": True,
"rule": "hardcoded-path-guard"
})
return None