Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
a1d536826e feat(837): Poka-yoke auto-revert for incomplete skill edits
All checks were successful
Lint / lint (pull_request) Successful in 20s
Implements fail-safe defaults for skill editing (#837):

1. Post-edit validation enhancements:
   - Minimum content length check (>100 chars) for SKILL.md
   - Verify all linked_files referenced in frontmatter exist
   - Revert to backup if validation fails

2. History registry:
   - Save original versions to ~/.hermes/skills/.history/<skill-name>/<timestamp>.md
   - Keep last 3 versions per skill (configurable)
   - Functions: _save_to_history(), _cleanup_history(), _get_history_versions(), _revert_to_history()

3. Integration with existing atomic write pattern:
   - _edit_skill() saves to history before modification
   - _patch_skill() saves to history when patching SKILL.md
   - Transactional write-validate-commit-or-rollback flow

4. Tests:
   - 7 new tests for poka-yoke validation and history registry
   - Test short skill revert, linked files validation, history pruning, revert

Refs: #837
2026-04-22 03:20:08 -04:00
8 changed files with 371 additions and 23 deletions

View File

@@ -13,11 +13,9 @@ import time
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from hermes_constants import get_hermes_home
logger = logging.getLogger(__name__)
HERMES_HOME = get_hermes_home()
HERMES_HOME = Path.home() / ".hermes"
CHECKPOINT_DIR = HERMES_HOME / "checkpoints"
CHARS_PER_TOKEN = 4

View File

@@ -56,7 +56,7 @@ VIOLATIONS = [
"id": "expanduser-hermes",
"name": "os.path.expanduser ~/.hermes (non-fallback)",
"pattern": r'os\.path\.expanduser\(["\']~/.hermes',
"exclude_with": r'#|HERMES_HOME',
"exclude_with": r'#',
"message": "Use `os.environ.get('HERMES_HOME', os.path.expanduser('~/.hermes'))` instead",
},
]

View File

