Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a1d536826e |
@@ -1,39 +0,0 @@
|
||||
"""Tests for binary_extensions helpers."""
|
||||
|
||||
from tools.binary_extensions import has_binary_extension, has_image_extension
|
||||
|
||||
|
||||
def test_has_image_extension_png():
|
||||
assert has_image_extension("/tmp/test.png") is True
|
||||
assert has_image_extension("/tmp/test.PNG") is True
|
||||
|
||||
|
||||
def test_has_image_extension_jpg_variants():
|
||||
assert has_image_extension("/tmp/test.jpg") is True
|
||||
assert has_image_extension("/tmp/test.jpeg") is True
|
||||
assert has_image_extension("/tmp/test.JPG") is True
|
||||
|
||||
|
||||
def test_has_image_extension_webp():
|
||||
assert has_image_extension("/tmp/test.webp") is True
|
||||
|
||||
|
||||
def test_has_image_extension_gif():
|
||||
assert has_image_extension("/tmp/test.gif") is True
|
||||
|
||||
|
||||
def test_has_image_extension_no_ext():
|
||||
assert has_image_extension("/tmp/test") is False
|
||||
|
||||
|
||||
def test_has_image_extension_non_image():
|
||||
assert has_image_extension("/tmp/test.txt") is False
|
||||
assert has_image_extension("/tmp/test.exe") is False
|
||||
assert has_image_extension("/tmp/test.pdf") is False
|
||||
|
||||
|
||||
def test_has_binary_extension_includes_images():
|
||||
"""All image extensions must also be in binary extensions."""
|
||||
assert has_binary_extension("/tmp/test.png") is True
|
||||
assert has_binary_extension("/tmp/test.jpg") is True
|
||||
assert has_binary_extension("/tmp/test.webp") is True
|
||||
@@ -294,67 +294,3 @@ class TestSearchHints:
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
class TestReadFileImageRouting:
|
||||
"""Tests that image files are routed through vision analysis."""
|
||||
|
||||
@patch("tools.file_tools._analyze_image_with_vision")
|
||||
def test_image_png_routes_to_vision(self, mock_analyze, tmp_path):
|
||||
mock_analyze.return_value = json.dumps({"analysis": "test image"})
|
||||
img = tmp_path / "test.png"
|
||||
img.write_bytes(b"fake png data")
|
||||
|
||||
from tools.file_tools import read_file_tool
|
||||
result = read_file_tool(str(img))
|
||||
mock_analyze.assert_called_once()
|
||||
assert json.loads(result)["analysis"] == "test image"
|
||||
|
||||
@patch("tools.file_tools._analyze_image_with_vision")
|
||||
def test_image_jpeg_routes_to_vision(self, mock_analyze, tmp_path):
|
||||
mock_analyze.return_value = json.dumps({"analysis": "test image"})
|
||||
img = tmp_path / "test.jpeg"
|
||||
img.write_bytes(b"fake jpeg data")
|
||||
|
||||
from tools.file_tools import read_file_tool
|
||||
result = read_file_tool(str(img))
|
||||
mock_analyze.assert_called_once()
|
||||
assert json.loads(result)["analysis"] == "test image"
|
||||
|
||||
@patch("tools.file_tools._analyze_image_with_vision")
|
||||
def test_image_webp_routes_to_vision(self, mock_analyze, tmp_path):
|
||||
mock_analyze.return_value = json.dumps({"analysis": "test image"})
|
||||
img = tmp_path / "test.webp"
|
||||
img.write_bytes(b"fake webp data")
|
||||
|
||||
from tools.file_tools import read_file_tool
|
||||
result = read_file_tool(str(img))
|
||||
mock_analyze.assert_called_once()
|
||||
assert json.loads(result)["analysis"] == "test image"
|
||||
|
||||
def test_non_image_binary_blocked(self, tmp_path):
|
||||
from tools.file_tools import read_file_tool
|
||||
exe = tmp_path / "test.exe"
|
||||
exe.write_bytes(b"fake exe data")
|
||||
result = json.loads(read_file_tool(str(exe)))
|
||||
assert "error" in result
|
||||
assert "Cannot read binary" in result["error"]
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
class TestAnalyzeImageWithVision:
|
||||
"""Tests for the _analyze_image_with_vision helper."""
|
||||
|
||||
def test_import_error_fallback(self):
|
||||
with patch.dict("sys.modules", {"tools.vision_tools": None}):
|
||||
from tools.file_tools import _analyze_image_with_vision
|
||||
result = json.loads(_analyze_image_with_vision("/tmp/test.png"))
|
||||
assert "error" in result
|
||||
assert "vision_analyze tool is not available" in result["error"]
|
||||
|
||||
@@ -308,12 +308,12 @@ word word
|
||||
content = """\
|
||||
---
|
||||
name: test-skill
|
||||
description: A test skill.
|
||||
description: A test skill with enough content to pass the minimum length validation check of one hundred characters.
|
||||
---
|
||||
|
||||
# Test
|
||||
|
||||
word word
|
||||
word word word word word word word word word word
|
||||
"""
|
||||
with _skill_dir(tmp_path):
|
||||
_create_skill("my-skill", content)
|
||||
@@ -484,3 +484,185 @@ class TestSkillManageDispatcher:
|
||||
raw = skill_manage(action="create", name="test-skill", content=VALID_SKILL_CONTENT)
|
||||
result = json.loads(raw)
|
||||
assert result["success"] is True
|
||||
|
||||
|
||||
|
||||
class TestPokaYokeValidation:
|
||||
"""Tests for poka-yoke auto-revert functionality (#837)."""
|
||||
|
||||
def test_short_skill_md_reverts(self, tmp_path):
|
||||
"""SKILL.md shorter than 100 chars should be reverted."""
|
||||
short_content = """---
|
||||
name: test-skill
|
||||
description: Test
|
||||
---
|
||||
|
||||
Short
|
||||
"""
|
||||
with _skill_dir(tmp_path):
|
||||
_create_skill("my-skill", VALID_SKILL_CONTENT)
|
||||
result = _edit_skill("my-skill", short_content)
|
||||
|
||||
assert result["success"] is False
|
||||
assert "too short" in result["error"].lower()
|
||||
|
||||
# Verify the original file is preserved
|
||||
skill_md = tmp_path / "my-skill" / "SKILL.md"
|
||||
content = skill_md.read_text()
|
||||
assert "test-skill" in content # Original content preserved
|
||||
|
||||
def test_truncated_skill_reverts(self, tmp_path):
|
||||
"""Truncated YAML frontmatter should be reverted."""
|
||||
truncated = """---
|
||||
name: test-skill
|
||||
description: Test skill with enough content to pass minimum length validation check.
|
||||
---
|
||||
|
||||
# Test
|
||||
|
||||
This is a longer body section with plenty of text to ensure the content exceeds the minimum one hundred character requirement for SKILL.md files.
|
||||
"""
|
||||
# Chop it off to simulate truncation
|
||||
truncated = truncated[:80]
|
||||
|
||||
with _skill_dir(tmp_path):
|
||||
_create_skill("my-skill", VALID_SKILL_CONTENT)
|
||||
result = _edit_skill("my-skill", truncated)
|
||||
|
||||
assert result["success"] is False
|
||||
|
||||
def test_linked_files_validation(self, tmp_path):
|
||||
"""Missing linked_files should cause revert."""
|
||||
content_with_links = """---
|
||||
name: test-skill
|
||||
description: Test skill with enough content to pass minimum length validation check.
|
||||
linked_files:
|
||||
- references/nonexistent.md
|
||||
---
|
||||
|
||||
# Test
|
||||
|
||||
This is a longer body section with plenty of text to ensure the content exceeds the minimum one hundred character requirement for SKILL.md files.
|
||||
"""
|
||||
with _skill_dir(tmp_path):
|
||||
_create_skill("my-skill", VALID_SKILL_CONTENT)
|
||||
result = _edit_skill("my-skill", content_with_links)
|
||||
|
||||
assert result["success"] is False
|
||||
assert "linked files missing" in result["error"].lower()
|
||||
|
||||
def test_valid_linked_files_pass(self, tmp_path):
|
||||
"""Existing linked_files should pass validation."""
|
||||
content_with_links = """---
|
||||
name: test-skill
|
||||
description: Test skill with enough content to pass minimum length validation check.
|
||||
linked_files:
|
||||
- references/exists.md
|
||||
---
|
||||
|
||||
# Test
|
||||
|
||||
This is a longer body section with plenty of text to ensure the content exceeds the minimum one hundred character requirement for SKILL.md files.
|
||||
"""
|
||||
with _skill_dir(tmp_path):
|
||||
_create_skill("my-skill", VALID_SKILL_CONTENT)
|
||||
# Create the linked file
|
||||
ref_dir = tmp_path / "my-skill" / "references"
|
||||
ref_dir.mkdir(parents=True, exist_ok=True)
|
||||
(ref_dir / "exists.md").write_text("# Reference")
|
||||
|
||||
result = _edit_skill("my-skill", content_with_links)
|
||||
|
||||
assert result["success"] is True
|
||||
|
||||
|
||||
class TestHistoryRegistry:
|
||||
"""Tests for history registry functionality (#837)."""
|
||||
|
||||
def test_history_saved_on_edit(self, tmp_path):
|
||||
"""Editing a skill should save the original to history."""
|
||||
with _skill_dir(tmp_path):
|
||||
_create_skill("my-skill", VALID_SKILL_CONTENT)
|
||||
|
||||
# Make an edit
|
||||
new_content = """---
|
||||
name: test-skill
|
||||
description: Updated description that is longer than one hundred characters to pass validation.
|
||||
---
|
||||
|
||||
# Updated Test
|
||||
|
||||
This body has more content to ensure it passes the minimum length check of one hundred characters.
|
||||
"""
|
||||
result = _edit_skill("my-skill", new_content)
|
||||
assert result["success"] is True
|
||||
|
||||
# Check history was saved
|
||||
history_dir = tmp_path / ".history" / "my-skill"
|
||||
assert history_dir.exists()
|
||||
history_files = list(history_dir.glob("*.md"))
|
||||
assert len(history_files) == 1
|
||||
|
||||
def test_history_pruned_to_three(self, tmp_path):
|
||||
"""Only last 3 history versions should be kept."""
|
||||
from tools.skill_manager_tool import _save_to_history
|
||||
|
||||
with _skill_dir(tmp_path):
|
||||
_create_skill("my-skill", VALID_SKILL_CONTENT)
|
||||
|
||||
# Save 5 versions to history
|
||||
for i in range(5):
|
||||
content = f"""---
|
||||
name: test-skill
|
||||
description: Version {i} that is long enough to pass minimum length validation check of one hundred characters.
|
||||
---
|
||||
|
||||
# Version {i}
|
||||
|
||||
This is the body content for version {i} that ensures we meet the minimum length requirement.
|
||||
"""
|
||||
_save_to_history("my-skill", content, timestamp=1000 + i)
|
||||
|
||||
# Check only 3 history files remain
|
||||
history_dir = tmp_path / ".history" / "my-skill"
|
||||
history_files = sorted(history_dir.glob("*.md"))
|
||||
assert len(history_files) == 3
|
||||
# Should be the last 3 (timestamps 1002, 1003, 1004)
|
||||
assert "1002" in str(history_files[0])
|
||||
|
||||
def test_revert_to_history(self, tmp_path):
|
||||
"""Should be able to revert to a history version."""
|
||||
from tools.skill_manager_tool import _revert_to_history, _get_history_versions
|
||||
|
||||
with _skill_dir(tmp_path):
|
||||
_create_skill("my-skill", VALID_SKILL_CONTENT)
|
||||
skill_md = tmp_path / "my-skill" / "SKILL.md"
|
||||
|
||||
# Save original to history
|
||||
original = skill_md.read_text()
|
||||
from tools.skill_manager_tool import _save_to_history
|
||||
_save_to_history("my-skill", original)
|
||||
|
||||
# Edit the skill
|
||||
new_content = """---
|
||||
name: test-skill
|
||||
description: Updated description that is longer than one hundred characters to pass validation.
|
||||
---
|
||||
|
||||
# Updated
|
||||
|
||||
This body has more content to ensure it passes the minimum length check of one hundred characters.
|
||||
"""
|
||||
_edit_skill("my-skill", new_content)
|
||||
|
||||
# Verify edit was applied
|
||||
assert "Updated" in skill_md.read_text()
|
||||
|
||||
# Revert to history
|
||||
error = _revert_to_history("my-skill", skill_md, version=0)
|
||||
assert error is None
|
||||
|
||||
# Verify revert worked
|
||||
content = skill_md.read_text()
|
||||
assert "test-skill" in content
|
||||
assert "A test skill" in content
|
||||
|
||||
@@ -34,22 +34,9 @@ BINARY_EXTENSIONS = frozenset({
|
||||
})
|
||||
|
||||
|
||||
IMAGE_EXTENSIONS = frozenset({
|
||||
".png", ".jpg", ".jpeg", ".gif", ".bmp", ".ico", ".webp", ".tiff", ".tif",
|
||||
})
|
||||
|
||||
|
||||
def has_binary_extension(path: str) -> bool:
|
||||
"""Check if a file path has a binary extension. Pure string check, no I/O."""
|
||||
dot = path.rfind(".")
|
||||
if dot == -1:
|
||||
return False
|
||||
return path[dot:].lower() in BINARY_EXTENSIONS
|
||||
|
||||
|
||||
def has_image_extension(path: str) -> bool:
|
||||
"""Check if a file path has an image extension. Pure string check, no I/O."""
|
||||
dot = path.rfind(".")
|
||||
if dot == -1:
|
||||
return False
|
||||
return path[dot:].lower() in IMAGE_EXTENSIONS
|
||||
|
||||
@@ -1893,13 +1893,11 @@ def browser_get_images(task_id: Optional[str] = None) -> str:
|
||||
def browser_vision(question: str, annotate: bool = False, task_id: Optional[str] = None) -> str:
|
||||
"""
|
||||
Take a screenshot of the current page and analyze it with vision AI.
|
||||
|
||||
|
||||
This tool captures what's visually displayed in the browser and sends it
|
||||
to the configured vision model for analysis. When the active model is
|
||||
natively multimodal (e.g. Gemma 4) it is used directly; otherwise the
|
||||
auxiliary vision backend is used. Useful for understanding visual content
|
||||
that the text-based snapshot may not capture (CAPTCHAs, verification
|
||||
challenges, images, complex layouts, etc.).
|
||||
to Gemini for analysis. Useful for understanding visual content that the
|
||||
text-based snapshot may not capture (CAPTCHAs, verification challenges,
|
||||
images, complex layouts, etc.).
|
||||
|
||||
The screenshot is saved persistently and its file path is returned alongside
|
||||
the analysis, so it can be shared with users via MEDIA:<path> in the response.
|
||||
|
||||
@@ -7,7 +7,7 @@ import logging
|
||||
import os
|
||||
import threading
|
||||
from pathlib import Path
|
||||
from tools.binary_extensions import has_binary_extension, has_image_extension
|
||||
from tools.binary_extensions import has_binary_extension
|
||||
from tools.file_operations import ShellFileOperations
|
||||
from agent.redact import redact_sensitive_text
|
||||
|
||||
@@ -279,52 +279,6 @@ def clear_file_ops_cache(task_id: str = None):
|
||||
_file_ops_cache.clear()
|
||||
|
||||
|
||||
def _analyze_image_with_vision(image_path: str, task_id: str = "default") -> str:
|
||||
"""Route an image file through the vision analysis pipeline.
|
||||
|
||||
Uses vision_analyze_tool with a default descriptive prompt. Falls back
|
||||
to a manual error when no vision backend is available.
|
||||
"""
|
||||
import asyncio
|
||||
try:
|
||||
from tools.vision_tools import vision_analyze_tool
|
||||
except ImportError:
|
||||
return json.dumps({
|
||||
"error": (
|
||||
f"Image file '{image_path}' detected but vision_analyze tool "
|
||||
"is not available. Use vision_analyze directly if configured."
|
||||
),
|
||||
})
|
||||
|
||||
prompt = (
|
||||
"Describe this image in detail. If it contains text, transcribe "
|
||||
"the text. If it is a diagram, chart, or UI screenshot, describe "
|
||||
"the layout, colors, labels, and any visible data."
|
||||
)
|
||||
|
||||
try:
|
||||
result = asyncio.run(vision_analyze_tool(image_url=image_path, question=prompt))
|
||||
except Exception as exc:
|
||||
return json.dumps({
|
||||
"error": (
|
||||
f"Image file '{image_path}' detected but vision analysis failed: {exc}. "
|
||||
"Use vision_analyze directly if configured."
|
||||
),
|
||||
})
|
||||
|
||||
try:
|
||||
parsed = json.loads(result)
|
||||
except json.JSONDecodeError:
|
||||
parsed = {"content": result}
|
||||
|
||||
# Wrap the vision result so the caller knows it came from image analysis
|
||||
return json.dumps({
|
||||
"image_path": image_path,
|
||||
"analysis": parsed.get("content") or parsed.get("analysis") or result,
|
||||
"source": "vision_analyze",
|
||||
}, ensure_ascii=False)
|
||||
|
||||
|
||||
def read_file_tool(path: str, offset: int = 1, limit: int = 500, task_id: str = "default") -> str:
|
||||
"""Read a file with pagination and line numbers."""
|
||||
try:
|
||||
@@ -341,13 +295,10 @@ def read_file_tool(path: str, offset: int = 1, limit: int = 500, task_id: str =
|
||||
|
||||
_resolved = Path(path).expanduser().resolve()
|
||||
|
||||
# ── Binary / image file guard ─────────────────────────────────
|
||||
# Block binary files by extension (no I/O). Images are routed
|
||||
# through the vision analysis pipeline when a backend is available.
|
||||
# ── Binary file guard ─────────────────────────────────────────
|
||||
# Block binary files by extension (no I/O).
|
||||
if has_binary_extension(str(_resolved)):
|
||||
_ext = _resolved.suffix.lower()
|
||||
if has_image_extension(str(_resolved)):
|
||||
return _analyze_image_with_vision(str(_resolved), task_id=task_id)
|
||||
return json.dumps({
|
||||
"error": (
|
||||
f"Cannot read binary file '{path}' ({_ext}). "
|
||||
@@ -778,7 +729,7 @@ def _check_file_reqs():
|
||||
|
||||
READ_FILE_SCHEMA = {
|
||||
"name": "read_file",
|
||||
"description": "Read a text file with line numbers and pagination. Use this instead of cat/head/tail in terminal. Output format: 'LINE_NUM|CONTENT'. Suggests similar filenames if not found. Use offset and limit for large files. Reads exceeding ~100K characters are rejected; use offset and limit to read specific sections of large files. NOTE: Image files (PNG, JPEG, WebP, GIF, etc.) are automatically analyzed via vision_analyze. Other binary files cannot be read as text.",
|
||||
"description": "Read a text file with line numbers and pagination. Use this instead of cat/head/tail in terminal. Output format: 'LINE_NUM|CONTENT'. Suggests similar filenames if not found. Use offset and limit for large files. Reads exceeding ~100K characters are rejected; use offset and limit to read specific sections of large files. NOTE: Cannot read images or binary files — use vision_analyze for images.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
|
||||
@@ -322,12 +322,112 @@ def _cleanup_old_backups(file_path: Path, max_backups: int = MAX_BACKUPS_PER_FIL
|
||||
break
|
||||
|
||||
|
||||
|
||||
|
||||
# History registry for rollback (#837)
|
||||
MAX_HISTORY_VERSIONS = 3
|
||||
|
||||
|
||||
def _history_dir_for_skill(skill_name: str) -> Path:
|
||||
"""Return the history directory path for a skill."""
|
||||
return SKILLS_DIR / ".history" / skill_name
|
||||
|
||||
|
||||
def _save_to_history(skill_name: str, content: str, timestamp: Optional[int] = None) -> Optional[Path]:
|
||||
"""Save a version of the skill to the history registry.
|
||||
|
||||
History is stored in ~/.hermes/skills/.history/<skill-name>/<timestamp>.md
|
||||
Keeps the last MAX_HISTORY_VERSIONS versions.
|
||||
|
||||
Returns the path to the saved history file, or None if not saved.
|
||||
"""
|
||||
if timestamp is None:
|
||||
timestamp = int(time.time())
|
||||
|
||||
history_dir = _history_dir_for_skill(skill_name)
|
||||
history_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
history_file = history_dir / f"{timestamp}.md"
|
||||
_atomic_write_text(history_file, content)
|
||||
|
||||
# Clean up old history versions
|
||||
_cleanup_history(skill_name)
|
||||
|
||||
return history_file
|
||||
|
||||
|
||||
def _cleanup_history(skill_name: str, max_versions: int = MAX_HISTORY_VERSIONS) -> None:
|
||||
"""Prune old history versions, keeping only the most recent max_versions."""
|
||||
history_dir = _history_dir_for_skill(skill_name)
|
||||
if not history_dir.exists():
|
||||
return
|
||||
|
||||
try:
|
||||
# Get all history files sorted by modification time (oldest first)
|
||||
history_files = sorted(
|
||||
[f for f in history_dir.iterdir() if f.suffix == '.md' and f.is_file()],
|
||||
key=lambda p: p.stat().st_mtime,
|
||||
)
|
||||
except OSError:
|
||||
return
|
||||
|
||||
# Remove oldest files if we have more than max_versions
|
||||
while len(history_files) > max_versions:
|
||||
try:
|
||||
history_files.pop(0).unlink()
|
||||
except OSError:
|
||||
break
|
||||
|
||||
|
||||
def _get_history_versions(skill_name: str) -> List[Path]:
|
||||
"""Get list of history versions for a skill, newest first."""
|
||||
history_dir = _history_dir_for_skill(skill_name)
|
||||
if not history_dir.exists():
|
||||
return []
|
||||
|
||||
try:
|
||||
return sorted(
|
||||
[f for f in history_dir.iterdir() if f.suffix == '.md' and f.is_file()],
|
||||
key=lambda p: p.stat().st_mtime,
|
||||
reverse=True,
|
||||
)
|
||||
except OSError:
|
||||
return []
|
||||
|
||||
|
||||
def _revert_to_history(skill_name: str, skill_md_path: Path, version: int = 0) -> Optional[str]:
|
||||
"""Revert a skill to a previous history version.
|
||||
|
||||
Args:
|
||||
skill_name: Name of the skill
|
||||
skill_md_path: Path to the current SKILL.md
|
||||
version: Which history version to revert to (0 = most recent, 1 = second most recent, etc.)
|
||||
|
||||
Returns:
|
||||
Error message if revert failed, None if successful
|
||||
"""
|
||||
history_versions = _get_history_versions(skill_name)
|
||||
if not history_versions:
|
||||
return "No history versions available to revert to."
|
||||
|
||||
if version >= len(history_versions):
|
||||
return f"History version {version} not found (only {len(history_versions)} versions available)."
|
||||
|
||||
target_version = history_versions[version]
|
||||
|
||||
try:
|
||||
content = target_version.read_text(encoding="utf-8")
|
||||
_atomic_write_text(skill_md_path, content)
|
||||
return None
|
||||
except Exception as exc:
|
||||
return f"Failed to revert to history version: {exc}"
|
||||
|
||||
def _validate_written_file(file_path: Path, is_skill_md: bool = False) -> Optional[str]:
|
||||
"""Re-read a file from disk and validate it after writing.
|
||||
|
||||
Catches filesystem-level issues (truncation, encoding errors, empty
|
||||
writes) that pre-write validation cannot detect. For SKILL.md files
|
||||
the frontmatter is also re-validated.
|
||||
the frontmatter is also re-validated and linked_files are verified.
|
||||
|
||||
Returns an error message, or *None* if the file looks healthy.
|
||||
"""
|
||||
@@ -341,11 +441,69 @@ def _validate_written_file(file_path: Path, is_skill_md: bool = False) -> Option
|
||||
if len(content) == 0:
|
||||
return "File is empty after write (possible truncation)."
|
||||
|
||||
# Minimum content length check for SKILL.md only (#837)
|
||||
if is_skill_md and len(content) < 100:
|
||||
return f"SKILL.md is too short after write ({len(content)} chars, minimum 100)."
|
||||
|
||||
if is_skill_md:
|
||||
err = _validate_frontmatter(content)
|
||||
if err:
|
||||
return f"Post-write validation failed: {err}"
|
||||
|
||||
# Verify linked_files exist (#837)
|
||||
err = _validate_linked_files(content, file_path.parent)
|
||||
if err:
|
||||
return f"Post-write validation failed: {err}"
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def _validate_linked_files(content: str, skill_dir: Path) -> Optional[str]:
|
||||
"""Validate that all files referenced in linked_files exist.
|
||||
|
||||
Parses the SKILL.md frontmatter and checks that any linked_files
|
||||
entries point to files that actually exist in the skill directory.
|
||||
|
||||
Returns an error message, or *None* if all linked files exist.
|
||||
"""
|
||||
if not content.startswith("---"):
|
||||
return None
|
||||
|
||||
end_match = re.search(r'\n---\s*\n', content[3:])
|
||||
if not end_match:
|
||||
return None
|
||||
|
||||
yaml_content = content[3:end_match.start() + 3]
|
||||
try:
|
||||
parsed = yaml.safe_load(yaml_content)
|
||||
except yaml.YAMLError:
|
||||
return None
|
||||
|
||||
if not isinstance(parsed, dict):
|
||||
return None
|
||||
|
||||
linked_files = parsed.get("linked_files", [])
|
||||
if not linked_files:
|
||||
return None
|
||||
|
||||
missing = []
|
||||
for lf in linked_files:
|
||||
if isinstance(lf, dict):
|
||||
file_ref = lf.get("file") or lf.get("path", "")
|
||||
elif isinstance(lf, str):
|
||||
file_ref = lf
|
||||
else:
|
||||
continue
|
||||
|
||||
if file_ref:
|
||||
# Resolve relative to skill directory
|
||||
target = skill_dir / file_ref
|
||||
if not target.exists():
|
||||
missing.append(file_ref)
|
||||
|
||||
if missing:
|
||||
return f"Linked files missing: {', '.join(missing)}"
|
||||
|
||||
return None
|
||||
|
||||
|
||||
@@ -483,6 +641,13 @@ def _edit_skill(name: str, content: str) -> Dict[str, Any]:
|
||||
|
||||
skill_md = existing["path"] / "SKILL.md"
|
||||
|
||||
# Save original to history before modification (#837)
|
||||
try:
|
||||
original_content = skill_md.read_text(encoding="utf-8")
|
||||
_save_to_history(name, original_content)
|
||||
except (OSError, UnicodeDecodeError):
|
||||
pass # If we can't read original, proceed without history
|
||||
|
||||
# --- Transactional write-validate-commit-or-rollback ---
|
||||
backup_path = _backup_skill_file(skill_md)
|
||||
_atomic_write_text(skill_md, content)
|
||||
@@ -598,6 +763,14 @@ def _patch_skill(
|
||||
|
||||
is_skill_md = not file_path
|
||||
|
||||
# Save original to history when patching SKILL.md (#837)
|
||||
if is_skill_md:
|
||||
try:
|
||||
original_content = target.read_text(encoding="utf-8")
|
||||
_save_to_history(name, original_content)
|
||||
except (OSError, UnicodeDecodeError):
|
||||
pass
|
||||
|
||||
# --- Transactional write-validate-commit-or-rollback ---
|
||||
backup_path = _backup_skill_file(target)
|
||||
_atomic_write_text(target, new_content)
|
||||
|
||||
Reference in New Issue
Block a user