Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
a1d536826e feat(837): Poka-yoke auto-revert for incomplete skill edits
All checks were successful
Lint / lint (pull_request) Successful in 20s
Implements fail-safe defaults for skill editing (#837):

1. Post-edit validation enhancements:
   - Minimum content length check (>100 chars) for SKILL.md
   - Verify all linked_files referenced in frontmatter exist
   - Revert to backup if validation fails

2. History registry:
   - Save original versions to ~/.hermes/skills/.history/<skill-name>/<timestamp>.md
   - Keep last 3 versions per skill (configurable)
   - Functions: _save_to_history(), _cleanup_history(), _get_history_versions(), _revert_to_history()

3. Integration with existing atomic write pattern:
   - _edit_skill() saves to history before modification
   - _patch_skill() saves to history when patching SKILL.md
   - Transactional write-validate-commit-or-rollback flow

4. Tests:
   - 7 new tests for poka-yoke validation and history registry
   - Test short skill revert, linked files validation, history pruning, revert

Refs: #837
2026-04-22 03:20:08 -04:00
7 changed files with 366 additions and 178 deletions

View File

@@ -1,39 +0,0 @@
"""Tests for binary_extensions helpers."""
from tools.binary_extensions import has_binary_extension, has_image_extension
def test_has_image_extension_png():
assert has_image_extension("/tmp/test.png") is True
assert has_image_extension("/tmp/test.PNG") is True
def test_has_image_extension_jpg_variants():
assert has_image_extension("/tmp/test.jpg") is True
assert has_image_extension("/tmp/test.jpeg") is True
assert has_image_extension("/tmp/test.JPG") is True
def test_has_image_extension_webp():
assert has_image_extension("/tmp/test.webp") is True
def test_has_image_extension_gif():
assert has_image_extension("/tmp/test.gif") is True
def test_has_image_extension_no_ext():
assert has_image_extension("/tmp/test") is False
def test_has_image_extension_non_image():
assert has_image_extension("/tmp/test.txt") is False
assert has_image_extension("/tmp/test.exe") is False
assert has_image_extension("/tmp/test.pdf") is False
def test_has_binary_extension_includes_images():
"""All image extensions must also be in binary extensions."""
assert has_binary_extension("/tmp/test.png") is True
assert has_binary_extension("/tmp/test.jpg") is True
assert has_binary_extension("/tmp/test.webp") is True

View File

@@ -294,67 +294,3 @@ class TestSearchHints:
class TestReadFileImageRouting:
"""Tests that image files are routed through vision analysis."""
@patch("tools.file_tools._analyze_image_with_vision")
def test_image_png_routes_to_vision(self, mock_analyze, tmp_path):
mock_analyze.return_value = json.dumps({"analysis": "test image"})
img = tmp_path / "test.png"
img.write_bytes(b"fake png data")
from tools.file_tools import read_file_tool
result = read_file_tool(str(img))
mock_analyze.assert_called_once()
assert json.loads(result)["analysis"] == "test image"
@patch("tools.file_tools._analyze_image_with_vision")
def test_image_jpeg_routes_to_vision(self, mock_analyze, tmp_path):
mock_analyze.return_value = json.dumps({"analysis": "test image"})
img = tmp_path / "test.jpeg"
img.write_bytes(b"fake jpeg data")
from tools.file_tools import read_file_tool
result = read_file_tool(str(img))
mock_analyze.assert_called_once()
assert json.loads(result)["analysis"] == "test image"
@patch("tools.file_tools._analyze_image_with_vision")
def test_image_webp_routes_to_vision(self, mock_analyze, tmp_path):
mock_analyze.return_value = json.dumps({"analysis": "test image"})
img = tmp_path / "test.webp"
img.write_bytes(b"fake webp data")
from tools.file_tools import read_file_tool
result = read_file_tool(str(img))
mock_analyze.assert_called_once()
assert json.loads(result)["analysis"] == "test image"
def test_non_image_binary_blocked(self, tmp_path):
from tools.file_tools import read_file_tool
exe = tmp_path / "test.exe"
exe.write_bytes(b"fake exe data")
result = json.loads(read_file_tool(str(exe)))
assert "error" in result
assert "Cannot read binary" in result["error"]
class TestAnalyzeImageWithVision:
"""Tests for the _analyze_image_with_vision helper."""
def test_import_error_fallback(self):
with patch.dict("sys.modules", {"tools.vision_tools": None}):
from tools.file_tools import _analyze_image_with_vision
result = json.loads(_analyze_image_with_vision("/tmp/test.png"))
assert "error" in result
assert "vision_analyze tool is not available" in result["error"]

View File

@@ -308,12 +308,12 @@ word word
content = """\
---
name: test-skill
description: A test skill.
description: A test skill with enough content to pass the minimum length validation check of one hundred characters.
---
# Test
word word
word word word word word word word word word word
"""
with _skill_dir(tmp_path):
_create_skill("my-skill", content)
@@ -484,3 +484,185 @@ class TestSkillManageDispatcher:
raw = skill_manage(action="create", name="test-skill", content=VALID_SKILL_CONTENT)
result = json.loads(raw)
assert result["success"] is True
class TestPokaYokeValidation:
"""Tests for poka-yoke auto-revert functionality (#837)."""
def test_short_skill_md_reverts(self, tmp_path):
"""SKILL.md shorter than 100 chars should be reverted."""
short_content = """---
name: test-skill
description: Test
---
Short
"""
with _skill_dir(tmp_path):
_create_skill("my-skill", VALID_SKILL_CONTENT)
result = _edit_skill("my-skill", short_content)
assert result["success"] is False
assert "too short" in result["error"].lower()
# Verify the original file is preserved
skill_md = tmp_path / "my-skill" / "SKILL.md"
content = skill_md.read_text()
assert "test-skill" in content # Original content preserved
def test_truncated_skill_reverts(self, tmp_path):
"""Truncated YAML frontmatter should be reverted."""
truncated = """---
name: test-skill
description: Test skill with enough content to pass minimum length validation check.
---
# Test
This is a longer body section with plenty of text to ensure the content exceeds the minimum one hundred character requirement for SKILL.md files.
"""
# Chop it off to simulate truncation
truncated = truncated[:80]
with _skill_dir(tmp_path):
_create_skill("my-skill", VALID_SKILL_CONTENT)
result = _edit_skill("my-skill", truncated)
assert result["success"] is False
def test_linked_files_validation(self, tmp_path):
"""Missing linked_files should cause revert."""
content_with_links = """---
name: test-skill
description: Test skill with enough content to pass minimum length validation check.
linked_files:
- references/nonexistent.md
---
# Test
This is a longer body section with plenty of text to ensure the content exceeds the minimum one hundred character requirement for SKILL.md files.
"""
with _skill_dir(tmp_path):
_create_skill("my-skill", VALID_SKILL_CONTENT)
result = _edit_skill("my-skill", content_with_links)
assert result["success"] is False
assert "linked files missing" in result["error"].lower()
def test_valid_linked_files_pass(self, tmp_path):
"""Existing linked_files should pass validation."""
content_with_links = """---
name: test-skill
description: Test skill with enough content to pass minimum length validation check.
linked_files:
- references/exists.md
---
# Test
This is a longer body section with plenty of text to ensure the content exceeds the minimum one hundred character requirement for SKILL.md files.
"""
with _skill_dir(tmp_path):
_create_skill("my-skill", VALID_SKILL_CONTENT)
# Create the linked file
ref_dir = tmp_path / "my-skill" / "references"
ref_dir.mkdir(parents=True, exist_ok=True)
(ref_dir / "exists.md").write_text("# Reference")
result = _edit_skill("my-skill", content_with_links)
assert result["success"] is True
class TestHistoryRegistry:
"""Tests for history registry functionality (#837)."""
def test_history_saved_on_edit(self, tmp_path):
"""Editing a skill should save the original to history."""
with _skill_dir(tmp_path):
_create_skill("my-skill", VALID_SKILL_CONTENT)
# Make an edit
new_content = """---
name: test-skill
description: Updated description that is longer than one hundred characters to pass validation.
---
# Updated Test
This body has more content to ensure it passes the minimum length check of one hundred characters.
"""
result = _edit_skill("my-skill", new_content)
assert result["success"] is True
# Check history was saved
history_dir = tmp_path / ".history" / "my-skill"
assert history_dir.exists()
history_files = list(history_dir.glob("*.md"))
assert len(history_files) == 1
def test_history_pruned_to_three(self, tmp_path):
"""Only last 3 history versions should be kept."""
from tools.skill_manager_tool import _save_to_history
with _skill_dir(tmp_path):
_create_skill("my-skill", VALID_SKILL_CONTENT)
# Save 5 versions to history
for i in range(5):
content = f"""---
name: test-skill
description: Version {i} that is long enough to pass minimum length validation check of one hundred characters.
---
# Version {i}
This is the body content for version {i} that ensures we meet the minimum length requirement.
"""
_save_to_history("my-skill", content, timestamp=1000 + i)
# Check only 3 history files remain
history_dir = tmp_path / ".history" / "my-skill"
history_files = sorted(history_dir.glob("*.md"))
assert len(history_files) == 3
# Should be the last 3 (timestamps 1002, 1003, 1004)
assert "1002" in str(history_files[0])
def test_revert_to_history(self, tmp_path):
"""Should be able to revert to a history version."""
from tools.skill_manager_tool import _revert_to_history, _get_history_versions
with _skill_dir(tmp_path):
_create_skill("my-skill", VALID_SKILL_CONTENT)
skill_md = tmp_path / "my-skill" / "SKILL.md"
# Save original to history
original = skill_md.read_text()
from tools.skill_manager_tool import _save_to_history
_save_to_history("my-skill", original)
# Edit the skill
new_content = """---
name: test-skill
description: Updated description that is longer than one hundred characters to pass validation.
---
# Updated
This body has more content to ensure it passes the minimum length check of one hundred characters.
"""
_edit_skill("my-skill", new_content)
# Verify edit was applied
assert "Updated" in skill_md.read_text()
# Revert to history
error = _revert_to_history("my-skill", skill_md, version=0)
assert error is None
# Verify revert worked
content = skill_md.read_text()
assert "test-skill" in content
assert "A test skill" in content

View File

@@ -34,22 +34,9 @@ BINARY_EXTENSIONS = frozenset({
})
IMAGE_EXTENSIONS = frozenset({
".png", ".jpg", ".jpeg", ".gif", ".bmp", ".ico", ".webp", ".tiff", ".tif",
})
def has_binary_extension(path: str) -> bool:
"""Check if a file path has a binary extension. Pure string check, no I/O."""
dot = path.rfind(".")
if dot == -1:
return False
return path[dot:].lower() in BINARY_EXTENSIONS
def has_image_extension(path: str) -> bool:
"""Check if a file path has an image extension. Pure string check, no I/O."""
dot = path.rfind(".")
if dot == -1:
return False
return path[dot:].lower() in IMAGE_EXTENSIONS

View File

@@ -1893,13 +1893,11 @@ def browser_get_images(task_id: Optional[str] = None) -> str:
def browser_vision(question: str, annotate: bool = False, task_id: Optional[str] = None) -> str:
"""
Take a screenshot of the current page and analyze it with vision AI.
This tool captures what's visually displayed in the browser and sends it
to the configured vision model for analysis. When the active model is
natively multimodal (e.g. Gemma 4) it is used directly; otherwise the
auxiliary vision backend is used. Useful for understanding visual content
that the text-based snapshot may not capture (CAPTCHAs, verification
challenges, images, complex layouts, etc.).
to Gemini for analysis. Useful for understanding visual content that the
text-based snapshot may not capture (CAPTCHAs, verification challenges,
images, complex layouts, etc.).
The screenshot is saved persistently and its file path is returned alongside
the analysis, so it can be shared with users via MEDIA:<path> in the response.

View File

@@ -7,7 +7,7 @@ import logging
import os
import threading
from pathlib import Path
from tools.binary_extensions import has_binary_extension, has_image_extension
from tools.binary_extensions import has_binary_extension
from tools.file_operations import ShellFileOperations
from agent.redact import redact_sensitive_text
@@ -279,52 +279,6 @@ def clear_file_ops_cache(task_id: str = None):
_file_ops_cache.clear()
def _analyze_image_with_vision(image_path: str, task_id: str = "default") -> str:
"""Route an image file through the vision analysis pipeline.
Uses vision_analyze_tool with a default descriptive prompt. Falls back
to a manual error when no vision backend is available.
"""
import asyncio
try:
from tools.vision_tools import vision_analyze_tool
except ImportError:
return json.dumps({
"error": (
f"Image file '{image_path}' detected but vision_analyze tool "
"is not available. Use vision_analyze directly if configured."
),
})
prompt = (
"Describe this image in detail. If it contains text, transcribe "
"the text. If it is a diagram, chart, or UI screenshot, describe "
"the layout, colors, labels, and any visible data."
)
try:
result = asyncio.run(vision_analyze_tool(image_url=image_path, question=prompt))
except Exception as exc:
return json.dumps({
"error": (
f"Image file '{image_path}' detected but vision analysis failed: {exc}. "
"Use vision_analyze directly if configured."
),
})
try:
parsed = json.loads(result)
except json.JSONDecodeError:
parsed = {"content": result}
# Wrap the vision result so the caller knows it came from image analysis
return json.dumps({
"image_path": image_path,
"analysis": parsed.get("content") or parsed.get("analysis") or result,
"source": "vision_analyze",
}, ensure_ascii=False)
def read_file_tool(path: str, offset: int = 1, limit: int = 500, task_id: str = "default") -> str:
"""Read a file with pagination and line numbers."""
try:
@@ -341,13 +295,10 @@ def read_file_tool(path: str, offset: int = 1, limit: int = 500, task_id: str =
_resolved = Path(path).expanduser().resolve()
# ── Binary / image file guard ─────────────────────────────────
# Block binary files by extension (no I/O). Images are routed
# through the vision analysis pipeline when a backend is available.
# ── Binary file guard ─────────────────────────────────────────
# Block binary files by extension (no I/O).
if has_binary_extension(str(_resolved)):
_ext = _resolved.suffix.lower()
if has_image_extension(str(_resolved)):
return _analyze_image_with_vision(str(_resolved), task_id=task_id)
return json.dumps({
"error": (
f"Cannot read binary file '{path}' ({_ext}). "
@@ -778,7 +729,7 @@ def _check_file_reqs():
READ_FILE_SCHEMA = {
"name": "read_file",
"description": "Read a text file with line numbers and pagination. Use this instead of cat/head/tail in terminal. Output format: 'LINE_NUM|CONTENT'. Suggests similar filenames if not found. Use offset and limit for large files. Reads exceeding ~100K characters are rejected; use offset and limit to read specific sections of large files. NOTE: Image files (PNG, JPEG, WebP, GIF, etc.) are automatically analyzed via vision_analyze. Other binary files cannot be read as text.",
"description": "Read a text file with line numbers and pagination. Use this instead of cat/head/tail in terminal. Output format: 'LINE_NUM|CONTENT'. Suggests similar filenames if not found. Use offset and limit for large files. Reads exceeding ~100K characters are rejected; use offset and limit to read specific sections of large files. NOTE: Cannot read images or binary files — use vision_analyze for images.",
"parameters": {
"type": "object",
"properties": {

View File

@@ -322,12 +322,112 @@ def _cleanup_old_backups(file_path: Path, max_backups: int = MAX_BACKUPS_PER_FIL
break
# History registry for rollback (#837)
MAX_HISTORY_VERSIONS = 3
def _history_dir_for_skill(skill_name: str) -> Path:
"""Return the history directory path for a skill."""
return SKILLS_DIR / ".history" / skill_name
def _save_to_history(skill_name: str, content: str, timestamp: Optional[int] = None) -> Optional[Path]:
"""Save a version of the skill to the history registry.
History is stored in ~/.hermes/skills/.history/<skill-name>/<timestamp>.md
Keeps the last MAX_HISTORY_VERSIONS versions.
Returns the path to the saved history file, or None if not saved.
"""
if timestamp is None:
timestamp = int(time.time())
history_dir = _history_dir_for_skill(skill_name)
history_dir.mkdir(parents=True, exist_ok=True)
history_file = history_dir / f"{timestamp}.md"
_atomic_write_text(history_file, content)
# Clean up old history versions
_cleanup_history(skill_name)
return history_file
def _cleanup_history(skill_name: str, max_versions: int = MAX_HISTORY_VERSIONS) -> None:
"""Prune old history versions, keeping only the most recent max_versions."""
history_dir = _history_dir_for_skill(skill_name)
if not history_dir.exists():
return
try:
# Get all history files sorted by modification time (oldest first)
history_files = sorted(
[f for f in history_dir.iterdir() if f.suffix == '.md' and f.is_file()],
key=lambda p: p.stat().st_mtime,
)
except OSError:
return
# Remove oldest files if we have more than max_versions
while len(history_files) > max_versions:
try:
history_files.pop(0).unlink()
except OSError:
break
def _get_history_versions(skill_name: str) -> List[Path]:
"""Get list of history versions for a skill, newest first."""
history_dir = _history_dir_for_skill(skill_name)
if not history_dir.exists():
return []
try:
return sorted(
[f for f in history_dir.iterdir() if f.suffix == '.md' and f.is_file()],
key=lambda p: p.stat().st_mtime,
reverse=True,
)
except OSError:
return []
def _revert_to_history(skill_name: str, skill_md_path: Path, version: int = 0) -> Optional[str]:
"""Revert a skill to a previous history version.
Args:
skill_name: Name of the skill
skill_md_path: Path to the current SKILL.md
version: Which history version to revert to (0 = most recent, 1 = second most recent, etc.)
Returns:
Error message if revert failed, None if successful
"""
history_versions = _get_history_versions(skill_name)
if not history_versions:
return "No history versions available to revert to."
if version >= len(history_versions):
return f"History version {version} not found (only {len(history_versions)} versions available)."
target_version = history_versions[version]
try:
content = target_version.read_text(encoding="utf-8")
_atomic_write_text(skill_md_path, content)
return None
except Exception as exc:
return f"Failed to revert to history version: {exc}"
def _validate_written_file(file_path: Path, is_skill_md: bool = False) -> Optional[str]:
"""Re-read a file from disk and validate it after writing.
Catches filesystem-level issues (truncation, encoding errors, empty
writes) that pre-write validation cannot detect. For SKILL.md files
the frontmatter is also re-validated.
the frontmatter is also re-validated and linked_files are verified.
Returns an error message, or *None* if the file looks healthy.
"""
@@ -341,11 +441,69 @@ def _validate_written_file(file_path: Path, is_skill_md: bool = False) -> Option
if len(content) == 0:
return "File is empty after write (possible truncation)."
# Minimum content length check for SKILL.md only (#837)
if is_skill_md and len(content) < 100:
return f"SKILL.md is too short after write ({len(content)} chars, minimum 100)."
if is_skill_md:
err = _validate_frontmatter(content)
if err:
return f"Post-write validation failed: {err}"
# Verify linked_files exist (#837)
err = _validate_linked_files(content, file_path.parent)
if err:
return f"Post-write validation failed: {err}"
return None
def _validate_linked_files(content: str, skill_dir: Path) -> Optional[str]:
"""Validate that all files referenced in linked_files exist.
Parses the SKILL.md frontmatter and checks that any linked_files
entries point to files that actually exist in the skill directory.
Returns an error message, or *None* if all linked files exist.
"""
if not content.startswith("---"):
return None
end_match = re.search(r'\n---\s*\n', content[3:])
if not end_match:
return None
yaml_content = content[3:end_match.start() + 3]
try:
parsed = yaml.safe_load(yaml_content)
except yaml.YAMLError:
return None
if not isinstance(parsed, dict):
return None
linked_files = parsed.get("linked_files", [])
if not linked_files:
return None
missing = []
for lf in linked_files:
if isinstance(lf, dict):
file_ref = lf.get("file") or lf.get("path", "")
elif isinstance(lf, str):
file_ref = lf
else:
continue
if file_ref:
# Resolve relative to skill directory
target = skill_dir / file_ref
if not target.exists():
missing.append(file_ref)
if missing:
return f"Linked files missing: {', '.join(missing)}"
return None
@@ -483,6 +641,13 @@ def _edit_skill(name: str, content: str) -> Dict[str, Any]:
skill_md = existing["path"] / "SKILL.md"
# Save original to history before modification (#837)
try:
original_content = skill_md.read_text(encoding="utf-8")
_save_to_history(name, original_content)
except (OSError, UnicodeDecodeError):
pass # If we can't read original, proceed without history
# --- Transactional write-validate-commit-or-rollback ---
backup_path = _backup_skill_file(skill_md)
_atomic_write_text(skill_md, content)
@@ -598,6 +763,14 @@ def _patch_skill(
is_skill_md = not file_path
# Save original to history when patching SKILL.md (#837)
if is_skill_md:
try:
original_content = target.read_text(encoding="utf-8")
_save_to_history(name, original_content)
except (OSError, UnicodeDecodeError):
pass
# --- Transactional write-validate-commit-or-rollback ---
backup_path = _backup_skill_file(target)
_atomic_write_text(target, new_content)