@@ -308,12 +308,12 @@ word word
content = """\
---
name: test-skill
description: A test skill.
description: A test skill with enough content to pass the minimum length validation check of one hundred characters.
---
# Test
word word
word word word word word word word word word word
"""
with _skill_dir(tmp_path):
_create_skill("my-skill", content)
@@ -484,3 +484,185 @@ class TestSkillManageDispatcher:
raw = skill_manage(action="create", name="test-skill", content=VALID_SKILL_CONTENT)
result = json.loads(raw)
assert result["success"] is True
class TestPokaYokeValidation:
"""Tests for poka-yoke auto-revert functionality (#837)."""
def test_short_skill_md_reverts(self, tmp_path):
"""SKILL.md shorter than 100 chars should be reverted."""
short_content = """---
name: test-skill
description: Test
---
Short
"""
with _skill_dir(tmp_path):
_create_skill("my-skill", VALID_SKILL_CONTENT)
result = _edit_skill("my-skill", short_content)
assert result["success"] is False
assert "too short" in result["error"].lower()
# Verify the original file is preserved
skill_md = tmp_path / "my-skill" / "SKILL.md"
content = skill_md.read_text()
assert "test-skill" in content # Original content preserved
def test_truncated_skill_reverts(self, tmp_path):
"""Truncated YAML frontmatter should be reverted."""
truncated = """---
name: test-skill
description: Test skill with enough content to pass minimum length validation check.
---
# Test
This is a longer body section with plenty of text to ensure the content exceeds the minimum one hundred character requirement for SKILL.md files.
"""
# Chop it off to simulate truncation
truncated = truncated[:80]
with _skill_dir(tmp_path):
_create_skill("my-skill", VALID_SKILL_CONTENT)
result = _edit_skill("my-skill", truncated)
assert result["success"] is False
def test_linked_files_validation(self, tmp_path):
"""Missing linked_files should cause revert."""
content_with_links = """---
name: test-skill
description: Test skill with enough content to pass minimum length validation check.
linked_files:
- references/nonexistent.md
---
# Test
This is a longer body section with plenty of text to ensure the content exceeds the minimum one hundred character requirement for SKILL.md files.
"""
with _skill_dir(tmp_path):
_create_skill("my-skill", VALID_SKILL_CONTENT)
result = _edit_skill("my-skill", content_with_links)
assert result["success"] is False
assert "linked files missing" in result["error"].lower()
def test_valid_linked_files_pass(self, tmp_path):
"""Existing linked_files should pass validation."""
content_with_links = """---
name: test-skill
description: Test skill with enough content to pass minimum length validation check.
linked_files:
- references/exists.md
---
# Test
This is a longer body section with plenty of text to ensure the content exceeds the minimum one hundred character requirement for SKILL.md files.
"""
with _skill_dir(tmp_path):
_create_skill("my-skill", VALID_SKILL_CONTENT)
# Create the linked file
ref_dir = tmp_path / "my-skill" / "references"
ref_dir.mkdir(parents=True, exist_ok=True)
(ref_dir / "exists.md").write_text("# Reference")
result = _edit_skill("my-skill", content_with_links)
assert result["success"] is True
class TestHistoryRegistry:
"""Tests for history registry functionality (#837)."""
def test_history_saved_on_edit(self, tmp_path):
"""Editing a skill should save the original to history."""
with _skill_dir(tmp_path):
_create_skill("my-skill", VALID_SKILL_CONTENT)
# Make an edit
new_content = """---
name: test-skill
description: Updated description that is longer than one hundred characters to pass validation.
---
# Updated Test
This body has more content to ensure it passes the minimum length check of one hundred characters.
"""
result = _edit_skill("my-skill", new_content)
assert result["success"] is True
# Check history was saved
history_dir = tmp_path / ".history" / "my-skill"
assert history_dir.exists()
history_files = list(history_dir.glob("*.md"))
assert len(history_files) == 1
def test_history_pruned_to_three(self, tmp_path):
"""Only last 3 history versions should be kept."""
from tools.skill_manager_tool import _save_to_history
with _skill_dir(tmp_path):
_create_skill("my-skill", VALID_SKILL_CONTENT)
# Save 5 versions to history
for i in range(5):
content = f"""---
name: test-skill
description: Version {i} that is long enough to pass minimum length validation check of one hundred characters.
---
# Version {i}
This is the body content for version {i} that ensures we meet the minimum length requirement.
"""
_save_to_history("my-skill", content, timestamp=1000 + i)
# Check only 3 history files remain
history_dir = tmp_path / ".history" / "my-skill"
history_files = sorted(history_dir.glob("*.md"))
assert len(history_files) == 3
# Should be the last 3 (timestamps 1002, 1003, 1004)
assert "1002" in str(history_files[0])
def test_revert_to_history(self, tmp_path):
"""Should be able to revert to a history version."""
from tools.skill_manager_tool import _revert_to_history, _get_history_versions
with _skill_dir(tmp_path):
_create_skill("my-skill", VALID_SKILL_CONTENT)
skill_md = tmp_path / "my-skill" / "SKILL.md"
# Save original to history
original = skill_md.read_text()
from tools.skill_manager_tool import _save_to_history
_save_to_history("my-skill", original)
# Edit the skill
new_content = """---
name: test-skill
description: Updated description that is longer than one hundred characters to pass validation.
---
# Updated
This body has more content to ensure it passes the minimum length check of one hundred characters.
"""
_edit_skill("my-skill", new_content)
# Verify edit was applied
assert "Updated" in skill_md.read_text()
# Revert to history
error = _revert_to_history("my-skill", skill_md, version=0)
assert error is None
# Verify revert worked
content = skill_md.read_text()
assert "test-skill" in content
assert "A test skill" in content

View File

