Compare commits

..

2 Commits

Author SHA1 Message Date
Alexander Whitestone
969ef22f99 feat: tighten tool hallucination circuit breaker for #836
All checks were successful
Lint / lint (pull_request) Successful in 8s
2026-04-22 03:10:06 -04:00
Alexander Whitestone
4674889c0f test: lock tool hallucination poka-yoke for #836 2026-04-22 03:09:57 -04:00
9 changed files with 51 additions and 32 deletions

View File

@@ -13,11 +13,9 @@ import time
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from hermes_constants import get_hermes_home
logger = logging.getLogger(__name__)
HERMES_HOME = get_hermes_home()
HERMES_HOME = Path.home() / ".hermes"
CHECKPOINT_DIR = HERMES_HOME / "checkpoints"
CHARS_PER_TOKEN = 4

View File

@@ -56,7 +56,7 @@ VIOLATIONS = [
"id": "expanduser-hermes",
"name": "os.path.expanduser ~/.hermes (non-fallback)",
"pattern": r'os\.path\.expanduser\(["\']~/.hermes',
"exclude_with": r'#|HERMES_HOME',
"exclude_with": r'#',
"message": "Use `os.environ.get('HERMES_HOME', os.path.expanduser('~/.hermes'))` instead",
},
]

View File

@@ -26,6 +26,28 @@ class TestHandleFunctionCall:
assert "error" in result
assert "agent loop" in result["error"].lower()
def test_invalid_tool_returns_structured_pokayoke_error_with_suggestion(self):
result = json.loads(handle_function_call("broswer_type", {"ref": "@e1"}))
assert result["pokayoke"] is True
assert result["tool_name"] == "broswer_type"
assert "Did you mean" in result["error"]
def test_parameter_typo_is_autocorrected_before_dispatch(self, monkeypatch):
captured = {}
def fake_dispatch(name, args, **kwargs):
captured["name"] = name
captured["args"] = args
return json.dumps({"ok": True})
monkeypatch.setattr("model_tools.registry.dispatch", fake_dispatch)
result = json.loads(handle_function_call("read_file", {"pathe": "test.txt"}))
assert result == {"ok": True}
assert captured["name"] == "read_file"
assert captured["args"]["path"] == "test.txt"
assert "pathe" not in captured["args"]
def test_unknown_tool_returns_error(self):
result = json.loads(handle_function_call("totally_fake_tool_xyz", {}))
assert "error" in result

View File

@@ -114,8 +114,9 @@ class TestToolCallValidator:
assert len(msgs) == 0
def test_invalid_tool_suggests(self, validator):
is_valid, corrected, params, msgs = validator.validate("browser_typo", {"ref": "@e1"})
is_valid, corrected, params, msgs = validator.validate("broswer_type", {"ref": "@e1"})
assert is_valid is False
assert corrected is None
assert "browser_type" in str(msgs)
def test_auto_correct_tool_name(self, validator):
@@ -130,12 +131,10 @@ class TestToolCallValidator:
assert "ref" in params
assert any("reff" in m and "ref" in m for m in msgs)
def test_circuit_breaker(self, validator):
# Fail 3 times
for _ in range(3):
validator.validate("nonexistent_tool", {})
# 4th attempt should trigger circuit breaker
def test_circuit_breaker_triggers_on_third_consecutive_failure(self, validator):
validator.validate("nonexistent_tool", {})
validator.validate("nonexistent_tool", {})
is_valid, corrected, params, msgs = validator.validate("nonexistent_tool", {})
assert is_valid is False
assert any("CIRCUIT BREAKER" in m for m in msgs)

View File

