Compare commits

..

3 Commits

Author SHA1 Message Date
4cdda8701d feat: integrate hardcoded path guard into tool dispatch
Some checks failed
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Contributor Attribution Check / check-attribution (pull_request) Failing after 32s
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 32s
Tests / e2e (pull_request) Successful in 2m56s
Tests / test (pull_request) Failing after 1h1m7s
2026-04-21 00:31:01 +00:00
a80d30b342 feat: add pre-commit hook for hardcoded path detection 2026-04-21 00:29:33 +00:00
f098cf8c4a feat: add hardcoded path guard module (#921)
- Detects /Users/, /home/, ~/ in tool arguments
- Source code scanner for CI/pre-commit
- Runtime guard for tool dispatch
- noqa: hardcoded-path-ok escape hatch

Closes #921
2026-04-21 00:29:12 +00:00
4 changed files with 198 additions and 165 deletions

View File

@@ -0,0 +1,78 @@
#!/usr/bin/env python3
"""
Pre-commit hook: Reject hardcoded home-directory paths.
Install:
cp pre-commit-hardcoded-path.py .git/hooks/pre-commit-hardcoded-path
chmod +x .git/hooks/pre-commit-hardcoded-path
Or add to .pre-commit-config.yaml
"""
import sys
import subprocess
import re
PATTERNS = [
(r"/Users/[\w.\-]+/", "macOS home directory"),
(r"/home/[\w.\-]+/", "Linux home directory"),
(r"(?<![\w/])~/", "unexpanded tilde"),
]
NOQA = re.compile(r"#\s*noqa:?\s*hardcoded-path-ok")
def get_staged_files():
result = subprocess.run(
["git", "diff", "--cached", "--name-only", "--diff-filter=ACM"],
capture_output=True, text=True
)
return [f for f in result.stdout.strip().split("\n") if f.endswith(".py")]
def check_file(filepath):
try:
result = subprocess.run(
["git", "show", f":{filepath}"],
capture_output=True, text=True
)
content = result.stdout
except Exception:
return []
violations = []
for i, line in enumerate(content.split("\n"), 1):
if line.strip().startswith("#"):
continue
if line.strip().startswith(("import ", "from ")):
continue
if NOQA.search(line):
continue
for pattern, desc in PATTERNS:
if re.search(pattern, line):
violations.append((filepath, i, line.strip(), desc))
break
return violations
def main():
files = get_staged_files()
if not files:
sys.exit(0)
all_violations = []
for f in files:
all_violations.extend(check_file(f))
if all_violations:
print("ERROR: Hardcoded home directory paths detected:")
print()
for filepath, line_no, line, desc in all_violations:
print(f" {filepath}:{line_no}: {desc}")
print(f" {line[:100]}")
print()
print("Fix: Use $HOME, relative paths, or get_hermes_home().")
print("Override: Add '# noqa: hardcoded-path-ok' to the line.")
sys.exit(1)
sys.exit(0)
if __name__ == "__main__":
main()

View File

@@ -1,165 +0,0 @@
"""Token Budget — Poka-yoke guard against context overflow.
Progressive warning system with circuit breakers:
- 60%: Log warning, suggest summarization
- 80%: Auto-compress, drop raw tool outputs
- 90%: Block verbose tools, force wrap-up
- 95%: Graceful termination with summary
Usage:
from agent.token_budget import TokenBudget
budget = TokenBudget(max_tokens=128000)
budget.record_usage(prompt_tokens=500, completion_tokens=200)
status = budget.check()
# status.level: ok, warning, compress, block, terminate
"""
from __future__ import annotations
import logging
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
class BudgetLevel(Enum):
"""Token budget alert levels."""
OK = "ok" # < 60%
WARNING = "warning" # 60-80%
COMPRESS = "compress" # 80-90%
BLOCK = "block" # 90-95%
TERMINATE = "terminate" # > 95%
@dataclass
class BudgetStatus:
"""Current budget status."""
level: BudgetLevel
used_tokens: int
max_tokens: int
percentage: float
remaining: int
message: str
actions: List[str] = field(default_factory=list)
# Default thresholds
THRESHOLDS = {
BudgetLevel.WARNING: 0.60,
BudgetLevel.COMPRESS: 0.80,
BudgetLevel.BLOCK: 0.90,
BudgetLevel.TERMINATE: 0.95,
}
class TokenBudget:
"""Track token usage and enforce context limits."""
def __init__(self, max_tokens: int = 128000,
thresholds: Optional[Dict[BudgetLevel, float]] = None):
self._max_tokens = max_tokens
self._thresholds = thresholds or THRESHOLDS
self._prompt_tokens = 0
self._completion_tokens = 0
self._tool_output_tokens = 0
self._history: List[Dict[str, Any]] = []
@property
def used_tokens(self) -> int:
return self._prompt_tokens + self._completion_tokens
@property
def remaining(self) -> int:
return max(0, self._max_tokens - self.used_tokens)
@property
def percentage(self) -> float:
if self._max_tokens == 0:
return 0
return self.used_tokens / self._max_tokens
def record_usage(self, prompt_tokens: int = 0, completion_tokens: int = 0,
tool_output_tokens: int = 0):
"""Record token usage from an API call."""
self._prompt_tokens += prompt_tokens
self._completion_tokens += completion_tokens
self._tool_output_tokens += tool_output_tokens
self._history.append({
"time": time.time(),
"prompt": prompt_tokens,
"completion": completion_tokens,
"tool_output": tool_output_tokens,
"total_used": self.used_tokens,
})
def check(self) -> BudgetStatus:
"""Check current budget status and return appropriate actions."""
pct = self.percentage
if pct >= self._thresholds.get(BudgetLevel.TERMINATE, 0.95):
level = BudgetLevel.TERMINATE
msg = f"Context {pct:.0%} full. Session must terminate with summary."
actions = ["generate_summary", "terminate_session"]
elif pct >= self._thresholds.get(BudgetLevel.BLOCK, 0.90):
level = BudgetLevel.BLOCK
msg = f"Context {pct:.0%} full. Blocking verbose tool calls."
actions = ["block_verbose_tools", "force_wrap_up", "suggest_summary"]
elif pct >= self._thresholds.get(BudgetLevel.COMPRESS, 0.80):
level = BudgetLevel.COMPRESS
msg = f"Context {pct:.0%} full. Auto-compressing conversation."
actions = ["auto_compress", "drop_raw_tool_outputs", "suggest_summary"]
elif pct >= self._thresholds.get(BudgetLevel.WARNING, 0.60):
level = BudgetLevel.WARNING
msg = f"Context {pct:.0%} used. Consider summarizing."
actions = ["suggest_summary", "log_warning"]
else:
level = BudgetLevel.OK
msg = f"Context OK: {self.used_tokens}/{self._max_tokens} tokens ({pct:.0%})"
actions = []
return BudgetStatus(
level=level,
used_tokens=self.used_tokens,
max_tokens=self._max_tokens,
percentage=round(pct, 3),
remaining=self.remaining,
message=msg,
actions=actions,
)
def should_truncate_tool_output(self, estimated_tokens: int) -> bool:
"""Check if a tool output should be truncated."""
if self.used_tokens + estimated_tokens > self._max_tokens * 0.95:
return True
return False
def get_truncation_budget(self) -> int:
"""Get max tokens available for next tool output."""
budget = self.remaining - int(self._max_tokens * 0.05) # Reserve 5%
return max(0, budget)
def reset(self):
"""Reset budget for new session."""
self._prompt_tokens = 0
self._completion_tokens = 0
self._tool_output_tokens = 0
self._history.clear()
def get_report(self) -> Dict[str, Any]:
"""Generate usage report."""
status = self.check()
return {
"status": status.level.value,
"used_tokens": self.used_tokens,
"max_tokens": self._max_tokens,
"remaining": self.remaining,
"percentage": status.percentage,
"prompt_tokens": self._prompt_tokens,
"completion_tokens": self._completion_tokens,
"tool_output_tokens": self._tool_output_tokens,
"message": status.message,
"actions": status.actions,
}

View File

@@ -28,6 +28,7 @@ from typing import Dict, Any, List, Optional, Tuple
from tools.registry import discover_builtin_tools, registry
from tools.tool_pokayoke import validate_tool_call, reset_circuit_breaker, get_hallucination_stats
from tools.hardcoded_path_guard import guard_tool_dispatch as _guard_hardcoded_paths
from toolsets import resolve_toolset, validate_toolset
from agent.tool_orchestrator import orchestrator
@@ -501,6 +502,12 @@ def handle_function_call(
# Prefer the caller-provided list so subagents can't overwrite
# the parent's tool set via the process-global.
sandbox_enabled = enabled_tools if enabled_tools is not None else _last_resolved_tool_names
# Poka-yoke #921: guard against hardcoded home-directory paths
_hardcoded_err = _guard_hardcoded_paths(function_name, function_args)
if _hardcoded_err:
logger.warning(f"Hardcoded path blocked: {function_name}")
return _hardcoded_err
# Poka-yoke: validate tool call before dispatch
is_valid, corrected_name, corrected_params, pokayoke_messages = validate_tool_call(function_name, function_args)
if not is_valid:

View File

@@ -0,0 +1,113 @@
#!/usr/bin/env python3
"""
Hardcoded Path Guard — Poka-Yoke #921
Detects and blocks hardcoded home-directory paths in tool arguments.
These paths work on one machine but break on others, VPS deployments,
or when HOME changes.
Usage:
from tools.hardcoded_path_guard import check_path, validate_tool_args
# Check a single path
err = check_path("/Users/apayne/.hermes/config.yaml")
# Validate all path-like args in a tool call
clean_args, warnings = validate_tool_args("read_file", {"path": "/home/user/file.txt"})
"""
import os
import re
import json as _json
from typing import Dict, List, Optional, Tuple, Any
# Patterns that indicate hardcoded home directories
HARDCODED_PATTERNS = [
(r"/Users/[\w.\-]+/", "macOS home directory (/Users/...)"),
(r"/home/[\w.\-]+/", "Linux home directory (/home/...)"),
(r"(?<![\w/])~/", "unexpanded tilde (~/)"),
(r"/root/", "root home directory (/root/)"),
]
_COMPILED_PATTERNS = [(re.compile(p), desc) for p, desc in HARDCODED_PATTERNS]
_NOQA_PATTERN = re.compile(r"#\s*noqa:?\s*hardcoded-path-ok")
_PATH_ARG_NAMES = frozenset({
"path", "file_path", "filepath", "dir", "directory", "dest", "source",
"input", "output", "src", "dst", "target", "location", "file",
"image_path", "script", "config", "log_file",
})
def has_hardcoded_path(text: str) -> Optional[str]:
if _NOQA_PATTERN.search(text):
return None
for pattern, desc in _COMPILED_PATTERNS:
if pattern.search(text):
return desc
return None
def check_path(path_value: str) -> Optional[str]:
if not isinstance(path_value, str):
return None
match_desc = has_hardcoded_path(path_value)
if match_desc:
return (
f"Path contains hardcoded home directory ({match_desc}): '{path_value}'. "
f"Use $HOME, relative paths, or get_hermes_home(). "
f"Add '# noqa: hardcoded-path-ok' if intentional."
)
return None
def validate_tool_args(tool_name: str, args: Dict[str, Any]) -> Tuple[Dict[str, Any], List[str]]:
warnings = []
for key, value in args.items():
if key.lower() not in _PATH_ARG_NAMES:
continue
if isinstance(value, str):
err = check_path(value)
if err:
warnings.append(err)
elif isinstance(value, list):
for item in value:
if isinstance(item, str):
err = check_path(item)
if err:
warnings.append(err)
return args, warnings
def scan_source_for_violations(source_code: str, filename: str = "") -> List[Tuple[int, str, str]]:
violations = []
lines = source_code.split("\n")
for i, line in enumerate(lines, 1):
stripped = line.strip()
if stripped.startswith("#"):
if _NOQA_PATTERN.search(line):
continue
continue
if stripped.startswith("import ") or stripped.startswith("from "):
continue
for pattern, desc in _COMPILED_PATTERNS:
match = pattern.search(line)
if match:
if _NOQA_PATTERN.search(line):
continue
violations.append((i, line.strip(), desc))
break
return violations
def guard_tool_dispatch(tool_name: str, args: Dict[str, Any]) -> Optional[str]:
_, warnings = validate_tool_args(tool_name, args)
if warnings:
return _json.dumps({
"error": "Hardcoded home directory path detected",
"details": warnings,
"suggestion": "Use $HOME, relative paths, or get_hermes_home() instead of hardcoded paths.",
"pokayoke": True,
"rule": "hardcoded-path-guard"
})
return None