@@ -13,11 +13,9 @@ from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Tuple
from hermes_constants import get_hermes_home
logger = logging.getLogger(__name__)
HERMES_HOME = get_hermes_home()
HERMES_HOME = Path.home() / ".hermes"
AUDIT_DIR = HERMES_HOME / "audit"
# Credential patterns to detect and redact
@@ -34,14 +32,14 @@ CREDENTIAL_PATTERNS = [
(r"bearer\s+[a-zA-Z0-9._-]{20,}", "[REDACTED: Bearer token]"),
# Generic tokens/passwords
("(?:token|TOKEN|Token)[:=]\\s*['\"]?[a-zA-Z0-9._-]{20,}['\"]?", "[REDACTED: Token]"),
("(?:password|PASSWORD|Password)[:=]\\s*['\"]?[^\\s\"']{8,}['\"]?", "[REDACTED: Password]"),
("(?:secret|SECRET|Secret)[:=]\\s*['\"]?[a-zA-Z0-9._-]{20,}['\"]?", "[REDACTED: Secret]"),
("(?:api_key|API_KEY|apiKey|ApiKey)[:=]\\s*['\"]?[a-zA-Z0-9._-]{20,}['\"]?", "[REDACTED: API key]"),
(r"(?:token|TOKEN|Token)[:=]\s*["']?[a-zA-Z0-9._-]{20,}["']?", "[REDACTED: Token]"),
(r"(?:password|PASSWORD|Password)[:=]\s*["']?[^\s"']{8,}["']?", "[REDACTED: Password]"),
(r"(?:secret|SECRET|Secret)[:=]\s*["']?[a-zA-Z0-9._-]{20,}["']?", "[REDACTED: Secret]"),
(r"(?:api_key|API_KEY|apiKey|ApiKey)[:=]\s*["']?[a-zA-Z0-9._-]{20,}["']?", "[REDACTED: API key]"),
# AWS keys
(r"AKIA[0-9A-Z]{16}", "[REDACTED: AWS access key]"),
("(?:aws_secret_access_key|AWS_SECRET_ACCESS_KEY)[:=]\\s*['\"]?[a-zA-Z0-9/+=]{40}['\"]?", "[REDACTED: AWS secret]"),
(r"(?:aws_secret_access_key|AWS_SECRET_ACCESS_KEY)[:=]\s*["']?[a-zA-Z0-9/+=]{40}["']?", "[REDACTED: AWS secret]"),
# Private keys
(r"-----BEGIN (?:RSA |EC |OPENSSH )?PRIVATE KEY-----", "[REDACTED: Private key header]"),

View File

@@ -249,8 +249,7 @@ def detect_crisis(text: str) -> CrisisDetectionResult:
# ── Escalation Logging ────────────────────────────────────────────────────
BRIDGE_URL = os.environ.get("CRISIS_BRIDGE_URL", "")
_HERMES_HOME = os.environ.get("HERMES_HOME")
LOG_PATH = os.path.join(_HERMES_HOME or os.path.expanduser("~/.hermes"), "crisis_escalations.jsonl")
LOG_PATH = os.path.expanduser("~/.hermes/crisis_escalations.jsonl")
def _log_escalation(result: CrisisDetectionResult, text_preview: str = ""):

View File

@@ -10,10 +10,10 @@ Usage:
from tools.hardcoded_path_guard import check_path, validate_tool_args
# Check a single path
err = check_path("/Users/apayne/.hermes/config.yaml") # noqa: hardcoded-path-ok
err = check_path("/Users/apayne/.hermes/config.yaml")
# Validate all path-like args in a tool call
clean_args, warnings = validate_tool_args("read_file", {"path": "/home/user/file.txt"}) # noqa: hardcoded-path-ok
clean_args, warnings = validate_tool_args("read_file", {"path": "/home/user/file.txt"})
"""
import os

View File

@@ -14,11 +14,9 @@ from typing import Dict, List, Optional, Any
from dataclasses import dataclass, asdict, field
from enum import Enum
from hermes_constants import get_hermes_home
logger = logging.getLogger(__name__)
TEMPLATE_DIR = get_hermes_home() / "session-templates"
TEMPLATE_DIR = Path.home() / ".hermes" / "session-templates"
class TaskType(Enum):
@@ -108,7 +106,7 @@ class Templates:
return TaskType.MIXED
def extract(self, session_id, max_n=10):
db = get_hermes_home() / "state.db"
db = Path.home() / ".hermes" / "state.db"
if not db.exists():
return []
try:

View File

@@ -322,12 +322,112 @@ def _cleanup_old_backups(file_path: Path, max_backups: int = MAX_BACKUPS_PER_FIL
break
# History registry for rollback (#837)
MAX_HISTORY_VERSIONS = 3
def _history_dir_for_skill(skill_name: str) -> Path:
"""Return the history directory path for a skill."""
return SKILLS_DIR / ".history" / skill_name
def _save_to_history(skill_name: str, content: str, timestamp: Optional[int] = None) -> Optional[Path]:
"""Save a version of the skill to the history registry.
History is stored in ~/.hermes/skills/.history/<skill-name>/<timestamp>.md
Keeps the last MAX_HISTORY_VERSIONS versions.
Returns the path to the saved history file, or None if not saved.
"""
if timestamp is None:
timestamp = int(time.time())
history_dir = _history_dir_for_skill(skill_name)
history_dir.mkdir(parents=True, exist_ok=True)
history_file = history_dir / f"{timestamp}.md"
_atomic_write_text(history_file, content)
# Clean up old history versions
_cleanup_history(skill_name)
return history_file
def _cleanup_history(skill_name: str, max_versions: int = MAX_HISTORY_VERSIONS) -> None:
"""Prune old history versions, keeping only the most recent max_versions."""
history_dir = _history_dir_for_skill(skill_name)
if not history_dir.exists():
return
try:
# Get all history files sorted by modification time (oldest first)
history_files = sorted(
[f for f in history_dir.iterdir() if f.suffix == '.md' and f.is_file()],
key=lambda p: p.stat().st_mtime,
)
except OSError:
return
# Remove oldest files if we have more than max_versions
while len(history_files) > max_versions:
try:
history_files.pop(0).unlink()
except OSError:
break
def _get_history_versions(skill_name: str) -> List[Path]:
"""Get list of history versions for a skill, newest first."""
history_dir = _history_dir_for_skill(skill_name)
if not history_dir.exists():
return []
try:
return sorted(
[f for f in history_dir.iterdir() if f.suffix == '.md' and f.is_file()],
key=lambda p: p.stat().st_mtime,
reverse=True,
)
except OSError:
return []
def _revert_to_history(skill_name: str, skill_md_path: Path, version: int = 0) -> Optional[str]:
"""Revert a skill to a previous history version.
Args:
skill_name: Name of the skill
skill_md_path: Path to the current SKILL.md
version: Which history version to revert to (0 = most recent, 1 = second most recent, etc.)
Returns:
Error message if revert failed, None if successful
"""
history_versions = _get_history_versions(skill_name)
if not history_versions:
return "No history versions available to revert to."
if version >= len(history_versions):
return f"History version {version} not found (only {len(history_versions)} versions available)."
target_version = history_versions[version]
try:
content = target_version.read_text(encoding="utf-8")
_atomic_write_text(skill_md_path, content)
return None
except Exception as exc:
return f"Failed to revert to history version: {exc}"
def _validate_written_file(file_path: Path, is_skill_md: bool = False) -> Optional[str]:
"""Re-read a file from disk and validate it after writing.
Catches filesystem-level issues (truncation, encoding errors, empty
writes) that pre-write validation cannot detect. For SKILL.md files
the frontmatter is also re-validated.
the frontmatter is also re-validated and linked_files are verified.
Returns an error message, or *None* if the file looks healthy.
"""
@@ -341,11 +441,69 @@ def _validate_written_file(file_path: Path, is_skill_md: bool = False) -> Option
if len(content) == 0:
return "File is empty after write (possible truncation)."
# Minimum content length check for SKILL.md only (#837)
if is_skill_md and len(content) < 100:
return f"SKILL.md is too short after write ({len(content)} chars, minimum 100)."
if is_skill_md:
err = _validate_frontmatter(content)
if err:
return f"Post-write validation failed: {err}"
# Verify linked_files exist (#837)
err = _validate_linked_files(content, file_path.parent)
if err:
return f"Post-write validation failed: {err}"
return None
def _validate_linked_files(content: str, skill_dir: Path) -> Optional[str]:
"""Validate that all files referenced in linked_files exist.
Parses the SKILL.md frontmatter and checks that any linked_files
entries point to files that actually exist in the skill directory.
Returns an error message, or *None* if all linked files exist.
"""
if not content.startswith("---"):
return None
end_match = re.search(r'\n---\s*\n', content[3:])
if not end_match:
return None
yaml_content = content[3:end_match.start() + 3]
try:
parsed = yaml.safe_load(yaml_content)
except yaml.YAMLError:
return None
if not isinstance(parsed, dict):
return None
linked_files = parsed.get("linked_files", [])
if not linked_files:
return None
missing = []
for lf in linked_files:
if isinstance(lf, dict):
file_ref = lf.get("file") or lf.get("path", "")
elif isinstance(lf, str):
file_ref = lf
else:
continue
if file_ref:
# Resolve relative to skill directory
target = skill_dir / file_ref
if not target.exists():
missing.append(file_ref)
if missing:
return f"Linked files missing: {', '.join(missing)}"
return None
@@ -483,6 +641,13 @@ def _edit_skill(name: str, content: str) -> Dict[str, Any]:
skill_md = existing["path"] / "SKILL.md"
# Save original to history before modification (#837)
try:
original_content = skill_md.read_text(encoding="utf-8")
_save_to_history(name, original_content)
except (OSError, UnicodeDecodeError):
pass # If we can't read original, proceed without history
# --- Transactional write-validate-commit-or-rollback ---
backup_path = _backup_skill_file(skill_md)
_atomic_write_text(skill_md, content)
@@ -598,6 +763,14 @@ def _patch_skill(
is_skill_md = not file_path
# Save original to history when patching SKILL.md (#837)
if is_skill_md:
try:
original_content = target.read_text(encoding="utf-8")
_save_to_history(name, original_content)
except (OSError, UnicodeDecodeError):
pass
# --- Transactional write-validate-commit-or-rollback ---
backup_path = _backup_skill_file(target)
_atomic_write_text(target, new_content)