@@ -13,11 +13,9 @@ from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Tuple
from hermes_constants import get_hermes_home
logger = logging.getLogger(__name__)
HERMES_HOME = get_hermes_home()
HERMES_HOME = Path.home() / ".hermes"
AUDIT_DIR = HERMES_HOME / "audit"
# Credential patterns to detect and redact
@@ -34,14 +32,14 @@ CREDENTIAL_PATTERNS = [
(r"bearer\s+[a-zA-Z0-9._-]{20,}", "[REDACTED: Bearer token]"),
# Generic tokens/passwords
("(?:token|TOKEN|Token)[:=]\\s*['\"]?[a-zA-Z0-9._-]{20,}['\"]?", "[REDACTED: Token]"),
("(?:password|PASSWORD|Password)[:=]\\s*['\"]?[^\\s\"']{8,}['\"]?", "[REDACTED: Password]"),
("(?:secret|SECRET|Secret)[:=]\\s*['\"]?[a-zA-Z0-9._-]{20,}['\"]?", "[REDACTED: Secret]"),
("(?:api_key|API_KEY|apiKey|ApiKey)[:=]\\s*['\"]?[a-zA-Z0-9._-]{20,}['\"]?", "[REDACTED: API key]"),
(r"(?:token|TOKEN|Token)[:=]\s*["']?[a-zA-Z0-9._-]{20,}["']?", "[REDACTED: Token]"),
(r"(?:password|PASSWORD|Password)[:=]\s*["']?[^\s"']{8,}["']?", "[REDACTED: Password]"),
(r"(?:secret|SECRET|Secret)[:=]\s*["']?[a-zA-Z0-9._-]{20,}["']?", "[REDACTED: Secret]"),
(r"(?:api_key|API_KEY|apiKey|ApiKey)[:=]\s*["']?[a-zA-Z0-9._-]{20,}["']?", "[REDACTED: API key]"),
# AWS keys
(r"AKIA[0-9A-Z]{16}", "[REDACTED: AWS access key]"),
("(?:aws_secret_access_key|AWS_SECRET_ACCESS_KEY)[:=]\\s*['\"]?[a-zA-Z0-9/+=]{40}['\"]?", "[REDACTED: AWS secret]"),
(r"(?:aws_secret_access_key|AWS_SECRET_ACCESS_KEY)[:=]\s*["']?[a-zA-Z0-9/+=]{40}["']?", "[REDACTED: AWS secret]"),
# Private keys
(r"-----BEGIN (?:RSA |EC |OPENSSH )?PRIVATE KEY-----", "[REDACTED: Private key header]"),

View File

@@ -249,8 +249,7 @@ def detect_crisis(text: str) -> CrisisDetectionResult:
# ── Escalation Logging ────────────────────────────────────────────────────
BRIDGE_URL = os.environ.get("CRISIS_BRIDGE_URL", "")
_HERMES_HOME = os.environ.get("HERMES_HOME")
LOG_PATH = os.path.join(_HERMES_HOME or os.path.expanduser("~/.hermes"), "crisis_escalations.jsonl")
LOG_PATH = os.path.expanduser("~/.hermes/crisis_escalations.jsonl")
def _log_escalation(result: CrisisDetectionResult, text_preview: str = ""):

View File

@@ -10,10 +10,10 @@ Usage:
from tools.hardcoded_path_guard import check_path, validate_tool_args
# Check a single path
err = check_path("/Users/apayne/.hermes/config.yaml") # noqa: hardcoded-path-ok
err = check_path("/Users/apayne/.hermes/config.yaml")
# Validate all path-like args in a tool call
clean_args, warnings = validate_tool_args("read_file", {"path": "/home/user/file.txt"}) # noqa: hardcoded-path-ok
clean_args, warnings = validate_tool_args("read_file", {"path": "/home/user/file.txt"})
"""
import os

View File

@@ -14,11 +14,9 @@ from typing import Dict, List, Optional, Any
from dataclasses import dataclass, asdict, field
from enum import Enum
from hermes_constants import get_hermes_home
logger = logging.getLogger(__name__)
TEMPLATE_DIR = get_hermes_home() / "session-templates"
TEMPLATE_DIR = Path.home() / ".hermes" / "session-templates"
class TaskType(Enum):
@@ -108,7 +106,7 @@ class Templates:
return TaskType.MIXED
def extract(self, session_id, max_n=10):
db = get_hermes_home() / "state.db"
db = Path.home() / ".hermes" / "state.db"
if not db.exists():
return []
try:

View File

@@ -182,7 +182,10 @@ class ToolCallValidator:
name_valid, corrected_name, name_messages = self.validate_tool_name(tool_name)
if not name_valid:
self._record_failure(tool_name)
failure_count = self._record_failure(tool_name)
if failure_count >= self.failure_threshold:
_, _, breaker_messages = self.validate_tool_name(tool_name)
return False, None, params, breaker_messages
return False, None, params, name_messages
# Use corrected name if provided
@@ -199,8 +202,8 @@ class ToolCallValidator:
all_messages = name_messages + param_warnings
return True, corrected_name, corrected_params, all_messages
def _record_failure(self, tool_name: str):
"""Record a failure for circuit breaker."""
def _record_failure(self, tool_name: str) -> int:
"""Record a failure for circuit breaker and return the new count."""
self.consecutive_failures[tool_name] = self.consecutive_failures.get(tool_name, 0) + 1
count = self.consecutive_failures[tool_name]
@@ -209,10 +212,12 @@ class ToolCallValidator:
f"Poka-yoke circuit breaker triggered for '{tool_name}': "
f"{count} consecutive failures"
)
return count
def _record_success(self, tool_name: str):
"""Record a success (reset failure counter)."""
self.consecutive_failures.pop(tool_name, None)
"""Record a success (reset consecutive failure streaks)."""
if self.consecutive_failures:
self.consecutive_failures.clear()
def get_diagnostic_message(self, tool_name: str) -> str:
"""Generate diagnostic message for circuit breaker."""