Compare commits
1 Commits
fix/921-ha
...
fix/886
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ab968e910c |
@@ -1,78 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Pre-commit hook: Reject hardcoded home-directory paths.
|
||||
|
||||
Install:
|
||||
cp pre-commit-hardcoded-path.py .git/hooks/pre-commit-hardcoded-path
|
||||
chmod +x .git/hooks/pre-commit-hardcoded-path
|
||||
|
||||
Or add to .pre-commit-config.yaml
|
||||
"""
|
||||
|
||||
import sys
|
||||
import subprocess
|
||||
import re
|
||||
|
||||
PATTERNS = [
|
||||
(r"/Users/[\w.\-]+/", "macOS home directory"),
|
||||
(r"/home/[\w.\-]+/", "Linux home directory"),
|
||||
(r"(?<![\w/])~/", "unexpanded tilde"),
|
||||
]
|
||||
|
||||
NOQA = re.compile(r"#\s*noqa:?\s*hardcoded-path-ok")
|
||||
|
||||
def get_staged_files():
|
||||
result = subprocess.run(
|
||||
["git", "diff", "--cached", "--name-only", "--diff-filter=ACM"],
|
||||
capture_output=True, text=True
|
||||
)
|
||||
return [f for f in result.stdout.strip().split("\n") if f.endswith(".py")]
|
||||
|
||||
def check_file(filepath):
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["git", "show", f":{filepath}"],
|
||||
capture_output=True, text=True
|
||||
)
|
||||
content = result.stdout
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
violations = []
|
||||
for i, line in enumerate(content.split("\n"), 1):
|
||||
if line.strip().startswith("#"):
|
||||
continue
|
||||
if line.strip().startswith(("import ", "from ")):
|
||||
continue
|
||||
if NOQA.search(line):
|
||||
continue
|
||||
for pattern, desc in PATTERNS:
|
||||
if re.search(pattern, line):
|
||||
violations.append((filepath, i, line.strip(), desc))
|
||||
break
|
||||
return violations
|
||||
|
||||
def main():
|
||||
files = get_staged_files()
|
||||
if not files:
|
||||
sys.exit(0)
|
||||
|
||||
all_violations = []
|
||||
for f in files:
|
||||
all_violations.extend(check_file(f))
|
||||
|
||||
if all_violations:
|
||||
print("ERROR: Hardcoded home directory paths detected:")
|
||||
print()
|
||||
for filepath, line_no, line, desc in all_violations:
|
||||
print(f" {filepath}:{line_no}: {desc}")
|
||||
print(f" {line[:100]}")
|
||||
print()
|
||||
print("Fix: Use $HOME, relative paths, or get_hermes_home().")
|
||||
print("Override: Add '# noqa: hardcoded-path-ok' to the line.")
|
||||
sys.exit(1)
|
||||
|
||||
sys.exit(0)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
156
agent/tool_fixation_detector.py
Normal file
156
agent/tool_fixation_detector.py
Normal file
@@ -0,0 +1,156 @@
|
||||
"""Tool fixation detection — break repetitive tool calling loops.
|
||||
|
||||
Detects when the agent latches onto one tool and calls it repeatedly
|
||||
without making progress. Injects a nudge prompt to break the loop.
|
||||
|
||||
Usage:
|
||||
from agent.tool_fixation_detector import ToolFixationDetector
|
||||
detector = ToolFixationDetector()
|
||||
nudge = detector.record("execute_code")
|
||||
if nudge:
|
||||
# Inject nudge into conversation
|
||||
messages.append({"role": "system", "content": nudge})
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Dict, List, Optional
|
||||
|
||||
|
||||
# Default thresholds
|
||||
_DEFAULT_THRESHOLD = int(os.getenv("TOOL_FIXATION_THRESHOLD", "5"))
|
||||
_DEFAULT_WINDOW = int(os.getenv("TOOL_FIXATION_WINDOW", "10"))
|
||||
|
||||
|
||||
@dataclass
|
||||
class FixationEvent:
|
||||
"""Record of a fixation detection."""
|
||||
tool_name: str
|
||||
streak_length: int
|
||||
threshold: int
|
||||
nudge_sent: bool = False
|
||||
|
||||
|
||||
class ToolFixationDetector:
|
||||
"""Detects and breaks tool fixation loops.
|
||||
|
||||
Tracks the sequence of tool calls and detects when the same tool
|
||||
is called N times consecutively. When detected, returns a nudge
|
||||
prompt to inject into the conversation.
|
||||
"""
|
||||
|
||||
def __init__(self, threshold: int = 0, window: int = 0):
|
||||
self.threshold = threshold or _DEFAULT_THRESHOLD
|
||||
self.window = window or _DEFAULT_WINDOW
|
||||
self._history: List[str] = []
|
||||
self._current_streak: str = ""
|
||||
self._streak_count: int = 0
|
||||
self._nudges_sent: int = 0
|
||||
self._events: List[FixationEvent] = []
|
||||
|
||||
@property
|
||||
def nudges_sent(self) -> int:
|
||||
return self._nudges_sent
|
||||
|
||||
@property
|
||||
def events(self) -> List[FixationEvent]:
|
||||
return list(self._events)
|
||||
|
||||
def record(self, tool_name: str) -> Optional[str]:
|
||||
"""Record a tool call and return nudge prompt if fixation detected.
|
||||
|
||||
Args:
|
||||
tool_name: Name of the tool that was called.
|
||||
|
||||
Returns:
|
||||
Nudge prompt string if fixation detected, None otherwise.
|
||||
"""
|
||||
self._history.append(tool_name)
|
||||
|
||||
# Trim history to window
|
||||
if len(self._history) > self.window:
|
||||
self._history = self._history[-self.window:]
|
||||
|
||||
# Update streak
|
||||
if tool_name == self._current_streak:
|
||||
self._streak_count += 1
|
||||
else:
|
||||
self._current_streak = tool_name
|
||||
self._streak_count = 1
|
||||
|
||||
# Check for fixation
|
||||
if self._streak_count >= self.threshold:
|
||||
event = FixationEvent(
|
||||
tool_name=tool_name,
|
||||
streak_length=self._streak_count,
|
||||
threshold=self.threshold,
|
||||
nudge_sent=True,
|
||||
)
|
||||
self._events.append(event)
|
||||
self._nudges_sent += 1
|
||||
|
||||
return self._build_nudge(tool_name, self._streak_count)
|
||||
|
||||
return None
|
||||
|
||||
def _build_nudge(self, tool_name: str, count: int) -> str:
|
||||
"""Build a nudge prompt to break the fixation loop."""
|
||||
return (
|
||||
f"[SYSTEM: You have called `{tool_name}` {count} times in a row "
|
||||
f"without switching tools. This suggests a fixation loop. "
|
||||
f"Consider:\n"
|
||||
f"1. Is the tool returning an error? Read the error carefully.\n"
|
||||
f"2. Is there a different tool that could help?\n"
|
||||
f"3. Should you ask the user for clarification?\n"
|
||||
f"4. Is the task actually complete?\n"
|
||||
f"Break the loop by trying a different approach.]"
|
||||
)
|
||||
|
||||
def reset(self) -> None:
|
||||
"""Reset the detector state."""
|
||||
self._history.clear()
|
||||
self._current_streak = ""
|
||||
self._streak_count = 0
|
||||
|
||||
def get_streak_info(self) -> dict:
|
||||
"""Get current streak information."""
|
||||
return {
|
||||
"current_tool": self._current_streak,
|
||||
"streak_count": self._streak_count,
|
||||
"threshold": self.threshold,
|
||||
"at_threshold": self._streak_count >= self.threshold,
|
||||
"nudges_sent": self._nudges_sent,
|
||||
}
|
||||
|
||||
def format_report(self) -> str:
|
||||
"""Format fixation events as a report."""
|
||||
if not self._events:
|
||||
return "No tool fixation detected."
|
||||
|
||||
lines = [
|
||||
f"Tool Fixation Report ({len(self._events)} events)",
|
||||
"=" * 40,
|
||||
]
|
||||
for e in self._events:
|
||||
lines.append(f" {e.tool_name}: {e.streak_length} consecutive calls (threshold: {e.threshold})")
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
# Singleton
|
||||
_detector: Optional[ToolFixationDetector] = None
|
||||
|
||||
|
||||
def get_fixation_detector() -> ToolFixationDetector:
|
||||
"""Get or create the singleton detector."""
|
||||
global _detector
|
||||
if _detector is None:
|
||||
_detector = ToolFixationDetector()
|
||||
return _detector
|
||||
|
||||
|
||||
def reset_fixation_detector() -> None:
|
||||
"""Reset the singleton."""
|
||||
global _detector
|
||||
_detector = None
|
||||
@@ -28,7 +28,6 @@ from typing import Dict, Any, List, Optional, Tuple
|
||||
|
||||
from tools.registry import discover_builtin_tools, registry
|
||||
from tools.tool_pokayoke import validate_tool_call, reset_circuit_breaker, get_hallucination_stats
|
||||
from tools.hardcoded_path_guard import guard_tool_dispatch as _guard_hardcoded_paths
|
||||
from toolsets import resolve_toolset, validate_toolset
|
||||
from agent.tool_orchestrator import orchestrator
|
||||
|
||||
@@ -502,12 +501,6 @@ def handle_function_call(
|
||||
# Prefer the caller-provided list so subagents can't overwrite
|
||||
# the parent's tool set via the process-global.
|
||||
sandbox_enabled = enabled_tools if enabled_tools is not None else _last_resolved_tool_names
|
||||
# Poka-yoke #921: guard against hardcoded home-directory paths
|
||||
_hardcoded_err = _guard_hardcoded_paths(function_name, function_args)
|
||||
if _hardcoded_err:
|
||||
logger.warning(f"Hardcoded path blocked: {function_name}")
|
||||
return _hardcoded_err
|
||||
|
||||
# Poka-yoke: validate tool call before dispatch
|
||||
is_valid, corrected_name, corrected_params, pokayoke_messages = validate_tool_call(function_name, function_args)
|
||||
if not is_valid:
|
||||
|
||||
76
tests/test_tool_fixation_detector.py
Normal file
76
tests/test_tool_fixation_detector.py
Normal file
@@ -0,0 +1,76 @@
|
||||
"""Tests for tool fixation detection."""
|
||||
|
||||
import pytest
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
||||
|
||||
from agent.tool_fixation_detector import ToolFixationDetector, get_fixation_detector
|
||||
|
||||
|
||||
class TestFixationDetection:
|
||||
def test_no_fixation_below_threshold(self):
|
||||
d = ToolFixationDetector(threshold=5)
|
||||
for i in range(4):
|
||||
assert d.record("execute_code") is None
|
||||
|
||||
def test_fixation_at_threshold(self):
|
||||
d = ToolFixationDetector(threshold=3)
|
||||
d.record("execute_code")
|
||||
d.record("execute_code")
|
||||
nudge = d.record("execute_code")
|
||||
assert nudge is not None
|
||||
assert "execute_code" in nudge
|
||||
assert "3 times" in nudge
|
||||
|
||||
def test_fixation_above_threshold(self):
|
||||
d = ToolFixationDetector(threshold=3)
|
||||
d.record("execute_code")
|
||||
d.record("execute_code")
|
||||
d.record("execute_code") # threshold hit
|
||||
nudge = d.record("execute_code") # still nudging
|
||||
assert nudge is not None
|
||||
|
||||
def test_streak_resets_on_different_tool(self):
|
||||
d = ToolFixationDetector(threshold=3)
|
||||
d.record("execute_code")
|
||||
d.record("execute_code")
|
||||
d.record("terminal") # breaks streak
|
||||
assert d._streak_count == 1
|
||||
assert d._current_streak == "terminal"
|
||||
|
||||
def test_nudges_sent_counter(self):
|
||||
d = ToolFixationDetector(threshold=2)
|
||||
d.record("a")
|
||||
d.record("a") # nudge 1
|
||||
d.record("a") # nudge 2
|
||||
assert d.nudges_sent == 2
|
||||
|
||||
def test_events_recorded(self):
|
||||
d = ToolFixationDetector(threshold=2)
|
||||
d.record("x")
|
||||
d.record("x")
|
||||
assert len(d.events) == 1
|
||||
assert d.events[0].tool_name == "x"
|
||||
assert d.events[0].streak_length == 2
|
||||
|
||||
def test_report(self):
|
||||
d = ToolFixationDetector(threshold=2)
|
||||
d.record("x")
|
||||
d.record("x")
|
||||
report = d.format_report()
|
||||
assert "x" in report
|
||||
|
||||
def test_reset(self):
|
||||
d = ToolFixationDetector(threshold=2)
|
||||
d.record("x")
|
||||
d.record("x")
|
||||
d.reset()
|
||||
assert d._streak_count == 0
|
||||
assert d._current_streak == ""
|
||||
|
||||
def test_singleton(self):
|
||||
d1 = get_fixation_detector()
|
||||
d2 = get_fixation_detector()
|
||||
assert d1 is d2
|
||||
@@ -1,113 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Hardcoded Path Guard — Poka-Yoke #921
|
||||
|
||||
Detects and blocks hardcoded home-directory paths in tool arguments.
|
||||
These paths work on one machine but break on others, VPS deployments,
|
||||
or when HOME changes.
|
||||
|
||||
Usage:
|
||||
from tools.hardcoded_path_guard import check_path, validate_tool_args
|
||||
|
||||
# Check a single path
|
||||
err = check_path("/Users/apayne/.hermes/config.yaml")
|
||||
|
||||
# Validate all path-like args in a tool call
|
||||
clean_args, warnings = validate_tool_args("read_file", {"path": "/home/user/file.txt"})
|
||||
"""
|
||||
|
||||
import os
|
||||
import re
|
||||
import json as _json
|
||||
from typing import Dict, List, Optional, Tuple, Any
|
||||
|
||||
# Patterns that indicate hardcoded home directories
|
||||
HARDCODED_PATTERNS = [
|
||||
(r"/Users/[\w.\-]+/", "macOS home directory (/Users/...)"),
|
||||
(r"/home/[\w.\-]+/", "Linux home directory (/home/...)"),
|
||||
(r"(?<![\w/])~/", "unexpanded tilde (~/)"),
|
||||
(r"/root/", "root home directory (/root/)"),
|
||||
]
|
||||
|
||||
_COMPILED_PATTERNS = [(re.compile(p), desc) for p, desc in HARDCODED_PATTERNS]
|
||||
_NOQA_PATTERN = re.compile(r"#\s*noqa:?\s*hardcoded-path-ok")
|
||||
|
||||
_PATH_ARG_NAMES = frozenset({
|
||||
"path", "file_path", "filepath", "dir", "directory", "dest", "source",
|
||||
"input", "output", "src", "dst", "target", "location", "file",
|
||||
"image_path", "script", "config", "log_file",
|
||||
})
|
||||
|
||||
|
||||
def has_hardcoded_path(text: str) -> Optional[str]:
|
||||
if _NOQA_PATTERN.search(text):
|
||||
return None
|
||||
for pattern, desc in _COMPILED_PATTERNS:
|
||||
if pattern.search(text):
|
||||
return desc
|
||||
return None
|
||||
|
||||
|
||||
def check_path(path_value: str) -> Optional[str]:
|
||||
if not isinstance(path_value, str):
|
||||
return None
|
||||
match_desc = has_hardcoded_path(path_value)
|
||||
if match_desc:
|
||||
return (
|
||||
f"Path contains hardcoded home directory ({match_desc}): '{path_value}'. "
|
||||
f"Use $HOME, relative paths, or get_hermes_home(). "
|
||||
f"Add '# noqa: hardcoded-path-ok' if intentional."
|
||||
)
|
||||
return None
|
||||
|
||||
|
||||
def validate_tool_args(tool_name: str, args: Dict[str, Any]) -> Tuple[Dict[str, Any], List[str]]:
|
||||
warnings = []
|
||||
for key, value in args.items():
|
||||
if key.lower() not in _PATH_ARG_NAMES:
|
||||
continue
|
||||
if isinstance(value, str):
|
||||
err = check_path(value)
|
||||
if err:
|
||||
warnings.append(err)
|
||||
elif isinstance(value, list):
|
||||
for item in value:
|
||||
if isinstance(item, str):
|
||||
err = check_path(item)
|
||||
if err:
|
||||
warnings.append(err)
|
||||
return args, warnings
|
||||
|
||||
|
||||
def scan_source_for_violations(source_code: str, filename: str = "") -> List[Tuple[int, str, str]]:
|
||||
violations = []
|
||||
lines = source_code.split("\n")
|
||||
for i, line in enumerate(lines, 1):
|
||||
stripped = line.strip()
|
||||
if stripped.startswith("#"):
|
||||
if _NOQA_PATTERN.search(line):
|
||||
continue
|
||||
continue
|
||||
if stripped.startswith("import ") or stripped.startswith("from "):
|
||||
continue
|
||||
for pattern, desc in _COMPILED_PATTERNS:
|
||||
match = pattern.search(line)
|
||||
if match:
|
||||
if _NOQA_PATTERN.search(line):
|
||||
continue
|
||||
violations.append((i, line.strip(), desc))
|
||||
break
|
||||
return violations
|
||||
|
||||
|
||||
def guard_tool_dispatch(tool_name: str, args: Dict[str, Any]) -> Optional[str]:
|
||||
_, warnings = validate_tool_args(tool_name, args)
|
||||
if warnings:
|
||||
return _json.dumps({
|
||||
"error": "Hardcoded home directory path detected",
|
||||
"details": warnings,
|
||||
"suggestion": "Use $HOME, relative paths, or get_hermes_home() instead of hardcoded paths.",
|
||||
"pokayoke": True,
|
||||
"rule": "hardcoded-path-guard"
|
||||
})
|
||||
return None
|
||||
@@ -44,34 +44,6 @@ from typing import Dict, Any, Optional, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _format_error(
|
||||
message: str,
|
||||
skill_name: str = None,
|
||||
file_path: str = None,
|
||||
suggestion: str = None,
|
||||
context: dict = None,
|
||||
) -> Dict[str, Any]:
|
||||
"""Format an error with rich context for better debugging."""
|
||||
parts = [message]
|
||||
if skill_name:
|
||||
parts.append(f"Skill: {skill_name}")
|
||||
if file_path:
|
||||
parts.append(f"File: {file_path}")
|
||||
if suggestion:
|
||||
parts.append(f"Suggestion: {suggestion}")
|
||||
if context:
|
||||
for key, value in context.items():
|
||||
parts.append(f"{key}: {value}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": " | ".join(parts),
|
||||
"skill_name": skill_name,
|
||||
"file_path": file_path,
|
||||
"suggestion": suggestion,
|
||||
}
|
||||
|
||||
|
||||
# Import security scanner — agent-created skills get the same scrutiny as
|
||||
# community hub installs.
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user