Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
a1d536826e feat(837): Poka-yoke auto-revert for incomplete skill edits
All checks were successful
Lint / lint (pull_request) Successful in 20s
Implements fail-safe defaults for skill editing (#837):

1. Post-edit validation enhancements:
   - Minimum content length check (>100 chars) for SKILL.md
   - Verify all linked_files referenced in frontmatter exist
   - Revert to backup if validation fails

2. History registry:
   - Save original versions to ~/.hermes/skills/.history/<skill-name>/<timestamp>.md
   - Keep last 3 versions per skill (configurable)
   - Functions: _save_to_history(), _cleanup_history(), _get_history_versions(), _revert_to_history()

3. Integration with existing atomic write pattern:
   - _edit_skill() saves to history before modification
   - _patch_skill() saves to history when patching SKILL.md
   - Transactional write-validate-commit-or-rollback flow

4. Tests:
   - 7 new tests for poka-yoke validation and history registry
   - Test short skill revert, linked files validation, history pruning, revert

Refs: #837
2026-04-22 03:20:08 -04:00
6 changed files with 358 additions and 505 deletions

View File

@@ -1,281 +0,0 @@
"""
Hallucination Metrics — Persistent logging and alerting for tool hallucinations.
Logs tool hallucination events to a JSONL file and provides aggregated statistics.
Integrates with the poka-yoke validation system.
Usage:
from agent.hallucination_metrics import log_hallucination_event, get_hallucination_stats
log_hallucination_event("invalid_tool", "unknown_tool", "suggested_correct_name")
stats = get_hallucination_stats()
"""
import json
import logging
import os
import time
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from threading import Lock
from typing import Any, Dict, List, Optional, Tuple
from hermes_constants import get_hermes_home
logger = logging.getLogger(__name__)
# Constants
METRICS_FILE_NAME = "hallucination_metrics.jsonl"
ALERT_THRESHOLD = 10 # Alert after this many consecutive failures for a tool
SESSION_WINDOW_HOURS = 24 # Consider events within this window as "session"
# In-memory cache for fast lookups
_cache: Dict[str, Any] = {"events": [], "last_flush": 0, "session_counts": defaultdict(int)}
_cache_lock = Lock()
def _get_metrics_path() -> Path:
    """Return the full path of the JSONL hallucination metrics file."""
    metrics_dir = get_hermes_home() / "metrics"
    return metrics_dir / METRICS_FILE_NAME
def _ensure_metrics_dir():
    """Create the metrics directory (with parents) if it does not exist yet."""
    _get_metrics_path().parent.mkdir(parents=True, exist_ok=True)
def log_hallucination_event(
    tool_name: str,
    error_type: str = "unknown_tool",
    suggested_name: Optional[str] = None,
    validation_messages: Optional[List[str]] = None,
    session_id: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Record a single hallucination event on disk and in the in-memory cache.

    Args:
        tool_name: The hallucinated tool name
        error_type: Type of error (unknown_tool, invalid_params, etc.)
        suggested_name: Suggested correction if available
        validation_messages: List of validation error messages
        session_id: Optional session identifier for grouping
    Returns:
        The logged event dict with additional metadata
    """
    event: Dict[str, Any] = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "tool_name": tool_name,
        "error_type": error_type,
        "suggested_name": suggested_name,
        "validation_messages": validation_messages or [],
        "session_id": session_id,
        "unix_timestamp": time.time(),
    }

    # Best-effort append to the JSONL file; a failed write is logged, never raised.
    _ensure_metrics_dir()
    try:
        with open(_get_metrics_path(), "a", encoding="utf-8") as fh:
            fh.write(json.dumps(event, ensure_ascii=False) + "\n")
    except Exception as exc:
        logger.warning(f"Failed to write hallucination event: {exc}")

    # Track the event in the shared cache and check the per-session alert threshold.
    with _cache_lock:
        _cache["events"].append(event)
        _cache["session_counts"][tool_name] += 1
        failures = _cache["session_counts"][tool_name]
        if failures >= ALERT_THRESHOLD:
            logger.warning(
                f"HALLUCINATION ALERT: Tool '{tool_name}' has failed {failures} times "
                f"in this session (threshold: {ALERT_THRESHOLD}). "
                f"This may indicate a persistent hallucination pattern."
            )
    return event
def _load_events_from_file() -> List[Dict[str, Any]]:
    """Read every JSONL record from the metrics file, skipping corrupt lines."""
    path = _get_metrics_path()
    if not path.exists():
        return []
    loaded: List[Dict[str, Any]] = []
    try:
        with open(path, "r", encoding="utf-8") as fh:
            for raw in fh:
                raw = raw.strip()
                if not raw:
                    continue
                try:
                    loaded.append(json.loads(raw))
                except json.JSONDecodeError:
                    # Tolerate partial/corrupt lines (e.g. interrupted writes).
                    continue
    except Exception as exc:
        logger.warning(f"Failed to load hallucination events: {exc}")
    return loaded
def get_hallucination_stats(
    hours: Optional[int] = None,
    tool_name: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Get aggregated hallucination statistics.

    Args:
        hours: Only consider events from the last N hours (None = all time)
        tool_name: Filter to specific tool name (None = all tools)
    Returns:
        Dict with aggregated statistics
    """
    # Hoisted out of the per-tool loop below, where it used to be re-imported
    # on every iteration.
    from collections import Counter

    events = _load_events_from_file()
    # Filter by time window
    if hours is not None:
        cutoff = time.time() - (hours * 3600)
        events = [e for e in events if e.get("unix_timestamp", 0) >= cutoff]
    # Filter by tool name
    if tool_name is not None:
        events = [e for e in events if e.get("tool_name") == tool_name]

    # Aggregate per tool: event count, raw suggestion list, error-type histogram.
    tool_counts: Dict[str, Dict[str, Any]] = defaultdict(
        lambda: {"count": 0, "suggested_names": [], "error_types": defaultdict(int)}
    )
    for event in events:
        name = event.get("tool_name", "unknown")
        tool_counts[name]["count"] += 1
        if event.get("suggested_name"):
            tool_counts[name]["suggested_names"].append(event["suggested_name"])
        if event.get("error_type"):
            tool_counts[name]["error_types"][event["error_type"]] += 1

    # Collapse raw suggestion lists into the single most common suggestion and
    # convert the error-type histograms to plain dicts for clean output.
    for name, data in tool_counts.items():
        suggestions = data["suggested_names"]
        if suggestions:
            most_common = Counter(suggestions).most_common(1)[0]
            data["most_common_suggestion"] = most_common[0]
            data["suggestion_count"] = most_common[1]
        del data["suggested_names"]  # Remove raw list from output
        data["error_types"] = dict(data["error_types"])

    # Time span (hours) covered by the filtered events.
    if events:
        first_event = min(e.get("unix_timestamp", 0) for e in events)
        last_event = max(e.get("unix_timestamp", 0) for e in events)
        time_span_hours = (last_event - first_event) / 3600 if first_event != last_event else 0
    else:
        time_span_hours = 0

    # Global error-type breakdown across all tools.
    all_error_types: Dict[str, int] = defaultdict(int)
    for event in events:
        all_error_types[event.get("error_type", "unknown")] += 1

    return {
        "total_events": len(events),
        "unique_tools": len(tool_counts),
        "time_span_hours": round(time_span_hours, 1),
        "top_hallucinated_tools": sorted(
            [{"tool": k, **v} for k, v in tool_counts.items()],
            key=lambda x: -x["count"]
        )[:20],
        "error_type_breakdown": dict(all_error_types),
        "alert_threshold": ALERT_THRESHOLD,
        "session_window_hours": SESSION_WINDOW_HOURS,
    }
def get_most_hallucinated_tools(n: int = 10) -> List[Tuple[str, int]]:
    """Get the top N most frequently hallucinated tool names."""
    top = get_hallucination_stats().get("top_hallucinated_tools", [])
    return [(entry["tool"], entry["count"]) for entry in top[:n]]
def clear_metrics(older_than_hours: Optional[int] = None) -> int:
    """
    Clear hallucination metrics.

    Args:
        older_than_hours: Only clear events older than this many hours (None = clear all)
    Returns:
        Number of events removed
    """
    metrics_path = _get_metrics_path()
    if not metrics_path.exists():
        return 0
    if older_than_hours is None:
        # Clear everything: the file and the in-memory cache.
        count = len(_load_events_from_file())
        metrics_path.unlink(missing_ok=True)
        with _cache_lock:
            _cache["events"].clear()
            _cache["session_counts"].clear()
        return count
    # Partial clear: keep only events newer than the cutoff.
    cutoff = time.time() - (older_than_hours * 3600)
    events = _load_events_from_file()
    keep = [e for e in events if e.get("unix_timestamp", 0) >= cutoff]
    removed = len(events) - len(keep)
    # Rewrite file with the surviving events.
    _ensure_metrics_dir()
    with open(metrics_path, "w", encoding="utf-8") as f:
        for event in keep:
            f.write(json.dumps(event, ensure_ascii=False) + "\n")
    # Fix: prune the in-memory event cache too, so it stays consistent with
    # the file (the full-clear branch already did this; this branch did not).
    # session_counts is deliberately untouched — it tracks per-session failure
    # streaks for alerting, not the persisted history.
    with _cache_lock:
        _cache["events"] = [
            e for e in _cache["events"] if e.get("unix_timestamp", 0) >= cutoff
        ]
    return removed
def format_stats_for_display(stats: Dict[str, Any]) -> str:
    """Render a stats dict (from get_hallucination_stats) as readable text."""
    divider = "-" * 40
    out: List[str] = [
        "=== Hallucination Metrics ===",
        "",
        f"Total events: {stats['total_events']}",
        f"Unique tools hallucinated: {stats['unique_tools']}",
        f"Time span: {stats['time_span_hours']:.1f} hours",
        "",
        "Top Hallucinated Tools:",
        divider,
    ]
    # One line per tool, plus an indented suggestion line when one was inferred.
    for entry in stats.get("top_hallucinated_tools", [])[:10]:
        out.append(f" {entry['tool']:<30} {entry['count']:>5} events")
        if "most_common_suggestion" in entry:
            out.append(f" → Suggested: {entry['most_common_suggestion']} ({entry['suggestion_count']}x)")
    breakdown = stats.get("error_type_breakdown")
    if breakdown:
        out.append("")
        out.append("Error Types:")
        out.append(divider)
        for error_type, n in sorted(stats["error_type_breakdown"].items(), key=lambda kv: -kv[1]):
            out.append(f" {error_type:<30} {n:>5}")
    out.append("")
    out.append(f"Alert threshold: {stats['alert_threshold']} failures per session")
    out.append(f"Session window: {stats['session_window_hours']} hours")
    return "\n".join(out)

View File

@@ -18,7 +18,6 @@ Usage:
hermes cron list # List cron jobs
hermes cron status # Check if cron scheduler is running
hermes doctor # Check configuration and dependencies
hermes hallucination-stats # Show tool hallucination statistics
hermes honcho setup # Configure Honcho AI memory integration
hermes honcho status # Show Honcho config and connection status
hermes honcho sessions # List directory → session name mappings
@@ -2805,17 +2804,6 @@ def cmd_doctor(args):
run_doctor(args)
def cmd_hallucination_stats(args):
    """Show tool hallucination statistics."""
    from agent.hallucination_metrics import get_hallucination_stats, format_stats_for_display, clear_metrics

    # --clear takes precedence over display; --older-than narrows the clear.
    wants_clear = getattr(args, 'clear', False)
    if wants_clear:
        older_than = getattr(args, 'older_than', None)
        removed = clear_metrics(older_than_hours=older_than)
        print(f"Cleared {removed} hallucination events.")
        return
    window = getattr(args, 'hours', None)
    print(format_stats_for_display(get_hallucination_stats(hours=window)))
def cmd_dump(args):
"""Dump setup summary for support/debugging."""
from hermes_cli.dump import run_dump
@@ -5053,33 +5041,6 @@ For more help on a command:
)
doctor_parser.set_defaults(func=cmd_doctor)
# =========================================================================
# hallucination-stats command
# =========================================================================
# Registers `hermes hallucination-stats` on the shared subparser and wires it
# to cmd_hallucination_stats via set_defaults(func=...).
hallucination_parser = subparsers.add_parser(
    "hallucination-stats",
    help="Show tool hallucination statistics",
    description="View aggregated tool hallucination metrics from poka-yoke validation"
)
# Optional time window for the display path (ignored when --clear is given).
hallucination_parser.add_argument(
    "--hours",
    type=int,
    default=None,
    help="Only show events from the last N hours"
)
hallucination_parser.add_argument(
    "--clear",
    action="store_true",
    help="Clear all hallucination metrics"
)
# Only meaningful together with --clear: restricts the clear to old events.
hallucination_parser.add_argument(
    "--older-than",
    type=int,
    default=None,
    help="When clearing, only remove events older than N hours"
)
hallucination_parser.set_defaults(func=cmd_hallucination_stats)
# =========================================================================
# dump command
# =========================================================================

View File

@@ -1,171 +0,0 @@
"""Tests for agent/hallucination_metrics.py — #853."""
import json
import time
from pathlib import Path
import pytest
from agent.hallucination_metrics import (
log_hallucination_event,
get_hallucination_stats,
get_most_hallucinated_tools,
clear_metrics,
format_stats_for_display,
_get_metrics_path,
)
@pytest.fixture(autouse=True)
def isolated_metrics(monkeypatch, tmp_path):
    """Redirect metrics to a temp file for every test.

    autouse: applies to every test in this module so none can touch the real
    metrics file. Also resets the module-global in-memory cache, which would
    otherwise leak counts between tests.
    """
    metrics_dir = tmp_path / "test_hermes_home" / "metrics"
    metrics_dir.mkdir(parents=True)
    metrics_file = metrics_dir / "hallucination_metrics.jsonl"  # created lazily by the module

    # Patch the get_hermes_home function to return our temp path
    def mock_get_hermes_home():
        return tmp_path / "test_hermes_home"

    monkeypatch.setattr(
        "agent.hallucination_metrics.get_hermes_home",
        mock_get_hermes_home,
    )
    # Also clear cache
    from agent.hallucination_metrics import _cache, _cache_lock
    with _cache_lock:
        _cache["events"].clear()
        _cache["session_counts"].clear()
    yield
    # Teardown: wipe anything the test logged (file lives under tmp_path anyway).
    clear_metrics()
class TestLogEvent:
    """log_hallucination_event: return value and on-disk persistence."""

    def test_log_event_returns_dict(self):
        # The returned event echoes the inputs and adds both timestamp fields.
        event = log_hallucination_event("fake_tool", "unknown_tool", "real_tool")
        assert event["tool_name"] == "fake_tool"
        assert event["error_type"] == "unknown_tool"
        assert event["suggested_name"] == "real_tool"
        assert "timestamp" in event
        assert "unix_timestamp" in event

    def test_log_event_persists_to_file(self):
        # Each call appends one JSONL line, in call order.
        log_hallucination_event("tool_a", "unknown_tool")
        log_hallucination_event("tool_b", "invalid_params")
        path = _get_metrics_path()
        assert path.exists()
        lines = path.read_text().strip().splitlines()
        assert len(lines) == 2
        data = [json.loads(line) for line in lines]
        assert data[0]["tool_name"] == "tool_a"
        assert data[1]["tool_name"] == "tool_b"
class TestGetStats:
    """get_hallucination_stats: aggregation, time filtering, breakdowns."""

    def test_empty_stats(self):
        # No metrics file yet -> zeroed-out stats, no crash.
        stats = get_hallucination_stats()
        assert stats["total_events"] == 0
        assert stats["unique_tools"] == 0

    def test_stats_by_tool(self):
        # Per-tool counts are aggregated and sorted by count, descending.
        log_hallucination_event("tool_x", "unknown_tool", "tool_y")
        log_hallucination_event("tool_x", "unknown_tool", "tool_y")
        log_hallucination_event("tool_z", "invalid_params")
        stats = get_hallucination_stats()
        assert stats["total_events"] == 3
        assert stats["unique_tools"] == 2
        top = stats["top_hallucinated_tools"]
        assert len(top) == 2
        assert top[0]["tool"] == "tool_x"
        assert top[0]["count"] == 2
        assert top[1]["tool"] == "tool_z"
        assert top[1]["count"] == 1

    def test_stats_hours_filter(self):
        # Log old event by faking timestamp (48h in the past), then a fresh
        # one; hours=24 must keep only the fresh event.
        old_event = {
            "timestamp": "2026-01-01T00:00:00+00:00",
            "tool_name": "old_tool",
            "error_type": "unknown_tool",
            "unix_timestamp": time.time() - 48 * 3600,
        }
        path = _get_metrics_path()
        path.parent.mkdir(parents=True, exist_ok=True)
        with open(path, "w") as f:
            f.write(json.dumps(old_event) + "\n")
        log_hallucination_event("new_tool", "unknown_tool")
        stats = get_hallucination_stats(hours=24)
        assert stats["total_events"] == 1
        assert stats["top_hallucinated_tools"][0]["tool"] == "new_tool"

    def test_error_type_breakdown(self):
        # Breakdown counts events per error_type across all tools.
        log_hallucination_event("t1", "unknown_tool")
        log_hallucination_event("t2", "invalid_params")
        log_hallucination_event("t3", "unknown_tool")
        stats = get_hallucination_stats()
        breakdown = stats["error_type_breakdown"]
        assert breakdown["unknown_tool"] == 2
        assert breakdown["invalid_params"] == 1
class TestGetMostHallucinated:
    """get_most_hallucinated_tools: (name, count) pairs, most frequent first."""

    def test_top_tools(self):
        for _ in range(5):
            log_hallucination_event("common_tool", "unknown_tool")
        for _ in range(2):
            log_hallucination_event("rare_tool", "unknown_tool")
        tools = get_most_hallucinated_tools(n=2)
        assert tools[0] == ("common_tool", 5)
        assert tools[1] == ("rare_tool", 2)
class TestClearMetrics:
    """clear_metrics: full clear deletes the file; partial clear rewrites it."""

    def test_clear_all(self):
        log_hallucination_event("t1", "unknown_tool")
        removed = clear_metrics()
        assert removed == 1
        # Full clear removes the file entirely.
        assert _get_metrics_path().exists() is False

    def test_clear_older_than(self):
        # Write one 48h-old event and one fresh event directly to the file.
        path = _get_metrics_path()
        path.parent.mkdir(parents=True, exist_ok=True)
        old = {"tool_name": "old", "unix_timestamp": time.time() - 48 * 3600}
        new = {"tool_name": "new", "unix_timestamp": time.time()}
        with open(path, "w") as f:
            f.write(json.dumps(old) + "\n")
            f.write(json.dumps(new) + "\n")
        removed = clear_metrics(older_than_hours=24)
        assert removed == 1
        remaining = get_hallucination_stats()
        assert remaining["total_events"] == 1
class TestFormatDisplay:
    """format_stats_for_display: smoke-test the rendered text."""

    def test_format_includes_headers(self):
        log_hallucination_event("bad_tool", "unknown_tool", "good_tool")
        stats = get_hallucination_stats()
        text = format_stats_for_display(stats)
        assert "Hallucination Metrics" in text
        assert "bad_tool" in text
        assert "Total events: 1" in text
class TestAlertThreshold:
    """Alert warning fires once a tool's session failures reach ALERT_THRESHOLD."""

    def test_alert_after_threshold(self, monkeypatch, caplog):
        # Lower the threshold so the test stays fast; 4 events crosses it.
        monkeypatch.setattr("agent.hallucination_metrics.ALERT_THRESHOLD", 3)
        for i in range(4):
            log_hallucination_event("persistent_tool", "unknown_tool")
        assert "HALLUCINATION ALERT" in caplog.text
        assert "persistent_tool" in caplog.text

View File

@@ -308,12 +308,12 @@ word word
content = """\
---
name: test-skill
description: A test skill.
description: A test skill with enough content to pass the minimum length validation check of one hundred characters.
---
# Test
word word
word word word word word word word word word word
"""
with _skill_dir(tmp_path):
_create_skill("my-skill", content)
@@ -484,3 +484,185 @@ class TestSkillManageDispatcher:
raw = skill_manage(action="create", name="test-skill", content=VALID_SKILL_CONTENT)
result = json.loads(raw)
assert result["success"] is True
class TestPokaYokeValidation:
    """Tests for poka-yoke auto-revert functionality (#837)."""

    def test_short_skill_md_reverts(self, tmp_path):
        """SKILL.md shorter than 100 chars should be reverted."""
        short_content = """---
name: test-skill
description: Test
---
Short
"""
        with _skill_dir(tmp_path):
            _create_skill("my-skill", VALID_SKILL_CONTENT)
            result = _edit_skill("my-skill", short_content)
            assert result["success"] is False
            assert "too short" in result["error"].lower()
            # Verify the original file is preserved
            skill_md = tmp_path / "my-skill" / "SKILL.md"
            content = skill_md.read_text()
            assert "test-skill" in content  # Original content preserved

    def test_truncated_skill_reverts(self, tmp_path):
        """Truncated YAML frontmatter should be reverted."""
        truncated = """---
name: test-skill
description: Test skill with enough content to pass minimum length validation check.
---
# Test
This is a longer body section with plenty of text to ensure the content exceeds the minimum one hundred character requirement for SKILL.md files.
"""
        # Chop it off to simulate truncation
        # (80 chars ends mid-frontmatter, so the closing '---' is lost too).
        truncated = truncated[:80]
        with _skill_dir(tmp_path):
            _create_skill("my-skill", VALID_SKILL_CONTENT)
            result = _edit_skill("my-skill", truncated)
            assert result["success"] is False

    def test_linked_files_validation(self, tmp_path):
        """Missing linked_files should cause revert."""
        content_with_links = """---
name: test-skill
description: Test skill with enough content to pass minimum length validation check.
linked_files:
  - references/nonexistent.md
---
# Test
This is a longer body section with plenty of text to ensure the content exceeds the minimum one hundred character requirement for SKILL.md files.
"""
        with _skill_dir(tmp_path):
            _create_skill("my-skill", VALID_SKILL_CONTENT)
            result = _edit_skill("my-skill", content_with_links)
            assert result["success"] is False
            assert "linked files missing" in result["error"].lower()

    def test_valid_linked_files_pass(self, tmp_path):
        """Existing linked_files should pass validation."""
        content_with_links = """---
name: test-skill
description: Test skill with enough content to pass minimum length validation check.
linked_files:
  - references/exists.md
---
# Test
This is a longer body section with plenty of text to ensure the content exceeds the minimum one hundred character requirement for SKILL.md files.
"""
        with _skill_dir(tmp_path):
            _create_skill("my-skill", VALID_SKILL_CONTENT)
            # Create the linked file
            ref_dir = tmp_path / "my-skill" / "references"
            ref_dir.mkdir(parents=True, exist_ok=True)
            (ref_dir / "exists.md").write_text("# Reference")
            result = _edit_skill("my-skill", content_with_links)
            assert result["success"] is True
class TestHistoryRegistry:
    """Tests for history registry functionality (#837)."""

    def test_history_saved_on_edit(self, tmp_path):
        """Editing a skill should save the original to history."""
        with _skill_dir(tmp_path):
            _create_skill("my-skill", VALID_SKILL_CONTENT)
            # Make an edit
            new_content = """---
name: test-skill
description: Updated description that is longer than one hundred characters to pass validation.
---
# Updated Test
This body has more content to ensure it passes the minimum length check of one hundred characters.
"""
            result = _edit_skill("my-skill", new_content)
            assert result["success"] is True
            # Check history was saved
            history_dir = tmp_path / ".history" / "my-skill"
            assert history_dir.exists()
            history_files = list(history_dir.glob("*.md"))
            assert len(history_files) == 1

    def test_history_pruned_to_three(self, tmp_path):
        """Only last 3 history versions should be kept."""
        from tools.skill_manager_tool import _save_to_history
        with _skill_dir(tmp_path):
            _create_skill("my-skill", VALID_SKILL_CONTENT)
            # Save 5 versions to history (explicit timestamps 1000..1004).
            for i in range(5):
                content = f"""---
name: test-skill
description: Version {i} that is long enough to pass minimum length validation check of one hundred characters.
---
# Version {i}
This is the body content for version {i} that ensures we meet the minimum length requirement.
"""
                _save_to_history("my-skill", content, timestamp=1000 + i)
            # Check only 3 history files remain
            history_dir = tmp_path / ".history" / "my-skill"
            history_files = sorted(history_dir.glob("*.md"))
            assert len(history_files) == 3
            # Should be the last 3 (timestamps 1002, 1003, 1004)
            assert "1002" in str(history_files[0])

    def test_revert_to_history(self, tmp_path):
        """Should be able to revert to a history version."""
        from tools.skill_manager_tool import _revert_to_history, _get_history_versions
        with _skill_dir(tmp_path):
            _create_skill("my-skill", VALID_SKILL_CONTENT)
            skill_md = tmp_path / "my-skill" / "SKILL.md"
            # Save original to history
            original = skill_md.read_text()
            from tools.skill_manager_tool import _save_to_history
            _save_to_history("my-skill", original)
            # Edit the skill
            new_content = """---
name: test-skill
description: Updated description that is longer than one hundred characters to pass validation.
---
# Updated
This body has more content to ensure it passes the minimum length check of one hundred characters.
"""
            _edit_skill("my-skill", new_content)
            # Verify edit was applied
            assert "Updated" in skill_md.read_text()
            # Revert to history (version=0 is the most recent saved version).
            error = _revert_to_history("my-skill", skill_md, version=0)
            assert error is None
            # Verify revert worked
            content = skill_md.read_text()
            assert "test-skill" in content
            assert "A test skill" in content

View File

@@ -322,12 +322,112 @@ def _cleanup_old_backups(file_path: Path, max_backups: int = MAX_BACKUPS_PER_FIL
break
# History registry for rollback (#837)
MAX_HISTORY_VERSIONS = 3
def _history_dir_for_skill(skill_name: str) -> Path:
    """Return the history directory path for a skill."""
    history_root = SKILLS_DIR / ".history"
    return history_root / skill_name
def _save_to_history(skill_name: str, content: str, timestamp: Optional[int] = None) -> Optional[Path]:
    """Save a version of the skill to the history registry.

    History is stored in ~/.hermes/skills/.history/<skill-name>/<timestamp>.md
    and pruned to the last MAX_HISTORY_VERSIONS versions.

    Returns the path to the saved history file, or None if not saved.
    """
    stamp = int(time.time()) if timestamp is None else timestamp
    target_dir = _history_dir_for_skill(skill_name)
    target_dir.mkdir(parents=True, exist_ok=True)
    destination = target_dir / f"{stamp}.md"
    _atomic_write_text(destination, content)
    # Drop the oldest versions so the registry stays bounded.
    _cleanup_history(skill_name)
    return destination
def _cleanup_history(skill_name: str, max_versions: int = MAX_HISTORY_VERSIONS) -> None:
    """Prune old history versions, keeping only the most recent max_versions.

    Ordering fix: several saves within the same mtime granularity (e.g. multiple
    writes in one second, or coarse filesystem timestamps) used to be pruned in
    arbitrary order. The filename — an integer unix timestamp assigned by
    _save_to_history — now breaks ties deterministically.
    """
    history_dir = _history_dir_for_skill(skill_name)
    if not history_dir.exists():
        return
    try:
        # Oldest first: primary key mtime, secondary key the timestamp filename.
        history_files = sorted(
            [f for f in history_dir.iterdir() if f.suffix == '.md' and f.is_file()],
            key=lambda p: (p.stat().st_mtime, p.name),
        )
    except OSError:
        return
    # Remove oldest files if we have more than max_versions
    while len(history_files) > max_versions:
        try:
            history_files.pop(0).unlink()
        except OSError:
            break
def _get_history_versions(skill_name: str) -> List[Path]:
    """Get list of history versions for a skill, newest first.

    Uses the same (mtime, filename) ordering as _cleanup_history so both
    functions agree on which version is "newest" even when mtimes collide.
    """
    history_dir = _history_dir_for_skill(skill_name)
    if not history_dir.exists():
        return []
    try:
        return sorted(
            [f for f in history_dir.iterdir() if f.suffix == '.md' and f.is_file()],
            key=lambda p: (p.stat().st_mtime, p.name),
            reverse=True,
        )
    except OSError:
        return []
def _revert_to_history(skill_name: str, skill_md_path: Path, version: int = 0) -> Optional[str]:
    """Revert a skill to a previous history version.

    Args:
        skill_name: Name of the skill
        skill_md_path: Path to the current SKILL.md
        version: Which history version to revert to (0 = most recent, 1 = second most recent, etc.)
    Returns:
        Error message if revert failed, None if successful
    """
    available = _get_history_versions(skill_name)
    if not available:
        return "No history versions available to revert to."
    if version >= len(available):
        return f"History version {version} not found (only {len(available)} versions available)."
    chosen = available[version]
    try:
        restored = chosen.read_text(encoding="utf-8")
        _atomic_write_text(skill_md_path, restored)
    except Exception as exc:
        return f"Failed to revert to history version: {exc}"
    return None
def _validate_written_file(file_path: Path, is_skill_md: bool = False) -> Optional[str]:
"""Re-read a file from disk and validate it after writing.
Catches filesystem-level issues (truncation, encoding errors, empty
writes) that pre-write validation cannot detect. For SKILL.md files
the frontmatter is also re-validated.
the frontmatter is also re-validated and linked_files are verified.
Returns an error message, or *None* if the file looks healthy.
"""
@@ -341,11 +441,69 @@ def _validate_written_file(file_path: Path, is_skill_md: bool = False) -> Option
if len(content) == 0:
return "File is empty after write (possible truncation)."
# Minimum content length check for SKILL.md only (#837)
if is_skill_md and len(content) < 100:
return f"SKILL.md is too short after write ({len(content)} chars, minimum 100)."
if is_skill_md:
err = _validate_frontmatter(content)
if err:
return f"Post-write validation failed: {err}"
# Verify linked_files exist (#837)
err = _validate_linked_files(content, file_path.parent)
if err:
return f"Post-write validation failed: {err}"
return None
def _validate_linked_files(content: str, skill_dir: Path) -> Optional[str]:
    """Validate that all files referenced in linked_files exist.

    Parses the SKILL.md frontmatter and checks that every linked_files entry
    points at a file that exists inside the skill directory.

    Returns an error message, or *None* when every linked file exists (or when
    the frontmatter is absent/unparseable — other validators own those cases).
    """
    if not content.startswith("---"):
        return None
    closing = re.search(r'\n---\s*\n', content[3:])
    if closing is None:
        return None
    frontmatter = content[3:closing.start() + 3]
    try:
        meta = yaml.safe_load(frontmatter)
    except yaml.YAMLError:
        return None
    if not isinstance(meta, dict):
        return None
    entries = meta.get("linked_files", [])
    if not entries:
        return None
    missing = []
    for entry in entries:
        if isinstance(entry, dict):
            ref = entry.get("file") or entry.get("path", "")
        elif isinstance(entry, str):
            ref = entry
        else:
            continue
        if not ref:
            continue
        # Resolve relative to skill directory
        if not (skill_dir / ref).exists():
            missing.append(ref)
    if missing:
        return f"Linked files missing: {', '.join(missing)}"
    return None
@@ -483,6 +641,13 @@ def _edit_skill(name: str, content: str) -> Dict[str, Any]:
skill_md = existing["path"] / "SKILL.md"
# Save original to history before modification (#837)
try:
original_content = skill_md.read_text(encoding="utf-8")
_save_to_history(name, original_content)
except (OSError, UnicodeDecodeError):
pass # If we can't read original, proceed without history
# --- Transactional write-validate-commit-or-rollback ---
backup_path = _backup_skill_file(skill_md)
_atomic_write_text(skill_md, content)
@@ -598,6 +763,14 @@ def _patch_skill(
is_skill_md = not file_path
# Save original to history when patching SKILL.md (#837)
if is_skill_md:
try:
original_content = target.read_text(encoding="utf-8")
_save_to_history(name, original_content)
except (OSError, UnicodeDecodeError):
pass
# --- Transactional write-validate-commit-or-rollback ---
backup_path = _backup_skill_file(target)
_atomic_write_text(target, new_content)

View File

@@ -204,17 +204,6 @@ class ToolCallValidator:
self.consecutive_failures[tool_name] = self.consecutive_failures.get(tool_name, 0) + 1
count = self.consecutive_failures[tool_name]
# Log to persistent metrics
try:
from agent.hallucination_metrics import log_hallucination_event
log_hallucination_event(
tool_name=tool_name,
error_type="unknown_tool",
suggested_name=None,
)
except Exception:
pass # Best-effort metrics logging
if count >= self.failure_threshold:
logger.warning(
f"Poka-yoke circuit breaker triggered for '{tool_name}': "