Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
969ef22f99 | ||
|
|
4674889c0f |
@@ -13,11 +13,9 @@ import time
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
from hermes_constants import get_hermes_home
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
HERMES_HOME = get_hermes_home()
|
||||
HERMES_HOME = Path.home() / ".hermes"
|
||||
CHECKPOINT_DIR = HERMES_HOME / "checkpoints"
|
||||
CHARS_PER_TOKEN = 4
|
||||
|
||||
|
||||
@@ -56,7 +56,7 @@ VIOLATIONS = [
|
||||
"id": "expanduser-hermes",
|
||||
"name": "os.path.expanduser ~/.hermes (non-fallback)",
|
||||
"pattern": r'os\.path\.expanduser\(["\']~/.hermes',
|
||||
"exclude_with": r'#|HERMES_HOME',
|
||||
"exclude_with": r'#',
|
||||
"message": "Use `os.environ.get('HERMES_HOME', os.path.expanduser('~/.hermes'))` instead",
|
||||
},
|
||||
]
|
||||
|
||||
@@ -26,6 +26,28 @@ class TestHandleFunctionCall:
|
||||
assert "error" in result
|
||||
assert "agent loop" in result["error"].lower()
|
||||
|
||||
def test_invalid_tool_returns_structured_pokayoke_error_with_suggestion(self):
|
||||
result = json.loads(handle_function_call("broswer_type", {"ref": "@e1"}))
|
||||
assert result["pokayoke"] is True
|
||||
assert result["tool_name"] == "broswer_type"
|
||||
assert "Did you mean" in result["error"]
|
||||
|
||||
def test_parameter_typo_is_autocorrected_before_dispatch(self, monkeypatch):
|
||||
captured = {}
|
||||
|
||||
def fake_dispatch(name, args, **kwargs):
|
||||
captured["name"] = name
|
||||
captured["args"] = args
|
||||
return json.dumps({"ok": True})
|
||||
|
||||
monkeypatch.setattr("model_tools.registry.dispatch", fake_dispatch)
|
||||
|
||||
result = json.loads(handle_function_call("read_file", {"pathe": "test.txt"}))
|
||||
assert result == {"ok": True}
|
||||
assert captured["name"] == "read_file"
|
||||
assert captured["args"]["path"] == "test.txt"
|
||||
assert "pathe" not in captured["args"]
|
||||
|
||||
def test_unknown_tool_returns_error(self):
|
||||
result = json.loads(handle_function_call("totally_fake_tool_xyz", {}))
|
||||
assert "error" in result
|
||||
|
||||
@@ -114,8 +114,9 @@ class TestToolCallValidator:
|
||||
assert len(msgs) == 0
|
||||
|
||||
def test_invalid_tool_suggests(self, validator):
|
||||
is_valid, corrected, params, msgs = validator.validate("browser_typo", {"ref": "@e1"})
|
||||
is_valid, corrected, params, msgs = validator.validate("broswer_type", {"ref": "@e1"})
|
||||
assert is_valid is False
|
||||
assert corrected is None
|
||||
assert "browser_type" in str(msgs)
|
||||
|
||||
def test_auto_correct_tool_name(self, validator):
|
||||
@@ -130,12 +131,10 @@ class TestToolCallValidator:
|
||||
assert "ref" in params
|
||||
assert any("reff" in m and "ref" in m for m in msgs)
|
||||
|
||||
def test_circuit_breaker(self, validator):
|
||||
# Fail 3 times
|
||||
for _ in range(3):
|
||||
validator.validate("nonexistent_tool", {})
|
||||
|
||||
# 4th attempt should trigger circuit breaker
|
||||
def test_circuit_breaker_triggers_on_third_consecutive_failure(self, validator):
|
||||
validator.validate("nonexistent_tool", {})
|
||||
validator.validate("nonexistent_tool", {})
|
||||
|
||||
is_valid, corrected, params, msgs = validator.validate("nonexistent_tool", {})
|
||||
assert is_valid is False
|
||||
assert any("CIRCUIT BREAKER" in m for m in msgs)
|
||||
|
||||
@@ -13,11 +13,9 @@ from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Tuple
|
||||
|
||||
from hermes_constants import get_hermes_home
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
HERMES_HOME = get_hermes_home()
|
||||
HERMES_HOME = Path.home() / ".hermes"
|
||||
AUDIT_DIR = HERMES_HOME / "audit"
|
||||
|
||||
# Credential patterns to detect and redact
|
||||
@@ -34,14 +32,14 @@ CREDENTIAL_PATTERNS = [
|
||||
(r"bearer\s+[a-zA-Z0-9._-]{20,}", "[REDACTED: Bearer token]"),
|
||||
|
||||
# Generic tokens/passwords
|
||||
("(?:token|TOKEN|Token)[:=]\\s*['\"]?[a-zA-Z0-9._-]{20,}['\"]?", "[REDACTED: Token]"),
|
||||
("(?:password|PASSWORD|Password)[:=]\\s*['\"]?[^\\s\"']{8,}['\"]?", "[REDACTED: Password]"),
|
||||
("(?:secret|SECRET|Secret)[:=]\\s*['\"]?[a-zA-Z0-9._-]{20,}['\"]?", "[REDACTED: Secret]"),
|
||||
("(?:api_key|API_KEY|apiKey|ApiKey)[:=]\\s*['\"]?[a-zA-Z0-9._-]{20,}['\"]?", "[REDACTED: API key]"),
|
||||
(r"(?:token|TOKEN|Token)[:=]\s*["']?[a-zA-Z0-9._-]{20,}["']?", "[REDACTED: Token]"),
|
||||
(r"(?:password|PASSWORD|Password)[:=]\s*["']?[^\s"']{8,}["']?", "[REDACTED: Password]"),
|
||||
(r"(?:secret|SECRET|Secret)[:=]\s*["']?[a-zA-Z0-9._-]{20,}["']?", "[REDACTED: Secret]"),
|
||||
(r"(?:api_key|API_KEY|apiKey|ApiKey)[:=]\s*["']?[a-zA-Z0-9._-]{20,}["']?", "[REDACTED: API key]"),
|
||||
|
||||
# AWS keys
|
||||
(r"AKIA[0-9A-Z]{16}", "[REDACTED: AWS access key]"),
|
||||
("(?:aws_secret_access_key|AWS_SECRET_ACCESS_KEY)[:=]\\s*['\"]?[a-zA-Z0-9/+=]{40}['\"]?", "[REDACTED: AWS secret]"),
|
||||
(r"(?:aws_secret_access_key|AWS_SECRET_ACCESS_KEY)[:=]\s*["']?[a-zA-Z0-9/+=]{40}["']?", "[REDACTED: AWS secret]"),
|
||||
|
||||
# Private keys
|
||||
(r"-----BEGIN (?:RSA |EC |OPENSSH )?PRIVATE KEY-----", "[REDACTED: Private key header]"),
|
||||
|
||||
@@ -249,8 +249,7 @@ def detect_crisis(text: str) -> CrisisDetectionResult:
|
||||
# ── Escalation Logging ────────────────────────────────────────────────────
|
||||
|
||||
BRIDGE_URL = os.environ.get("CRISIS_BRIDGE_URL", "")
|
||||
_HERMES_HOME = os.environ.get("HERMES_HOME")
|
||||
LOG_PATH = os.path.join(_HERMES_HOME or os.path.expanduser("~/.hermes"), "crisis_escalations.jsonl")
|
||||
LOG_PATH = os.path.expanduser("~/.hermes/crisis_escalations.jsonl")
|
||||
|
||||
|
||||
def _log_escalation(result: CrisisDetectionResult, text_preview: str = ""):
|
||||
|
||||
@@ -10,10 +10,10 @@ Usage:
|
||||
from tools.hardcoded_path_guard import check_path, validate_tool_args
|
||||
|
||||
# Check a single path
|
||||
err = check_path("/Users/apayne/.hermes/config.yaml") # noqa: hardcoded-path-ok
|
||||
err = check_path("/Users/apayne/.hermes/config.yaml")
|
||||
|
||||
# Validate all path-like args in a tool call
|
||||
clean_args, warnings = validate_tool_args("read_file", {"path": "/home/user/file.txt"}) # noqa: hardcoded-path-ok
|
||||
clean_args, warnings = validate_tool_args("read_file", {"path": "/home/user/file.txt"})
|
||||
"""
|
||||
|
||||
import os
|
||||
|
||||
@@ -14,11 +14,9 @@ from typing import Dict, List, Optional, Any
|
||||
from dataclasses import dataclass, asdict, field
|
||||
from enum import Enum
|
||||
|
||||
from hermes_constants import get_hermes_home
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
TEMPLATE_DIR = get_hermes_home() / "session-templates"
|
||||
TEMPLATE_DIR = Path.home() / ".hermes" / "session-templates"
|
||||
|
||||
|
||||
class TaskType(Enum):
|
||||
@@ -108,7 +106,7 @@ class Templates:
|
||||
return TaskType.MIXED
|
||||
|
||||
def extract(self, session_id, max_n=10):
|
||||
db = get_hermes_home() / "state.db"
|
||||
db = Path.home() / ".hermes" / "state.db"
|
||||
if not db.exists():
|
||||
return []
|
||||
try:
|
||||
|
||||
@@ -182,7 +182,10 @@ class ToolCallValidator:
|
||||
name_valid, corrected_name, name_messages = self.validate_tool_name(tool_name)
|
||||
|
||||
if not name_valid:
|
||||
self._record_failure(tool_name)
|
||||
failure_count = self._record_failure(tool_name)
|
||||
if failure_count >= self.failure_threshold:
|
||||
_, _, breaker_messages = self.validate_tool_name(tool_name)
|
||||
return False, None, params, breaker_messages
|
||||
return False, None, params, name_messages
|
||||
|
||||
# Use corrected name if provided
|
||||
@@ -199,8 +202,8 @@ class ToolCallValidator:
|
||||
all_messages = name_messages + param_warnings
|
||||
return True, corrected_name, corrected_params, all_messages
|
||||
|
||||
def _record_failure(self, tool_name: str):
|
||||
"""Record a failure for circuit breaker."""
|
||||
def _record_failure(self, tool_name: str) -> int:
|
||||
"""Record a failure for circuit breaker and return the new count."""
|
||||
self.consecutive_failures[tool_name] = self.consecutive_failures.get(tool_name, 0) + 1
|
||||
count = self.consecutive_failures[tool_name]
|
||||
|
||||
@@ -209,10 +212,12 @@ class ToolCallValidator:
|
||||
f"Poka-yoke circuit breaker triggered for '{tool_name}': "
|
||||
f"{count} consecutive failures"
|
||||
)
|
||||
return count
|
||||
|
||||
def _record_success(self, tool_name: str):
|
||||
"""Record a success (reset failure counter)."""
|
||||
self.consecutive_failures.pop(tool_name, None)
|
||||
"""Record a success (reset consecutive failure streaks)."""
|
||||
if self.consecutive_failures:
|
||||
self.consecutive_failures.clear()
|
||||
|
||||
def get_diagnostic_message(self, tool_name: str) -> str:
|
||||
"""Generate diagnostic message for circuit breaker."""
|
||||
|
||||
Reference in New Issue
Block a user