Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a1d536826e |
@@ -1,281 +0,0 @@
|
||||
"""
|
||||
Hallucination Metrics — Persistent logging and alerting for tool hallucinations.
|
||||
|
||||
Logs tool hallucination events to a JSONL file and provides aggregated statistics.
|
||||
Integrates with the poka-yoke validation system.
|
||||
|
||||
Usage:
|
||||
from agent.hallucination_metrics import log_hallucination_event, get_hallucination_stats
|
||||
log_hallucination_event("invalid_tool", "unknown_tool", "suggested_correct_name")
|
||||
stats = get_hallucination_stats()
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
from collections import defaultdict
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from threading import Lock
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
from hermes_constants import get_hermes_home
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Constants
|
||||
METRICS_FILE_NAME = "hallucination_metrics.jsonl"
|
||||
ALERT_THRESHOLD = 10 # Alert after this many consecutive failures for a tool
|
||||
SESSION_WINDOW_HOURS = 24 # Consider events within this window as "session"
|
||||
|
||||
# In-memory cache for fast lookups
|
||||
_cache: Dict[str, Any] = {"events": [], "last_flush": 0, "session_counts": defaultdict(int)}
|
||||
_cache_lock = Lock()
|
||||
|
||||
|
||||
def _get_metrics_path() -> Path:
|
||||
"""Return the path to the hallucination metrics file."""
|
||||
return get_hermes_home() / "metrics" / METRICS_FILE_NAME
|
||||
|
||||
|
||||
def _ensure_metrics_dir():
|
||||
"""Ensure the metrics directory exists."""
|
||||
metrics_dir = _get_metrics_path().parent
|
||||
metrics_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
|
||||
def log_hallucination_event(
|
||||
tool_name: str,
|
||||
error_type: str = "unknown_tool",
|
||||
suggested_name: Optional[str] = None,
|
||||
validation_messages: Optional[List[str]] = None,
|
||||
session_id: Optional[str] = None,
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Log a hallucination event to the metrics file.
|
||||
|
||||
Args:
|
||||
tool_name: The hallucinated tool name
|
||||
error_type: Type of error (unknown_tool, invalid_params, etc.)
|
||||
suggested_name: Suggested correction if available
|
||||
validation_messages: List of validation error messages
|
||||
session_id: Optional session identifier for grouping
|
||||
|
||||
Returns:
|
||||
The logged event dict with additional metadata
|
||||
"""
|
||||
event = {
|
||||
"timestamp": datetime.now(timezone.utc).isoformat(),
|
||||
"tool_name": tool_name,
|
||||
"error_type": error_type,
|
||||
"suggested_name": suggested_name,
|
||||
"validation_messages": validation_messages or [],
|
||||
"session_id": session_id,
|
||||
"unix_timestamp": time.time(),
|
||||
}
|
||||
|
||||
# Write to file
|
||||
_ensure_metrics_dir()
|
||||
metrics_path = _get_metrics_path()
|
||||
|
||||
try:
|
||||
with open(metrics_path, "a", encoding="utf-8") as f:
|
||||
f.write(json.dumps(event, ensure_ascii=False) + "\n")
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to write hallucination event: {e}")
|
||||
|
||||
# Update in-memory cache
|
||||
with _cache_lock:
|
||||
_cache["events"].append(event)
|
||||
_cache["session_counts"][tool_name] += 1
|
||||
session_count = _cache["session_counts"][tool_name]
|
||||
|
||||
# Check alert threshold
|
||||
if session_count >= ALERT_THRESHOLD:
|
||||
logger.warning(
|
||||
f"HALLUCINATION ALERT: Tool '{tool_name}' has failed {session_count} times "
|
||||
f"in this session (threshold: {ALERT_THRESHOLD}). "
|
||||
f"This may indicate a persistent hallucination pattern."
|
||||
)
|
||||
|
||||
return event
|
||||
|
||||
|
||||
def _load_events_from_file() -> List[Dict[str, Any]]:
|
||||
"""Load all events from the metrics file."""
|
||||
metrics_path = _get_metrics_path()
|
||||
if not metrics_path.exists():
|
||||
return []
|
||||
|
||||
events = []
|
||||
try:
|
||||
with open(metrics_path, "r", encoding="utf-8") as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if line:
|
||||
try:
|
||||
events.append(json.loads(line))
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to load hallucination events: {e}")
|
||||
|
||||
return events
|
||||
|
||||
|
||||
def get_hallucination_stats(
|
||||
hours: Optional[int] = None,
|
||||
tool_name: Optional[str] = None,
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Get aggregated hallucination statistics.
|
||||
|
||||
Args:
|
||||
hours: Only consider events from the last N hours (None = all time)
|
||||
tool_name: Filter to specific tool name (None = all tools)
|
||||
|
||||
Returns:
|
||||
Dict with aggregated statistics
|
||||
"""
|
||||
events = _load_events_from_file()
|
||||
|
||||
# Filter by time window
|
||||
if hours is not None:
|
||||
cutoff = time.time() - (hours * 3600)
|
||||
events = [e for e in events if e.get("unix_timestamp", 0) >= cutoff]
|
||||
|
||||
# Filter by tool name
|
||||
if tool_name is not None:
|
||||
events = [e for e in events if e.get("tool_name") == tool_name]
|
||||
|
||||
# Aggregate by tool
|
||||
tool_counts: Dict[str, Dict[str, Any]] = defaultdict(
|
||||
lambda: {"count": 0, "suggested_names": [], "error_types": defaultdict(int)}
|
||||
)
|
||||
|
||||
for event in events:
|
||||
name = event.get("tool_name", "unknown")
|
||||
tool_counts[name]["count"] += 1
|
||||
if event.get("suggested_name"):
|
||||
tool_counts[name]["suggested_names"].append(event["suggested_name"])
|
||||
if event.get("error_type"):
|
||||
tool_counts[name]["error_types"][event["error_type"]] += 1
|
||||
|
||||
# Find most common suggestions per tool
|
||||
for name, data in tool_counts.items():
|
||||
suggestions = data["suggested_names"]
|
||||
if suggestions:
|
||||
from collections import Counter
|
||||
most_common = Counter(suggestions).most_common(1)[0]
|
||||
data["most_common_suggestion"] = most_common[0]
|
||||
data["suggestion_count"] = most_common[1]
|
||||
del data["suggested_names"] # Remove raw list from output
|
||||
|
||||
# Calculate time-based stats
|
||||
if events:
|
||||
first_event = min(e.get("unix_timestamp", 0) for e in events)
|
||||
last_event = max(e.get("unix_timestamp", 0) for e in events)
|
||||
time_span_hours = (last_event - first_event) / 3600 if first_event != last_event else 0
|
||||
else:
|
||||
time_span_hours = 0
|
||||
|
||||
# Error type breakdown
|
||||
all_error_types: Dict[str, int] = defaultdict(int)
|
||||
for event in events:
|
||||
et = event.get("error_type", "unknown")
|
||||
all_error_types[et] += 1
|
||||
|
||||
return {
|
||||
"total_events": len(events),
|
||||
"unique_tools": len(tool_counts),
|
||||
"time_span_hours": round(time_span_hours, 1),
|
||||
"top_hallucinated_tools": sorted(
|
||||
[{"tool": k, **v} for k, v in tool_counts.items()],
|
||||
key=lambda x: -x["count"]
|
||||
)[:20],
|
||||
"error_type_breakdown": dict(all_error_types),
|
||||
"alert_threshold": ALERT_THRESHOLD,
|
||||
"session_window_hours": SESSION_WINDOW_HOURS,
|
||||
}
|
||||
|
||||
|
||||
def get_most_hallucinated_tools(n: int = 10) -> List[Tuple[str, int]]:
|
||||
"""Get the top N most frequently hallucinated tool names."""
|
||||
stats = get_hallucination_stats()
|
||||
tools = stats.get("top_hallucinated_tools", [])
|
||||
return [(t["tool"], t["count"]) for t in tools[:n]]
|
||||
|
||||
|
||||
def clear_metrics(older_than_hours: Optional[int] = None) -> int:
|
||||
"""
|
||||
Clear hallucination metrics.
|
||||
|
||||
Args:
|
||||
older_than_hours: Only clear events older than this many hours (None = clear all)
|
||||
|
||||
Returns:
|
||||
Number of events removed
|
||||
"""
|
||||
metrics_path = _get_metrics_path()
|
||||
if not metrics_path.exists():
|
||||
return 0
|
||||
|
||||
if older_than_hours is None:
|
||||
# Clear all
|
||||
count = len(_load_events_from_file())
|
||||
metrics_path.unlink(missing_ok=True)
|
||||
with _cache_lock:
|
||||
_cache["events"].clear()
|
||||
_cache["session_counts"].clear()
|
||||
return count
|
||||
|
||||
# Clear only old events
|
||||
cutoff = time.time() - (older_than_hours * 3600)
|
||||
events = _load_events_from_file()
|
||||
keep = [e for e in events if e.get("unix_timestamp", 0) >= cutoff]
|
||||
removed = len(events) - len(keep)
|
||||
|
||||
# Rewrite file
|
||||
_ensure_metrics_dir()
|
||||
with open(metrics_path, "w", encoding="utf-8") as f:
|
||||
for event in keep:
|
||||
f.write(json.dumps(event, ensure_ascii=False) + "\n")
|
||||
|
||||
return removed
|
||||
|
||||
|
||||
def format_stats_for_display(stats: Dict[str, Any]) -> str:
|
||||
"""Format statistics as a human-readable string."""
|
||||
lines = [
|
||||
"=== Hallucination Metrics ===",
|
||||
"",
|
||||
f"Total events: {stats['total_events']}",
|
||||
f"Unique tools hallucinated: {stats['unique_tools']}",
|
||||
f"Time span: {stats['time_span_hours']:.1f} hours",
|
||||
"",
|
||||
"Top Hallucinated Tools:",
|
||||
"-" * 40,
|
||||
]
|
||||
|
||||
for tool in stats.get("top_hallucinated_tools", [])[:10]:
|
||||
lines.append(f" {tool['tool']:<30} {tool['count']:>5} events")
|
||||
if "most_common_suggestion" in tool:
|
||||
lines.append(f" → Suggested: {tool['most_common_suggestion']} ({tool['suggestion_count']}x)")
|
||||
|
||||
if stats.get("error_type_breakdown"):
|
||||
lines.extend([
|
||||
"",
|
||||
"Error Types:",
|
||||
"-" * 40,
|
||||
])
|
||||
for et, count in sorted(stats["error_type_breakdown"].items(), key=lambda x: -x[1]):
|
||||
lines.append(f" {et:<30} {count:>5}")
|
||||
|
||||
lines.extend([
|
||||
"",
|
||||
f"Alert threshold: {stats['alert_threshold']} failures per session",
|
||||
f"Session window: {stats['session_window_hours']} hours",
|
||||
])
|
||||
|
||||
return "\n".join(lines)
|
||||
@@ -18,7 +18,6 @@ Usage:
|
||||
hermes cron list # List cron jobs
|
||||
hermes cron status # Check if cron scheduler is running
|
||||
hermes doctor # Check configuration and dependencies
|
||||
hermes hallucination-stats # Show tool hallucination statistics
|
||||
hermes honcho setup # Configure Honcho AI memory integration
|
||||
hermes honcho status # Show Honcho config and connection status
|
||||
hermes honcho sessions # List directory → session name mappings
|
||||
@@ -2805,17 +2804,6 @@ def cmd_doctor(args):
|
||||
run_doctor(args)
|
||||
|
||||
|
||||
def cmd_hallucination_stats(args):
|
||||
"""Show tool hallucination statistics."""
|
||||
from agent.hallucination_metrics import get_hallucination_stats, format_stats_for_display, clear_metrics
|
||||
if getattr(args, 'clear', False):
|
||||
removed = clear_metrics(older_than_hours=getattr(args, 'older_than', None))
|
||||
print(f"Cleared {removed} hallucination events.")
|
||||
return
|
||||
stats = get_hallucination_stats(hours=getattr(args, 'hours', None))
|
||||
print(format_stats_for_display(stats))
|
||||
|
||||
|
||||
def cmd_dump(args):
|
||||
"""Dump setup summary for support/debugging."""
|
||||
from hermes_cli.dump import run_dump
|
||||
@@ -5053,33 +5041,6 @@ For more help on a command:
|
||||
)
|
||||
doctor_parser.set_defaults(func=cmd_doctor)
|
||||
|
||||
# =========================================================================
|
||||
# hallucination-stats command
|
||||
# =========================================================================
|
||||
hallucination_parser = subparsers.add_parser(
|
||||
"hallucination-stats",
|
||||
help="Show tool hallucination statistics",
|
||||
description="View aggregated tool hallucination metrics from poka-yoke validation"
|
||||
)
|
||||
hallucination_parser.add_argument(
|
||||
"--hours",
|
||||
type=int,
|
||||
default=None,
|
||||
help="Only show events from the last N hours"
|
||||
)
|
||||
hallucination_parser.add_argument(
|
||||
"--clear",
|
||||
action="store_true",
|
||||
help="Clear all hallucination metrics"
|
||||
)
|
||||
hallucination_parser.add_argument(
|
||||
"--older-than",
|
||||
type=int,
|
||||
default=None,
|
||||
help="When clearing, only remove events older than N hours"
|
||||
)
|
||||
hallucination_parser.set_defaults(func=cmd_hallucination_stats)
|
||||
|
||||
# =========================================================================
|
||||
# dump command
|
||||
# =========================================================================
|
||||
|
||||
@@ -1,171 +0,0 @@
|
||||
"""Tests for agent/hallucination_metrics.py — #853."""
|
||||
|
||||
import json
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from agent.hallucination_metrics import (
|
||||
log_hallucination_event,
|
||||
get_hallucination_stats,
|
||||
get_most_hallucinated_tools,
|
||||
clear_metrics,
|
||||
format_stats_for_display,
|
||||
_get_metrics_path,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def isolated_metrics(monkeypatch, tmp_path):
|
||||
"""Redirect metrics to a temp file for every test."""
|
||||
metrics_dir = tmp_path / "test_hermes_home" / "metrics"
|
||||
metrics_dir.mkdir(parents=True)
|
||||
metrics_file = metrics_dir / "hallucination_metrics.jsonl"
|
||||
|
||||
# Patch the get_hermes_home function to return our temp path
|
||||
def mock_get_hermes_home():
|
||||
return tmp_path / "test_hermes_home"
|
||||
|
||||
monkeypatch.setattr(
|
||||
"agent.hallucination_metrics.get_hermes_home",
|
||||
mock_get_hermes_home,
|
||||
)
|
||||
|
||||
# Also clear cache
|
||||
from agent.hallucination_metrics import _cache, _cache_lock
|
||||
with _cache_lock:
|
||||
_cache["events"].clear()
|
||||
_cache["session_counts"].clear()
|
||||
yield
|
||||
clear_metrics()
|
||||
|
||||
|
||||
class TestLogEvent:
|
||||
def test_log_event_returns_dict(self):
|
||||
event = log_hallucination_event("fake_tool", "unknown_tool", "real_tool")
|
||||
assert event["tool_name"] == "fake_tool"
|
||||
assert event["error_type"] == "unknown_tool"
|
||||
assert event["suggested_name"] == "real_tool"
|
||||
assert "timestamp" in event
|
||||
assert "unix_timestamp" in event
|
||||
|
||||
def test_log_event_persists_to_file(self):
|
||||
log_hallucination_event("tool_a", "unknown_tool")
|
||||
log_hallucination_event("tool_b", "invalid_params")
|
||||
|
||||
path = _get_metrics_path()
|
||||
assert path.exists()
|
||||
lines = path.read_text().strip().splitlines()
|
||||
assert len(lines) == 2
|
||||
|
||||
data = [json.loads(line) for line in lines]
|
||||
assert data[0]["tool_name"] == "tool_a"
|
||||
assert data[1]["tool_name"] == "tool_b"
|
||||
|
||||
|
||||
class TestGetStats:
|
||||
def test_empty_stats(self):
|
||||
stats = get_hallucination_stats()
|
||||
assert stats["total_events"] == 0
|
||||
assert stats["unique_tools"] == 0
|
||||
|
||||
def test_stats_by_tool(self):
|
||||
log_hallucination_event("tool_x", "unknown_tool", "tool_y")
|
||||
log_hallucination_event("tool_x", "unknown_tool", "tool_y")
|
||||
log_hallucination_event("tool_z", "invalid_params")
|
||||
|
||||
stats = get_hallucination_stats()
|
||||
assert stats["total_events"] == 3
|
||||
assert stats["unique_tools"] == 2
|
||||
|
||||
top = stats["top_hallucinated_tools"]
|
||||
assert len(top) == 2
|
||||
assert top[0]["tool"] == "tool_x"
|
||||
assert top[0]["count"] == 2
|
||||
assert top[1]["tool"] == "tool_z"
|
||||
assert top[1]["count"] == 1
|
||||
|
||||
def test_stats_hours_filter(self):
|
||||
# Log old event by faking timestamp
|
||||
old_event = {
|
||||
"timestamp": "2026-01-01T00:00:00+00:00",
|
||||
"tool_name": "old_tool",
|
||||
"error_type": "unknown_tool",
|
||||
"unix_timestamp": time.time() - 48 * 3600,
|
||||
}
|
||||
path = _get_metrics_path()
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(path, "w") as f:
|
||||
f.write(json.dumps(old_event) + "\n")
|
||||
|
||||
log_hallucination_event("new_tool", "unknown_tool")
|
||||
|
||||
stats = get_hallucination_stats(hours=24)
|
||||
assert stats["total_events"] == 1
|
||||
assert stats["top_hallucinated_tools"][0]["tool"] == "new_tool"
|
||||
|
||||
def test_error_type_breakdown(self):
|
||||
log_hallucination_event("t1", "unknown_tool")
|
||||
log_hallucination_event("t2", "invalid_params")
|
||||
log_hallucination_event("t3", "unknown_tool")
|
||||
|
||||
stats = get_hallucination_stats()
|
||||
breakdown = stats["error_type_breakdown"]
|
||||
assert breakdown["unknown_tool"] == 2
|
||||
assert breakdown["invalid_params"] == 1
|
||||
|
||||
|
||||
class TestGetMostHallucinated:
|
||||
def test_top_tools(self):
|
||||
for _ in range(5):
|
||||
log_hallucination_event("common_tool", "unknown_tool")
|
||||
for _ in range(2):
|
||||
log_hallucination_event("rare_tool", "unknown_tool")
|
||||
|
||||
tools = get_most_hallucinated_tools(n=2)
|
||||
assert tools[0] == ("common_tool", 5)
|
||||
assert tools[1] == ("rare_tool", 2)
|
||||
|
||||
|
||||
class TestClearMetrics:
|
||||
def test_clear_all(self):
|
||||
log_hallucination_event("t1", "unknown_tool")
|
||||
removed = clear_metrics()
|
||||
assert removed == 1
|
||||
assert _get_metrics_path().exists() is False
|
||||
|
||||
def test_clear_older_than(self):
|
||||
path = _get_metrics_path()
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
old = {"tool_name": "old", "unix_timestamp": time.time() - 48 * 3600}
|
||||
new = {"tool_name": "new", "unix_timestamp": time.time()}
|
||||
with open(path, "w") as f:
|
||||
f.write(json.dumps(old) + "\n")
|
||||
f.write(json.dumps(new) + "\n")
|
||||
|
||||
removed = clear_metrics(older_than_hours=24)
|
||||
assert removed == 1
|
||||
|
||||
remaining = get_hallucination_stats()
|
||||
assert remaining["total_events"] == 1
|
||||
|
||||
|
||||
class TestFormatDisplay:
|
||||
def test_format_includes_headers(self):
|
||||
log_hallucination_event("bad_tool", "unknown_tool", "good_tool")
|
||||
stats = get_hallucination_stats()
|
||||
text = format_stats_for_display(stats)
|
||||
assert "Hallucination Metrics" in text
|
||||
assert "bad_tool" in text
|
||||
assert "Total events: 1" in text
|
||||
|
||||
|
||||
class TestAlertThreshold:
|
||||
def test_alert_after_threshold(self, monkeypatch, caplog):
|
||||
monkeypatch.setattr("agent.hallucination_metrics.ALERT_THRESHOLD", 3)
|
||||
for i in range(4):
|
||||
log_hallucination_event("persistent_tool", "unknown_tool")
|
||||
assert "HALLUCINATION ALERT" in caplog.text
|
||||
assert "persistent_tool" in caplog.text
|
||||
@@ -308,12 +308,12 @@ word word
|
||||
content = """\
|
||||
---
|
||||
name: test-skill
|
||||
description: A test skill.
|
||||
description: A test skill with enough content to pass the minimum length validation check of one hundred characters.
|
||||
---
|
||||
|
||||
# Test
|
||||
|
||||
word word
|
||||
word word word word word word word word word word
|
||||
"""
|
||||
with _skill_dir(tmp_path):
|
||||
_create_skill("my-skill", content)
|
||||
@@ -484,3 +484,185 @@ class TestSkillManageDispatcher:
|
||||
raw = skill_manage(action="create", name="test-skill", content=VALID_SKILL_CONTENT)
|
||||
result = json.loads(raw)
|
||||
assert result["success"] is True
|
||||
|
||||
|
||||
|
||||
class TestPokaYokeValidation:
|
||||
"""Tests for poka-yoke auto-revert functionality (#837)."""
|
||||
|
||||
def test_short_skill_md_reverts(self, tmp_path):
|
||||
"""SKILL.md shorter than 100 chars should be reverted."""
|
||||
short_content = """---
|
||||
name: test-skill
|
||||
description: Test
|
||||
---
|
||||
|
||||
Short
|
||||
"""
|
||||
with _skill_dir(tmp_path):
|
||||
_create_skill("my-skill", VALID_SKILL_CONTENT)
|
||||
result = _edit_skill("my-skill", short_content)
|
||||
|
||||
assert result["success"] is False
|
||||
assert "too short" in result["error"].lower()
|
||||
|
||||
# Verify the original file is preserved
|
||||
skill_md = tmp_path / "my-skill" / "SKILL.md"
|
||||
content = skill_md.read_text()
|
||||
assert "test-skill" in content # Original content preserved
|
||||
|
||||
def test_truncated_skill_reverts(self, tmp_path):
|
||||
"""Truncated YAML frontmatter should be reverted."""
|
||||
truncated = """---
|
||||
name: test-skill
|
||||
description: Test skill with enough content to pass minimum length validation check.
|
||||
---
|
||||
|
||||
# Test
|
||||
|
||||
This is a longer body section with plenty of text to ensure the content exceeds the minimum one hundred character requirement for SKILL.md files.
|
||||
"""
|
||||
# Chop it off to simulate truncation
|
||||
truncated = truncated[:80]
|
||||
|
||||
with _skill_dir(tmp_path):
|
||||
_create_skill("my-skill", VALID_SKILL_CONTENT)
|
||||
result = _edit_skill("my-skill", truncated)
|
||||
|
||||
assert result["success"] is False
|
||||
|
||||
def test_linked_files_validation(self, tmp_path):
|
||||
"""Missing linked_files should cause revert."""
|
||||
content_with_links = """---
|
||||
name: test-skill
|
||||
description: Test skill with enough content to pass minimum length validation check.
|
||||
linked_files:
|
||||
- references/nonexistent.md
|
||||
---
|
||||
|
||||
# Test
|
||||
|
||||
This is a longer body section with plenty of text to ensure the content exceeds the minimum one hundred character requirement for SKILL.md files.
|
||||
"""
|
||||
with _skill_dir(tmp_path):
|
||||
_create_skill("my-skill", VALID_SKILL_CONTENT)
|
||||
result = _edit_skill("my-skill", content_with_links)
|
||||
|
||||
assert result["success"] is False
|
||||
assert "linked files missing" in result["error"].lower()
|
||||
|
||||
def test_valid_linked_files_pass(self, tmp_path):
|
||||
"""Existing linked_files should pass validation."""
|
||||
content_with_links = """---
|
||||
name: test-skill
|
||||
description: Test skill with enough content to pass minimum length validation check.
|
||||
linked_files:
|
||||
- references/exists.md
|
||||
---
|
||||
|
||||
# Test
|
||||
|
||||
This is a longer body section with plenty of text to ensure the content exceeds the minimum one hundred character requirement for SKILL.md files.
|
||||
"""
|
||||
with _skill_dir(tmp_path):
|
||||
_create_skill("my-skill", VALID_SKILL_CONTENT)
|
||||
# Create the linked file
|
||||
ref_dir = tmp_path / "my-skill" / "references"
|
||||
ref_dir.mkdir(parents=True, exist_ok=True)
|
||||
(ref_dir / "exists.md").write_text("# Reference")
|
||||
|
||||
result = _edit_skill("my-skill", content_with_links)
|
||||
|
||||
assert result["success"] is True
|
||||
|
||||
|
||||
class TestHistoryRegistry:
|
||||
"""Tests for history registry functionality (#837)."""
|
||||
|
||||
def test_history_saved_on_edit(self, tmp_path):
|
||||
"""Editing a skill should save the original to history."""
|
||||
with _skill_dir(tmp_path):
|
||||
_create_skill("my-skill", VALID_SKILL_CONTENT)
|
||||
|
||||
# Make an edit
|
||||
new_content = """---
|
||||
name: test-skill
|
||||
description: Updated description that is longer than one hundred characters to pass validation.
|
||||
---
|
||||
|
||||
# Updated Test
|
||||
|
||||
This body has more content to ensure it passes the minimum length check of one hundred characters.
|
||||
"""
|
||||
result = _edit_skill("my-skill", new_content)
|
||||
assert result["success"] is True
|
||||
|
||||
# Check history was saved
|
||||
history_dir = tmp_path / ".history" / "my-skill"
|
||||
assert history_dir.exists()
|
||||
history_files = list(history_dir.glob("*.md"))
|
||||
assert len(history_files) == 1
|
||||
|
||||
def test_history_pruned_to_three(self, tmp_path):
|
||||
"""Only last 3 history versions should be kept."""
|
||||
from tools.skill_manager_tool import _save_to_history
|
||||
|
||||
with _skill_dir(tmp_path):
|
||||
_create_skill("my-skill", VALID_SKILL_CONTENT)
|
||||
|
||||
# Save 5 versions to history
|
||||
for i in range(5):
|
||||
content = f"""---
|
||||
name: test-skill
|
||||
description: Version {i} that is long enough to pass minimum length validation check of one hundred characters.
|
||||
---
|
||||
|
||||
# Version {i}
|
||||
|
||||
This is the body content for version {i} that ensures we meet the minimum length requirement.
|
||||
"""
|
||||
_save_to_history("my-skill", content, timestamp=1000 + i)
|
||||
|
||||
# Check only 3 history files remain
|
||||
history_dir = tmp_path / ".history" / "my-skill"
|
||||
history_files = sorted(history_dir.glob("*.md"))
|
||||
assert len(history_files) == 3
|
||||
# Should be the last 3 (timestamps 1002, 1003, 1004)
|
||||
assert "1002" in str(history_files[0])
|
||||
|
||||
def test_revert_to_history(self, tmp_path):
|
||||
"""Should be able to revert to a history version."""
|
||||
from tools.skill_manager_tool import _revert_to_history, _get_history_versions
|
||||
|
||||
with _skill_dir(tmp_path):
|
||||
_create_skill("my-skill", VALID_SKILL_CONTENT)
|
||||
skill_md = tmp_path / "my-skill" / "SKILL.md"
|
||||
|
||||
# Save original to history
|
||||
original = skill_md.read_text()
|
||||
from tools.skill_manager_tool import _save_to_history
|
||||
_save_to_history("my-skill", original)
|
||||
|
||||
# Edit the skill
|
||||
new_content = """---
|
||||
name: test-skill
|
||||
description: Updated description that is longer than one hundred characters to pass validation.
|
||||
---
|
||||
|
||||
# Updated
|
||||
|
||||
This body has more content to ensure it passes the minimum length check of one hundred characters.
|
||||
"""
|
||||
_edit_skill("my-skill", new_content)
|
||||
|
||||
# Verify edit was applied
|
||||
assert "Updated" in skill_md.read_text()
|
||||
|
||||
# Revert to history
|
||||
error = _revert_to_history("my-skill", skill_md, version=0)
|
||||
assert error is None
|
||||
|
||||
# Verify revert worked
|
||||
content = skill_md.read_text()
|
||||
assert "test-skill" in content
|
||||
assert "A test skill" in content
|
||||
|
||||
@@ -322,12 +322,112 @@ def _cleanup_old_backups(file_path: Path, max_backups: int = MAX_BACKUPS_PER_FIL
|
||||
break
|
||||
|
||||
|
||||
|
||||
|
||||
# History registry for rollback (#837)
|
||||
MAX_HISTORY_VERSIONS = 3
|
||||
|
||||
|
||||
def _history_dir_for_skill(skill_name: str) -> Path:
|
||||
"""Return the history directory path for a skill."""
|
||||
return SKILLS_DIR / ".history" / skill_name
|
||||
|
||||
|
||||
def _save_to_history(skill_name: str, content: str, timestamp: Optional[int] = None) -> Optional[Path]:
|
||||
"""Save a version of the skill to the history registry.
|
||||
|
||||
History is stored in ~/.hermes/skills/.history/<skill-name>/<timestamp>.md
|
||||
Keeps the last MAX_HISTORY_VERSIONS versions.
|
||||
|
||||
Returns the path to the saved history file, or None if not saved.
|
||||
"""
|
||||
if timestamp is None:
|
||||
timestamp = int(time.time())
|
||||
|
||||
history_dir = _history_dir_for_skill(skill_name)
|
||||
history_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
history_file = history_dir / f"{timestamp}.md"
|
||||
_atomic_write_text(history_file, content)
|
||||
|
||||
# Clean up old history versions
|
||||
_cleanup_history(skill_name)
|
||||
|
||||
return history_file
|
||||
|
||||
|
||||
def _cleanup_history(skill_name: str, max_versions: int = MAX_HISTORY_VERSIONS) -> None:
|
||||
"""Prune old history versions, keeping only the most recent max_versions."""
|
||||
history_dir = _history_dir_for_skill(skill_name)
|
||||
if not history_dir.exists():
|
||||
return
|
||||
|
||||
try:
|
||||
# Get all history files sorted by modification time (oldest first)
|
||||
history_files = sorted(
|
||||
[f for f in history_dir.iterdir() if f.suffix == '.md' and f.is_file()],
|
||||
key=lambda p: p.stat().st_mtime,
|
||||
)
|
||||
except OSError:
|
||||
return
|
||||
|
||||
# Remove oldest files if we have more than max_versions
|
||||
while len(history_files) > max_versions:
|
||||
try:
|
||||
history_files.pop(0).unlink()
|
||||
except OSError:
|
||||
break
|
||||
|
||||
|
||||
def _get_history_versions(skill_name: str) -> List[Path]:
|
||||
"""Get list of history versions for a skill, newest first."""
|
||||
history_dir = _history_dir_for_skill(skill_name)
|
||||
if not history_dir.exists():
|
||||
return []
|
||||
|
||||
try:
|
||||
return sorted(
|
||||
[f for f in history_dir.iterdir() if f.suffix == '.md' and f.is_file()],
|
||||
key=lambda p: p.stat().st_mtime,
|
||||
reverse=True,
|
||||
)
|
||||
except OSError:
|
||||
return []
|
||||
|
||||
|
||||
def _revert_to_history(skill_name: str, skill_md_path: Path, version: int = 0) -> Optional[str]:
|
||||
"""Revert a skill to a previous history version.
|
||||
|
||||
Args:
|
||||
skill_name: Name of the skill
|
||||
skill_md_path: Path to the current SKILL.md
|
||||
version: Which history version to revert to (0 = most recent, 1 = second most recent, etc.)
|
||||
|
||||
Returns:
|
||||
Error message if revert failed, None if successful
|
||||
"""
|
||||
history_versions = _get_history_versions(skill_name)
|
||||
if not history_versions:
|
||||
return "No history versions available to revert to."
|
||||
|
||||
if version >= len(history_versions):
|
||||
return f"History version {version} not found (only {len(history_versions)} versions available)."
|
||||
|
||||
target_version = history_versions[version]
|
||||
|
||||
try:
|
||||
content = target_version.read_text(encoding="utf-8")
|
||||
_atomic_write_text(skill_md_path, content)
|
||||
return None
|
||||
except Exception as exc:
|
||||
return f"Failed to revert to history version: {exc}"
|
||||
|
||||
def _validate_written_file(file_path: Path, is_skill_md: bool = False) -> Optional[str]:
|
||||
"""Re-read a file from disk and validate it after writing.
|
||||
|
||||
Catches filesystem-level issues (truncation, encoding errors, empty
|
||||
writes) that pre-write validation cannot detect. For SKILL.md files
|
||||
the frontmatter is also re-validated.
|
||||
the frontmatter is also re-validated and linked_files are verified.
|
||||
|
||||
Returns an error message, or *None* if the file looks healthy.
|
||||
"""
|
||||
@@ -341,11 +441,69 @@ def _validate_written_file(file_path: Path, is_skill_md: bool = False) -> Option
|
||||
if len(content) == 0:
|
||||
return "File is empty after write (possible truncation)."
|
||||
|
||||
# Minimum content length check for SKILL.md only (#837)
|
||||
if is_skill_md and len(content) < 100:
|
||||
return f"SKILL.md is too short after write ({len(content)} chars, minimum 100)."
|
||||
|
||||
if is_skill_md:
|
||||
err = _validate_frontmatter(content)
|
||||
if err:
|
||||
return f"Post-write validation failed: {err}"
|
||||
|
||||
# Verify linked_files exist (#837)
|
||||
err = _validate_linked_files(content, file_path.parent)
|
||||
if err:
|
||||
return f"Post-write validation failed: {err}"
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def _validate_linked_files(content: str, skill_dir: Path) -> Optional[str]:
|
||||
"""Validate that all files referenced in linked_files exist.
|
||||
|
||||
Parses the SKILL.md frontmatter and checks that any linked_files
|
||||
entries point to files that actually exist in the skill directory.
|
||||
|
||||
Returns an error message, or *None* if all linked files exist.
|
||||
"""
|
||||
if not content.startswith("---"):
|
||||
return None
|
||||
|
||||
end_match = re.search(r'\n---\s*\n', content[3:])
|
||||
if not end_match:
|
||||
return None
|
||||
|
||||
yaml_content = content[3:end_match.start() + 3]
|
||||
try:
|
||||
parsed = yaml.safe_load(yaml_content)
|
||||
except yaml.YAMLError:
|
||||
return None
|
||||
|
||||
if not isinstance(parsed, dict):
|
||||
return None
|
||||
|
||||
linked_files = parsed.get("linked_files", [])
|
||||
if not linked_files:
|
||||
return None
|
||||
|
||||
missing = []
|
||||
for lf in linked_files:
|
||||
if isinstance(lf, dict):
|
||||
file_ref = lf.get("file") or lf.get("path", "")
|
||||
elif isinstance(lf, str):
|
||||
file_ref = lf
|
||||
else:
|
||||
continue
|
||||
|
||||
if file_ref:
|
||||
# Resolve relative to skill directory
|
||||
target = skill_dir / file_ref
|
||||
if not target.exists():
|
||||
missing.append(file_ref)
|
||||
|
||||
if missing:
|
||||
return f"Linked files missing: {', '.join(missing)}"
|
||||
|
||||
return None
|
||||
|
||||
|
||||
@@ -483,6 +641,13 @@ def _edit_skill(name: str, content: str) -> Dict[str, Any]:
|
||||
|
||||
skill_md = existing["path"] / "SKILL.md"
|
||||
|
||||
# Save original to history before modification (#837)
|
||||
try:
|
||||
original_content = skill_md.read_text(encoding="utf-8")
|
||||
_save_to_history(name, original_content)
|
||||
except (OSError, UnicodeDecodeError):
|
||||
pass # If we can't read original, proceed without history
|
||||
|
||||
# --- Transactional write-validate-commit-or-rollback ---
|
||||
backup_path = _backup_skill_file(skill_md)
|
||||
_atomic_write_text(skill_md, content)
|
||||
@@ -598,6 +763,14 @@ def _patch_skill(
|
||||
|
||||
is_skill_md = not file_path
|
||||
|
||||
# Save original to history when patching SKILL.md (#837)
|
||||
if is_skill_md:
|
||||
try:
|
||||
original_content = target.read_text(encoding="utf-8")
|
||||
_save_to_history(name, original_content)
|
||||
except (OSError, UnicodeDecodeError):
|
||||
pass
|
||||
|
||||
# --- Transactional write-validate-commit-or-rollback ---
|
||||
backup_path = _backup_skill_file(target)
|
||||
_atomic_write_text(target, new_content)
|
||||
|
||||
@@ -204,17 +204,6 @@ class ToolCallValidator:
|
||||
self.consecutive_failures[tool_name] = self.consecutive_failures.get(tool_name, 0) + 1
|
||||
count = self.consecutive_failures[tool_name]
|
||||
|
||||
# Log to persistent metrics
|
||||
try:
|
||||
from agent.hallucination_metrics import log_hallucination_event
|
||||
log_hallucination_event(
|
||||
tool_name=tool_name,
|
||||
error_type="unknown_tool",
|
||||
suggested_name=None,
|
||||
)
|
||||
except Exception:
|
||||
pass # Best-effort metrics logging
|
||||
|
||||
if count >= self.failure_threshold:
|
||||
logger.warning(
|
||||
f"Poka-yoke circuit breaker triggered for '{tool_name}': "
|
||||
|
||||
Reference in New Issue
Block a user