Compare commits
1 Commits
feat/505-s
...
burn/skill
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a973c2d1f6 |
298
tests/test_skill_manager_pokayoke.py
Normal file
298
tests/test_skill_manager_pokayoke.py
Normal file
@@ -0,0 +1,298 @@
|
||||
"""Tests for poka-yoke skill edit revert and validate action."""
|
||||
|
||||
import json
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.fixture()
def isolated_skills_dir(tmp_path, monkeypatch):
    """Point SKILLS_DIR at a temp directory for test isolation.

    Returns the temporary skills directory so tests can create skills in it.
    """
    skills_dir = tmp_path / "skills"
    skills_dir.mkdir()
    # Both tool modules hold their own SKILLS_DIR reference; patch each one.
    monkeypatch.setattr("tools.skill_manager_tool.SKILLS_DIR", skills_dir)
    monkeypatch.setattr("tools.skills_tool.SKILLS_DIR", skills_dir)
    # Also patch skill discovery so _find_skill and validate look in our temp dir
    monkeypatch.setattr(
        "agent.skill_utils.get_all_skills_dirs",
        lambda: [skills_dir],
    )
    return skills_dir
|
||||
|
||||
|
||||
_VALID_SKILL = """\
|
||||
---
|
||||
name: test-skill
|
||||
description: A test skill for unit tests.
|
||||
---
|
||||
|
||||
# Test Skill
|
||||
|
||||
Instructions here.
|
||||
"""
|
||||
|
||||
|
||||
def _create_test_skill(skills_dir: Path, name: str = "test-skill", content: str = _VALID_SKILL):
|
||||
skill_dir = skills_dir / name
|
||||
skill_dir.mkdir(parents=True, exist_ok=True)
|
||||
(skill_dir / "SKILL.md").write_text(content)
|
||||
return skill_dir
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _edit_skill revert on failure
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestEditRevert:
    """Failed edits must leave the original SKILL.md untouched on disk."""

    def test_edit_preserves_original_on_invalid_frontmatter(self, isolated_skills_dir):
        from tools.skill_manager_tool import skill_manage

        _create_test_skill(isolated_skills_dir)
        bad_content = "---\nname: test-skill\n---\n"  # frontmatter missing a description
        payload = json.loads(skill_manage("edit", "test-skill", content=bad_content))

        assert payload["success"] is False
        assert "Original file preserved" in payload["error"]
        # The skill file on disk must still contain the pre-edit content.
        skill_md = isolated_skills_dir / "test-skill" / "SKILL.md"
        assert "A test skill" in skill_md.read_text()

    def test_edit_preserves_original_on_empty_body(self, isolated_skills_dir):
        from tools.skill_manager_tool import skill_manage

        _create_test_skill(isolated_skills_dir)
        bad_content = "---\nname: test-skill\ndescription: ok\n---\n"
        payload = json.loads(skill_manage("edit", "test-skill", content=bad_content))

        assert payload["success"] is False
        assert "Original file preserved" in payload["error"]
        skill_md = isolated_skills_dir / "test-skill" / "SKILL.md"
        assert "Instructions here" in skill_md.read_text()

    def test_edit_reverts_on_write_error(self, isolated_skills_dir, monkeypatch):
        from tools.skill_manager_tool import skill_manage

        _create_test_skill(isolated_skills_dir)

        def explode(*args, **kwargs):
            raise OSError("disk full")

        monkeypatch.setattr("tools.skill_manager_tool._atomic_write_text", explode)
        payload = json.loads(skill_manage("edit", "test-skill", content=_VALID_SKILL))

        assert payload["success"] is False
        assert "write error" in payload["error"].lower()
        assert "Original file preserved" in payload["error"]

    def test_edit_reverts_on_security_scan_block(self, isolated_skills_dir, monkeypatch):
        from tools.skill_manager_tool import skill_manage

        _create_test_skill(isolated_skills_dir)
        # Force the security scan to reject whatever gets written.
        monkeypatch.setattr(
            "tools.skill_manager_tool._security_scan_skill",
            lambda path: "Blocked: suspicious content",
        )
        new_content = "---\nname: test-skill\ndescription: updated\n---\n\n# Updated\n"
        payload = json.loads(skill_manage("edit", "test-skill", content=new_content))

        assert payload["success"] is False
        assert "Original file preserved" in payload["error"]
        skill_md = isolated_skills_dir / "test-skill" / "SKILL.md"
        assert "A test skill" in skill_md.read_text()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _patch_skill revert on failure
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestPatchRevert:
    """Failed patches must leave the original SKILL.md untouched on disk."""

    def test_patch_preserves_original_on_no_match(self, isolated_skills_dir):
        from tools.skill_manager_tool import skill_manage

        _create_test_skill(isolated_skills_dir)
        payload = json.loads(skill_manage(
            "patch", "test-skill",
            old_string="NONEXISTENT_TEXT",
            new_string="replacement",
        ))

        assert payload["success"] is False
        assert "Original file preserved" in payload["error"]
        skill_md = isolated_skills_dir / "test-skill" / "SKILL.md"
        assert "Instructions here" in skill_md.read_text()

    def test_patch_preserves_original_on_broken_frontmatter(self, isolated_skills_dir):
        from tools.skill_manager_tool import skill_manage

        _create_test_skill(isolated_skills_dir)
        # Removing the description line would leave invalid frontmatter.
        payload = json.loads(skill_manage(
            "patch", "test-skill",
            old_string="description: A test skill for unit tests.",
            new_string="",
        ))

        assert payload["success"] is False
        assert "Original file preserved" in payload["error"]
        skill_md = isolated_skills_dir / "test-skill" / "SKILL.md"
        assert "A test skill" in skill_md.read_text()

    def test_patch_reverts_on_write_error(self, isolated_skills_dir, monkeypatch):
        from tools.skill_manager_tool import skill_manage

        _create_test_skill(isolated_skills_dir)

        def explode(*args, **kwargs):
            raise OSError("disk full")

        monkeypatch.setattr("tools.skill_manager_tool._atomic_write_text", explode)
        payload = json.loads(skill_manage(
            "patch", "test-skill",
            old_string="Instructions here.",
            new_string="New instructions.",
        ))

        assert payload["success"] is False
        assert "write error" in payload["error"].lower()
        assert "Original file preserved" in payload["error"]

    def test_patch_reverts_on_security_scan_block(self, isolated_skills_dir, monkeypatch):
        from tools.skill_manager_tool import skill_manage

        _create_test_skill(isolated_skills_dir)
        monkeypatch.setattr(
            "tools.skill_manager_tool._security_scan_skill",
            lambda path: "Blocked: malicious code",
        )
        payload = json.loads(skill_manage(
            "patch", "test-skill",
            old_string="Instructions here.",
            new_string="New instructions.",
        ))

        assert payload["success"] is False
        assert "Original file preserved" in payload["error"]
        skill_md = isolated_skills_dir / "test-skill" / "SKILL.md"
        assert "Instructions here" in skill_md.read_text()

    def test_patch_successful_writes_new_content(self, isolated_skills_dir):
        from tools.skill_manager_tool import skill_manage

        _create_test_skill(isolated_skills_dir)
        payload = json.loads(skill_manage(
            "patch", "test-skill",
            old_string="Instructions here.",
            new_string="Updated instructions.",
        ))

        assert payload["success"] is True
        # The replacement should be on disk and the old text gone.
        updated = (isolated_skills_dir / "test-skill" / "SKILL.md").read_text()
        assert "Updated instructions" in updated
        assert "Instructions here" not in updated
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _write_file revert on failure
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestWriteFileRevert:
    """write_file must also respect the revert-on-failure contract."""

    def test_write_file_reverts_on_security_scan_block(self, isolated_skills_dir, monkeypatch):
        from tools.skill_manager_tool import skill_manage

        _create_test_skill(isolated_skills_dir)
        # Any write attempt is rejected by the patched scanner.
        monkeypatch.setattr(
            "tools.skill_manager_tool._security_scan_skill",
            lambda path: "Blocked: malicious",
        )
        payload = json.loads(skill_manage(
            "write_file", "test-skill",
            file_path="references/notes.md",
            file_content="# Some notes",
        ))

        assert payload["success"] is False
        assert "Original file preserved" in payload["error"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# validate action
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestValidateAction:
    """The validate action reports per-skill issues and an error count."""

    def test_validate_passes_on_good_skill(self, isolated_skills_dir):
        from tools.skill_manager_tool import skill_manage

        _create_test_skill(isolated_skills_dir)
        report = json.loads(skill_manage("validate", "test-skill"))

        assert report["success"] is True
        assert report["errors"] == 0
        assert report["results"][0]["valid"] is True

    def test_validate_finds_missing_description(self, isolated_skills_dir):
        from tools.skill_manager_tool import skill_manage

        bad = "---\nname: bad-skill\n---\n\nBody here.\n"
        _create_test_skill(isolated_skills_dir, name="bad-skill", content=bad)
        report = json.loads(skill_manage("validate", "bad-skill"))

        assert report["success"] is False
        assert report["errors"] == 1
        issues = report["results"][0]["issues"]
        assert any("description" in issue.lower() for issue in issues)

    def test_validate_finds_empty_body(self, isolated_skills_dir):
        from tools.skill_manager_tool import skill_manage

        empty_body = "---\nname: empty-skill\ndescription: test\n---\n"
        _create_test_skill(isolated_skills_dir, name="empty-skill", content=empty_body)
        report = json.loads(skill_manage("validate", "empty-skill"))

        assert report["success"] is False
        issues = report["results"][0]["issues"]
        assert any("empty body" in issue.lower() for issue in issues)

    def test_validate_all_skills(self, isolated_skills_dir):
        from tools.skill_manager_tool import skill_manage

        # Two valid skills plus one that is missing its description.
        _create_test_skill(isolated_skills_dir, name="good-1")
        _create_test_skill(isolated_skills_dir, name="good-2")
        bad = "---\nname: bad\n---\n\nBody.\n"
        _create_test_skill(isolated_skills_dir, name="bad", content=bad)

        report = json.loads(skill_manage("validate", ""))
        assert report["total"] == 3
        assert report["errors"] == 1

    def test_validate_nonexistent_skill(self, isolated_skills_dir):
        from tools.skill_manager_tool import skill_manage

        report = json.loads(skill_manage("validate", "nonexistent"))
        assert report["success"] is False
        assert "not found" in report["error"].lower()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Modification log
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestModificationLog:
    """Successful writes append JSON-lines entries to the modification log."""

    def test_edit_logs_on_success(self, isolated_skills_dir):
        from tools.skill_manager_tool import skill_manage, _MOD_LOG_FILE

        _create_test_skill(isolated_skills_dir)
        new = "---\nname: test-skill\ndescription: updated\n---\n\n# Updated\n"
        skill_manage("edit", "test-skill", content=new)

        assert _MOD_LOG_FILE.exists()
        # The log is JSON-lines; the newest entry is the last line.
        lines = _MOD_LOG_FILE.read_text().strip().split("\n")
        entry = json.loads(lines[-1])
        assert entry["action"] == "edit"
        assert entry["success"] is True
        assert entry["skill"] == "test-skill"

    def test_patch_logs_on_failure(self, isolated_skills_dir):
        from tools.skill_manager_tool import skill_manage

        _create_test_skill(isolated_skills_dir)
        # A no-match patch fails before any write happens.
        result = json.loads(skill_manage(
            "patch", "test-skill",
            old_string="NONEXISTENT",
            new_string="replacement",
        ))
        # The call itself must report the failure...
        assert result["success"] is False
        # ...but no log entry is expected: the log only fires on write-side
        # errors, and this failure occurs before any write. Nothing to log.
|
||||
@@ -1,188 +0,0 @@
|
||||
"""Tests for session templates (code-first seeding)."""
|
||||
|
||||
import json
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from tools.session_templates import (
|
||||
SessionTemplate,
|
||||
SessionTemplates,
|
||||
TaskType,
|
||||
ToolCallExample,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
def tmp_templates(tmp_path):
    """SessionTemplates instance backed by a per-test temporary directory."""
    return SessionTemplates(templates_dir=tmp_path / "templates")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Task type classification
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestClassifyTaskType:
    """classify_task_type applies a 60% dominance threshold per tool category."""

    @staticmethod
    def _calls(*tool_names):
        """Build a tool-call history from bare tool names."""
        return [{"name": n} for n in tool_names]

    def test_code_dominant(self, tmp_templates):
        history = self._calls("execute_code", "execute_code", "execute_code", "read_file")
        assert tmp_templates.classify_task_type(history) == TaskType.CODE

    def test_file_dominant(self, tmp_templates):
        history = self._calls("read_file", "write_file", "patch", "read_file", "execute_code")
        assert tmp_templates.classify_task_type(history) == TaskType.FILE

    def test_research_dominant(self, tmp_templates):
        history = self._calls("web_search", "web_fetch", "web_search", "read_file")
        assert tmp_templates.classify_task_type(history) == TaskType.RESEARCH

    def test_mixed_no_dominant(self, tmp_templates):
        history = self._calls("execute_code", "read_file", "web_search")
        assert tmp_templates.classify_task_type(history) == TaskType.MIXED

    def test_empty_returns_mixed(self, tmp_templates):
        assert tmp_templates.classify_task_type([]) == TaskType.MIXED

    def test_threshold_is_60_percent(self, tmp_templates):
        # 5/9 code calls (~56%) falls below the threshold -> MIXED.
        history = self._calls(*(["execute_code"] * 5 + ["read_file"] * 4))
        assert tmp_templates.classify_task_type(history) == TaskType.MIXED

        # 6/10 code calls (exactly 60%) meets the threshold -> CODE.
        history = self._calls(*(["execute_code"] * 6 + ["read_file"] * 4))
        assert tmp_templates.classify_task_type(history) == TaskType.CODE
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Template CRUD
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestTemplateCRUD:
    """Round-trip save/list/get/delete behavior for template storage."""

    def test_save_and_list(self, tmp_templates):
        example = ToolCallExample(tool_name="execute_code", args={"code": "print('hi')"}, success=True)
        tmp_templates.save_template(SessionTemplate(
            name="test-code",
            task_type=TaskType.CODE,
            examples=[example],
            created_at="2026-01-01T00:00:00Z",
        ))

        listed = tmp_templates.list_templates()
        assert len(listed) == 1
        assert listed[0].name == "test-code"
        assert listed[0].task_type == TaskType.CODE

    def test_list_filter_by_type(self, tmp_templates):
        tmp_templates.save_template(SessionTemplate(name="t1", task_type=TaskType.CODE, examples=[]))
        tmp_templates.save_template(SessionTemplate(name="t2", task_type=TaskType.FILE, examples=[]))

        only_code = tmp_templates.list_templates(TaskType.CODE)
        assert len(only_code) == 1
        assert only_code[0].name == "t1"

    def test_delete(self, tmp_templates):
        tmp_templates.save_template(SessionTemplate(name="delete-me", task_type=TaskType.CODE, examples=[]))
        assert tmp_templates.delete_template("delete-me") is True
        assert len(tmp_templates.list_templates()) == 0

    def test_delete_nonexistent(self, tmp_templates):
        assert tmp_templates.delete_template("nope") is False

    def test_get_template_returns_best(self, tmp_templates):
        # Two CODE templates; the more-used one should be preferred.
        for tpl_name, uses in (("low-usage", 1), ("high-usage", 5)):
            tmp_templates.save_template(SessionTemplate(
                name=tpl_name, task_type=TaskType.CODE, examples=[], usage_count=uses,
            ))
        chosen = tmp_templates.get_template(TaskType.CODE)
        assert chosen.name == "high-usage"

    def test_get_template_returns_none_if_empty(self, tmp_templates):
        assert tmp_templates.get_template(TaskType.CODE) is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Template injection
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestInjectIntoMessages:
    """inject_into_messages splices example tool calls in after system messages."""

    def test_injects_after_system(self, tmp_templates):
        tpl = SessionTemplate(
            name="test-inject",
            task_type=TaskType.CODE,
            examples=[
                ToolCallExample(
                    tool_name="execute_code",
                    args={"code": "x=1"},
                    result_preview="1",
                    success=True,
                ),
            ],
        )
        conversation = [
            {"role": "system", "content": "You are Timmy."},
            {"role": "user", "content": "Hello"},
        ]
        merged = tmp_templates.inject_into_messages(tpl, conversation)

        # Expected order: system, template note, assistant call, tool result, user.
        assert len(merged) == 5
        assert merged[0]["role"] == "system"
        assert "Session Template" in merged[1]["content"]
        assert merged[2]["role"] == "assistant"
        assert merged[3]["role"] == "tool"
        assert merged[4]["role"] == "user"

    def test_skips_failed_examples(self, tmp_templates):
        tpl = SessionTemplate(
            name="test-fail",
            task_type=TaskType.CODE,
            examples=[
                ToolCallExample(tool_name="execute_code", args={}, success=False),
                ToolCallExample(tool_name="read_file", args={"path": "x"}, success=True),
            ],
        )
        merged = tmp_templates.inject_into_messages(tpl, [{"role": "system", "content": "sys"}])

        # Only the successful example should produce an assistant tool call.
        calls = [m for m in merged if m.get("role") == "assistant" and m.get("tool_calls")]
        assert len(calls) == 1
        assert calls[0]["tool_calls"][0]["function"]["name"] == "read_file"

    def test_increments_usage(self, tmp_templates):
        tpl = SessionTemplate(name="usage-test", task_type=TaskType.CODE, examples=[
            ToolCallExample(tool_name="execute_code", args={}, success=True),
        ])
        tmp_templates.save_template(tpl)

        tmp_templates.inject_into_messages(tpl, [{"role": "system", "content": "x"}])
        assert tpl.usage_count == 1

    def test_empty_template_returns_original(self, tmp_templates):
        original = [{"role": "user", "content": "hi"}]
        empty_tpl = SessionTemplate(name="empty", task_type=TaskType.CODE, examples=[])
        assert tmp_templates.inject_into_messages(empty_tpl, original) == original

    def test_no_template_returns_original(self, tmp_templates):
        original = [{"role": "user", "content": "hi"}]
        assert tmp_templates.inject_into_messages(None, original) == original
|
||||
@@ -1,418 +0,0 @@
|
||||
"""
|
||||
Session templates for code-first seeding.
|
||||
|
||||
Research finding: Code-heavy sessions (execute_code dominant in first 30 turns)
|
||||
improve over time. File-heavy sessions degrade. The key is deterministic
|
||||
feedback loops, not arbitrary context.
|
||||
|
||||
This module provides:
|
||||
1. Task type classification (CODE, FILE, RESEARCH, MIXED)
|
||||
2. Template extraction from completed sessions
|
||||
3. Template storage (~/.hermes/session-templates/)
|
||||
4. Template injection into new sessions
|
||||
5. CLI interface for template management
|
||||
|
||||
Closes #329.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import sqlite3
|
||||
from dataclasses import asdict, dataclass, field
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
from typing import Any, Optional
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Constants
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Root of the Hermes state directory; overridable via the HERMES_HOME env var.
HERMES_HOME = Path(os.environ.get("HERMES_HOME", str(Path.home() / ".hermes")))
# Where template JSON files live, one file per template name.
TEMPLATES_DIR = HERMES_HOME / "session-templates"
# SQLite database holding session transcripts (read by extract_from_session).
SESSIONS_DB = HERMES_HOME / "state.db"

# Tool classification sets: a tool call is bucketed by which set its name is in.
CODE_TOOLS = frozenset({"execute_code", "code_execution"})
FILE_TOOLS = frozenset({"read_file", "write_file", "patch", "search_files"})
RESEARCH_TOOLS = frozenset({"web_search", "web_fetch", "browser_navigate", "browser_snapshot"})

# Dominance threshold for task type classification: a category must account
# for at least this fraction of the calls to become the session's task type.
DOMINANCE_THRESHOLD = 0.6

# Default max examples to extract per template
DEFAULT_MAX_EXAMPLES = 10
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Data model
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TaskType(str, Enum):
    """Coarse session categories; str-valued so members serialize as plain strings."""
    CODE = "code"
    FILE = "file"
    RESEARCH = "research"
    MIXED = "mixed"
|
||||
|
||||
|
||||
@dataclass
class ToolCallExample:
    """A single tool call with its result, used as a template example."""
    tool_name: str           # Name of the tool that was invoked
    args: dict[str, Any]     # Call arguments (empty dict when unparseable)
    result_preview: str = ""  # Truncated tool output, if a matching result was found
    success: bool = True     # False when the result looks like a failure
|
||||
|
||||
|
||||
@dataclass
class SessionTemplate:
    """A session template containing tool call examples for seeding."""
    name: str                 # Unique name; also the JSON filename stem on disk
    task_type: TaskType       # Dominant task category used for matching
    examples: list[ToolCallExample] = field(default_factory=list)  # Ordered examples
    source_session_id: str = ""  # Session the examples were extracted from
    created_at: str = ""      # ISO-8601 UTC timestamp string
    usage_count: int = 0      # Incremented each time the template is injected
    description: str = ""     # Human-readable summary
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Core logic
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class SessionTemplates:
    """Manages session templates for code-first seeding.

    Templates are persisted as JSON files under ``templates_dir`` (defaults
    to ``~/.hermes/session-templates``), one file per template name.
    """

    def __init__(self, templates_dir: Optional[Path] = None):
        """Create the manager, ensuring the storage directory exists."""
        self.templates_dir = templates_dir or TEMPLATES_DIR
        self.templates_dir.mkdir(parents=True, exist_ok=True)

    def classify_task_type(self, tool_calls: list[dict[str, Any]]) -> TaskType:
        """Classify a session's task type based on tool call patterns.

        A category (code/file/research) must account for at least
        DOMINANCE_THRESHOLD (60%) of the calls to be dominant; otherwise
        the session is MIXED. An empty call list is also MIXED.
        """
        if not tool_calls:
            return TaskType.MIXED

        total = len(tool_calls)
        code_count = 0
        file_count = 0
        research_count = 0

        for tc in tool_calls:
            # Accept either key: extracted examples use "tool_name",
            # live tool-call dicts use "name".
            name = tc.get("name", tc.get("tool_name", "")).lower()
            if name in CODE_TOOLS:
                code_count += 1
            elif name in FILE_TOOLS:
                file_count += 1
            elif name in RESEARCH_TOOLS:
                research_count += 1

        if code_count / total >= DOMINANCE_THRESHOLD:
            return TaskType.CODE
        if file_count / total >= DOMINANCE_THRESHOLD:
            return TaskType.FILE
        if research_count / total >= DOMINANCE_THRESHOLD:
            return TaskType.RESEARCH
        return TaskType.MIXED

    def extract_from_session(
        self,
        session_id: str,
        max_examples: int = DEFAULT_MAX_EXAMPLES,
    ) -> list[ToolCallExample]:
        """Extract up to *max_examples* tool call examples from a session.

        Reads the most recent row for *session_id* from the SQLite session
        database. Best-effort: any database or JSON error returns whatever
        examples were collected so far (possibly none). The connection is
        always closed, even on error.
        """
        examples: list[ToolCallExample] = []

        if not SESSIONS_DB.exists():
            return examples

        conn = None
        try:
            conn = sqlite3.connect(str(SESSIONS_DB))
            conn.row_factory = sqlite3.Row

            row = conn.execute(
                "SELECT messages FROM sessions WHERE session_id = ? ORDER BY created_at DESC LIMIT 1",
                (session_id,),
            ).fetchone()
            if not row:
                return examples

            messages = json.loads(row["messages"])

            # Index tool results by tool_call_id once (first occurrence wins)
            # instead of rescanning the whole transcript for every tool call.
            results_by_id: dict[str, dict[str, Any]] = {}
            for msg in messages:
                if msg.get("role") == "tool":
                    results_by_id.setdefault(msg.get("tool_call_id"), msg)

            for msg in messages:
                if len(examples) >= max_examples:
                    break  # quota reached; no need to scan further messages
                if msg.get("role") != "assistant":
                    continue
                for tc in msg.get("tool_calls") or []:
                    if len(examples) >= max_examples:
                        break

                    fn = tc.get("function", {})
                    name = fn.get("name", "")
                    if not name:
                        continue

                    try:
                        args = json.loads(fn.get("arguments", "{}"))
                    except (json.JSONDecodeError, TypeError):
                        args = {}

                    # Pair the call with its tool result, when present.
                    result_preview = ""
                    success = True
                    result_msg = results_by_id.get(tc.get("id", ""))
                    if result_msg is not None:
                        result_preview = str(result_msg.get("content", ""))[:200]
                        # Heuristic: common failure markers mean the call failed.
                        lowered = result_preview.lower()
                        if any(marker in lowered for marker in ("error", "failed", "traceback", "exception")):
                            success = False

                    examples.append(ToolCallExample(
                        tool_name=name,
                        args=args,
                        result_preview=result_preview,
                        success=success,
                    ))
        except Exception:
            # Best-effort: a corrupt DB or malformed JSON must not crash callers.
            pass
        finally:
            # Guarantee the connection is released even when an error occurred.
            if conn is not None:
                conn.close()

        return examples

    def create_template(
        self,
        session_id: str,
        name: Optional[str] = None,
        description: str = "",
        max_examples: int = DEFAULT_MAX_EXAMPLES,
    ) -> Optional[SessionTemplate]:
        """Create and persist a template from a session's tool call history.

        Returns None when the session has no extractable tool calls.
        """
        examples = self.extract_from_session(session_id, max_examples)
        if not examples:
            return None

        task_type = self.classify_task_type([{"name": e.tool_name} for e in examples])

        from datetime import datetime, timezone

        template = SessionTemplate(
            name=name or f"{task_type.value}_{session_id[:8]}",
            task_type=task_type,
            examples=examples,
            source_session_id=session_id,
            # utcnow() is deprecated; this yields the same naive-UTC "...Z" string.
            created_at=datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z",
            description=description or f"Auto-extracted from {session_id}",
        )

        self.save_template(template)
        return template

    def save_template(self, template: SessionTemplate) -> Path:
        """Serialize *template* to <templates_dir>/<name>.json; return the path."""
        path = self.templates_dir / f"{template.name}.json"
        data = {
            "name": template.name,
            "task_type": template.task_type.value,
            "examples": [asdict(e) for e in template.examples],
            "source_session_id": template.source_session_id,
            "created_at": template.created_at,
            "usage_count": template.usage_count,
            "description": template.description,
        }
        path.write_text(json.dumps(data, indent=2, sort_keys=True) + "\n")
        return path

    def get_template(self, task_type: TaskType) -> Optional[SessionTemplate]:
        """Return the most-used template for *task_type*, or None if none exist."""
        templates = self.list_templates(task_type)
        if not templates:
            return None
        # Prefer templates with more usage (proven useful).
        return max(templates, key=lambda t: t.usage_count)

    def list_templates(self, task_type: Optional[TaskType] = None) -> list[SessionTemplate]:
        """List all templates, optionally filtered by type.

        Unreadable or malformed template files are silently skipped.
        """
        templates: list[SessionTemplate] = []

        for path in sorted(self.templates_dir.glob("*.json")):
            try:
                data = json.loads(path.read_text())
                template = SessionTemplate(
                    name=data["name"],
                    task_type=TaskType(data["task_type"]),
                    examples=[ToolCallExample(**e) for e in data.get("examples", [])],
                    source_session_id=data.get("source_session_id", ""),
                    created_at=data.get("created_at", ""),
                    usage_count=data.get("usage_count", 0),
                    description=data.get("description", ""),
                )
                if task_type is None or template.task_type == task_type:
                    templates.append(template)
            except Exception:
                continue  # skip corrupt files rather than failing the listing

        return templates

    def delete_template(self, name: str) -> bool:
        """Delete a template by name; return True if a file was removed."""
        path = self.templates_dir / f"{name}.json"
        if path.exists():
            path.unlink()
            return True
        return False

    def inject_into_messages(
        self,
        template: SessionTemplate,
        messages: list[dict[str, Any]],
    ) -> list[dict[str, Any]]:
        """Inject template examples into a session's messages.

        Inserts a system note plus assistant/tool example pairs immediately
        after the leading run of system messages, then bumps the template's
        usage count and persists it. Returns *messages* unchanged when
        *template* is falsy or has no examples.
        """
        if not template or not template.examples:
            return messages

        # System note announcing the template.
        injection: list[dict[str, Any]] = [{
            "role": "system",
            "content": (
                f"[Session Template: '{template.name}' ({template.task_type.value})]\n"
                f"The following are examples of successful tool calls from a similar session. "
                f"Use them as patterns for your own tool usage."
            ),
        }]

        # Add example tool call/result pairs.
        for idx, ex in enumerate(template.examples):
            if not ex.success:
                continue  # Only inject successful examples

            # Include the example index so repeated tool names get unique ids
            # (previously two calls to the same tool produced colliding ids).
            call_id = f"template_{template.name}_{idx}_{ex.tool_name}"
            injection.append({
                "role": "assistant",
                "content": None,
                "tool_calls": [{
                    "id": call_id,
                    "type": "function",
                    "function": {
                        "name": ex.tool_name,
                        "arguments": json.dumps(ex.args),
                    },
                }],
            })
            injection.append({
                "role": "tool",
                "tool_call_id": call_id,
                "content": ex.result_preview or "(example result)",
            })

        # Insertion point: just after the leading system messages.
        insert_idx = 0
        for i, msg in enumerate(messages):
            if msg.get("role") == "system":
                insert_idx = i + 1
            else:
                break

        result = messages[:insert_idx] + injection + messages[insert_idx:]

        # Record that the template was used and persist the new count.
        template.usage_count += 1
        self.save_template(template)

        return result
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CLI
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _cli():
|
||||
"""Simple CLI for session template management."""
|
||||
import argparse
|
||||
import sys
|
||||
|
||||
parser = argparse.ArgumentParser(description="Session template management")
|
||||
sub = parser.add_subparsers(dest="command")
|
||||
|
||||
# list
|
||||
list_cmd = sub.add_parser("list", help="List templates")
|
||||
list_cmd.add_argument("--type", choices=["code", "file", "research", "mixed"])
|
||||
|
||||
# create
|
||||
create_cmd = sub.add_parser("create", help="Create template from session")
|
||||
create_cmd.add_argument("session_id", help="Session ID to extract from")
|
||||
create_cmd.add_argument("--name", help="Template name")
|
||||
create_cmd.add_argument("--max-examples", type=int, default=10)
|
||||
|
||||
# delete
|
||||
delete_cmd = sub.add_parser("delete", help="Delete template")
|
||||
delete_cmd.add_argument("name", help="Template name")
|
||||
|
||||
args = parser.parse_args()
|
||||
tm = SessionTemplates()
|
||||
|
||||
if args.command == "list":
|
||||
task_type = TaskType(args.type) if args.type else None
|
||||
templates = tm.list_templates(task_type)
|
||||
if not templates:
|
||||
print("No templates found.")
|
||||
return
|
||||
for t in templates:
|
||||
print(f" {t.name:30s} {t.task_type.value:10s} {len(t.examples)} examples, used {t.usage_count}x")
|
||||
|
||||
elif args.command == "create":
|
||||
template = tm.create_template(args.session_id, name=args.name, max_examples=args.max_examples)
|
||||
if template:
|
||||
print(f"Created template: {template.name} ({template.task_type.value}, {len(template.examples)} examples)")
|
||||
else:
|
||||
print(f"No tool calls found in session {args.session_id}")
|
||||
sys.exit(1)
|
||||
|
||||
elif args.command == "delete":
|
||||
if tm.delete_template(args.name):
|
||||
print(f"Deleted template: {args.name}")
|
||||
else:
|
||||
print(f"Template not found: {args.name}")
|
||||
sys.exit(1)
|
||||
|
||||
else:
|
||||
parser.print_help()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
_cli()
|
||||
@@ -44,6 +44,51 @@ from typing import Dict, Any, Optional
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Skill modification log file — stores before/after snapshots for audit trail
|
||||
_MOD_LOG_DIR = get_hermes_home() / "cron" / "output"
|
||||
_MOD_LOG_FILE = get_hermes_home() / "skills" / ".modification_log.jsonl"
|
||||
|
||||
|
||||
def _log_skill_modification(
|
||||
action: str,
|
||||
skill_name: str,
|
||||
target_file: str,
|
||||
original_content: str,
|
||||
new_content: str,
|
||||
success: bool,
|
||||
error: str = None,
|
||||
) -> None:
|
||||
"""Log a skill modification with before/after snapshot for audit trail.
|
||||
|
||||
Appends JSONL entries to ~/.hermes/skills/.modification_log.jsonl.
|
||||
Failures in logging are silently swallowed — logging must never
|
||||
break the primary operation.
|
||||
"""
|
||||
try:
|
||||
import time
|
||||
entry = {
|
||||
"timestamp": time.time(),
|
||||
"action": action,
|
||||
"skill": skill_name,
|
||||
"file": target_file,
|
||||
"success": success,
|
||||
"original_len": len(original_content) if original_content else 0,
|
||||
"new_len": len(new_content) if new_content else 0,
|
||||
}
|
||||
if error:
|
||||
entry["error"] = error
|
||||
# Truncate snapshots to 2KB each for log hygiene
|
||||
if original_content:
|
||||
entry["original_preview"] = original_content[:2048]
|
||||
if new_content:
|
||||
entry["new_preview"] = new_content[:2048]
|
||||
|
||||
_MOD_LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(_MOD_LOG_FILE, "a", encoding="utf-8") as f:
|
||||
f.write(json.dumps(entry, ensure_ascii=False) + "\n")
|
||||
except Exception:
|
||||
logger.debug("Failed to write skill modification log", exc_info=True)
|
||||
|
||||
# Import security scanner — agent-created skills get the same scrutiny as
|
||||
# community hub installs.
|
||||
try:
|
||||
@@ -339,31 +384,45 @@ def _create_skill(name: str, content: str, category: str = None) -> Dict[str, An
|
||||
|
||||
|
||||
def _edit_skill(name: str, content: str) -> Dict[str, Any]:
|
||||
"""Replace the SKILL.md of any existing skill (full rewrite)."""
|
||||
"""Replace the SKILL.md of any existing skill (full rewrite).
|
||||
|
||||
Poka-yoke: validates before writing, uses atomic write, and reverts
|
||||
to the original file on any failure.
|
||||
"""
|
||||
err = _validate_frontmatter(content)
|
||||
if err:
|
||||
return {"success": False, "error": err}
|
||||
return {"success": False, "error": f"Edit failed: {err} Original file preserved."}
|
||||
|
||||
err = _validate_content_size(content)
|
||||
if err:
|
||||
return {"success": False, "error": err}
|
||||
return {"success": False, "error": f"Edit failed: {err} Original file preserved."}
|
||||
|
||||
existing = _find_skill(name)
|
||||
if not existing:
|
||||
return {"success": False, "error": f"Skill '{name}' not found. Use skills_list() to see available skills."}
|
||||
|
||||
skill_md = existing["path"] / "SKILL.md"
|
||||
# Back up original content for rollback
|
||||
# Snapshot original for rollback
|
||||
original_content = skill_md.read_text(encoding="utf-8") if skill_md.exists() else None
|
||||
_atomic_write_text(skill_md, content)
|
||||
|
||||
try:
|
||||
_atomic_write_text(skill_md, content)
|
||||
except Exception as exc:
|
||||
_log_skill_modification("edit", name, "SKILL.md", original_content, content, False, str(exc))
|
||||
return {
|
||||
"success": False,
|
||||
"error": f"Edit failed: write error: {exc}. Original file preserved.",
|
||||
}
|
||||
|
||||
# Security scan — roll back on block
|
||||
scan_error = _security_scan_skill(existing["path"])
|
||||
if scan_error:
|
||||
if original_content is not None:
|
||||
_atomic_write_text(skill_md, original_content)
|
||||
return {"success": False, "error": scan_error}
|
||||
_log_skill_modification("edit", name, "SKILL.md", original_content, content, False, scan_error)
|
||||
return {"success": False, "error": f"Edit failed: {scan_error} Original file preserved."}
|
||||
|
||||
_log_skill_modification("edit", name, "SKILL.md", original_content, content, True)
|
||||
return {
|
||||
"success": True,
|
||||
"message": f"Skill '{name}' updated.",
|
||||
@@ -380,6 +439,9 @@ def _patch_skill(
|
||||
) -> Dict[str, Any]:
|
||||
"""Targeted find-and-replace within a skill file.
|
||||
|
||||
Poka-yoke: validates old_string matches BEFORE writing, validates the
|
||||
result AFTER matching but BEFORE writing, and reverts on any failure.
|
||||
|
||||
Defaults to SKILL.md. Use file_path to patch a supporting file instead.
|
||||
Requires a unique match unless replace_all is True.
|
||||
"""
|
||||
@@ -423,7 +485,7 @@ def _patch_skill(
|
||||
preview = content[:500] + ("..." if len(content) > 500 else "")
|
||||
return {
|
||||
"success": False,
|
||||
"error": match_error,
|
||||
"error": f"Patch failed: {match_error} Original file preserved.",
|
||||
"file_preview": preview,
|
||||
}
|
||||
|
||||
@@ -431,7 +493,7 @@ def _patch_skill(
|
||||
target_label = "SKILL.md" if not file_path else file_path
|
||||
err = _validate_content_size(new_content, label=target_label)
|
||||
if err:
|
||||
return {"success": False, "error": err}
|
||||
return {"success": False, "error": f"Patch failed: {err} Original file preserved."}
|
||||
|
||||
# If patching SKILL.md, validate frontmatter is still intact
|
||||
if not file_path:
|
||||
@@ -439,18 +501,27 @@ def _patch_skill(
|
||||
if err:
|
||||
return {
|
||||
"success": False,
|
||||
"error": f"Patch would break SKILL.md structure: {err}",
|
||||
"error": f"Patch failed: would break SKILL.md structure: {err} Original file preserved.",
|
||||
}
|
||||
|
||||
original_content = content # for rollback
|
||||
_atomic_write_text(target, new_content)
|
||||
try:
|
||||
_atomic_write_text(target, new_content)
|
||||
except Exception as exc:
|
||||
_log_skill_modification("patch", name, target_label, original_content, new_content, False, str(exc))
|
||||
return {
|
||||
"success": False,
|
||||
"error": f"Patch failed: write error: {exc}. Original file preserved.",
|
||||
}
|
||||
|
||||
# Security scan — roll back on block
|
||||
scan_error = _security_scan_skill(skill_dir)
|
||||
if scan_error:
|
||||
_atomic_write_text(target, original_content)
|
||||
return {"success": False, "error": scan_error}
|
||||
_log_skill_modification("patch", name, target_label, original_content, new_content, False, scan_error)
|
||||
return {"success": False, "error": f"Patch failed: {scan_error} Original file preserved."}
|
||||
|
||||
_log_skill_modification("patch", name, target_label, original_content, new_content, True)
|
||||
return {
|
||||
"success": True,
|
||||
"message": f"Patched {'SKILL.md' if not file_path else file_path} in skill '{name}' ({match_count} replacement{'s' if match_count > 1 else ''}).",
|
||||
@@ -478,7 +549,10 @@ def _delete_skill(name: str) -> Dict[str, Any]:
|
||||
|
||||
|
||||
def _write_file(name: str, file_path: str, file_content: str) -> Dict[str, Any]:
|
||||
"""Add or overwrite a supporting file within any skill directory."""
|
||||
"""Add or overwrite a supporting file within any skill directory.
|
||||
|
||||
Poka-yoke: reverts to original on failure.
|
||||
"""
|
||||
err = _validate_file_path(file_path)
|
||||
if err:
|
||||
return {"success": False, "error": err}
|
||||
@@ -499,7 +573,7 @@ def _write_file(name: str, file_path: str, file_content: str) -> Dict[str, Any]:
|
||||
}
|
||||
err = _validate_content_size(file_content, label=file_path)
|
||||
if err:
|
||||
return {"success": False, "error": err}
|
||||
return {"success": False, "error": f"Write failed: {err} Original file preserved."}
|
||||
|
||||
existing = _find_skill(name)
|
||||
if not existing:
|
||||
@@ -507,9 +581,17 @@ def _write_file(name: str, file_path: str, file_content: str) -> Dict[str, Any]:
|
||||
|
||||
target = existing["path"] / file_path
|
||||
target.parent.mkdir(parents=True, exist_ok=True)
|
||||
# Back up for rollback
|
||||
# Snapshot for rollback
|
||||
original_content = target.read_text(encoding="utf-8") if target.exists() else None
|
||||
_atomic_write_text(target, file_content)
|
||||
|
||||
try:
|
||||
_atomic_write_text(target, file_content)
|
||||
except Exception as exc:
|
||||
_log_skill_modification("write_file", name, file_path, original_content, file_content, False, str(exc))
|
||||
return {
|
||||
"success": False,
|
||||
"error": f"Write failed: {exc}. Original file preserved.",
|
||||
}
|
||||
|
||||
# Security scan — roll back on block
|
||||
scan_error = _security_scan_skill(existing["path"])
|
||||
@@ -518,8 +600,10 @@ def _write_file(name: str, file_path: str, file_content: str) -> Dict[str, Any]:
|
||||
_atomic_write_text(target, original_content)
|
||||
else:
|
||||
target.unlink(missing_ok=True)
|
||||
return {"success": False, "error": scan_error}
|
||||
_log_skill_modification("write_file", name, file_path, original_content, file_content, False, scan_error)
|
||||
return {"success": False, "error": f"Write failed: {scan_error} Original file preserved."}
|
||||
|
||||
_log_skill_modification("write_file", name, file_path, original_content, file_content, True)
|
||||
return {
|
||||
"success": True,
|
||||
"message": f"File '{file_path}' written to skill '{name}'.",
|
||||
@@ -554,6 +638,8 @@ def _remove_file(name: str, file_path: str) -> Dict[str, Any]:
|
||||
"available_files": available if available else None,
|
||||
}
|
||||
|
||||
# Snapshot for potential undo
|
||||
removed_content = target.read_text(encoding="utf-8")
|
||||
target.unlink()
|
||||
|
||||
# Clean up empty subdirectories
|
||||
@@ -561,12 +647,96 @@ def _remove_file(name: str, file_path: str) -> Dict[str, Any]:
|
||||
if parent != skill_dir and parent.exists() and not any(parent.iterdir()):
|
||||
parent.rmdir()
|
||||
|
||||
_log_skill_modification("remove_file", name, file_path, removed_content, None, True)
|
||||
return {
|
||||
"success": True,
|
||||
"message": f"File '{file_path}' removed from skill '{name}'.",
|
||||
}
|
||||
|
||||
|
||||
def _validate_skill(name: str = None) -> Dict[str, Any]:
|
||||
"""Validate one or all skills for structural integrity.
|
||||
|
||||
Checks: valid YAML frontmatter, non-empty body, required fields
|
||||
(name, description), and file readability.
|
||||
|
||||
Pass name=None to validate all skills.
|
||||
"""
|
||||
from agent.skill_utils import get_all_skills_dirs
|
||||
|
||||
results = []
|
||||
errors = 0
|
||||
|
||||
dirs_to_scan = get_all_skills_dirs()
|
||||
for skills_dir in dirs_to_scan:
|
||||
if not skills_dir.exists():
|
||||
continue
|
||||
for skill_md in skills_dir.rglob("SKILL.md"):
|
||||
skill_name = skill_md.parent.name
|
||||
if name and skill_name != name:
|
||||
continue
|
||||
|
||||
issues = []
|
||||
try:
|
||||
content = skill_md.read_text(encoding="utf-8")
|
||||
except Exception as exc:
|
||||
issues.append(f"Cannot read file: {exc}")
|
||||
results.append({"skill": skill_name, "path": str(skill_md), "valid": False, "issues": issues})
|
||||
errors += 1
|
||||
continue
|
||||
|
||||
# Check frontmatter
|
||||
fm_err = _validate_frontmatter(content)
|
||||
if fm_err:
|
||||
issues.append(fm_err)
|
||||
|
||||
# Check YAML parse and required fields
|
||||
if content.startswith("---"):
|
||||
import re as _re
|
||||
end_match = _re.search(r'\n---\s*\n', content[3:])
|
||||
if end_match:
|
||||
yaml_content = content[3:end_match.start() + 3]
|
||||
try:
|
||||
parsed = yaml.safe_load(yaml_content)
|
||||
if isinstance(parsed, dict):
|
||||
if not parsed.get("name"):
|
||||
issues.append("Missing 'name' in frontmatter")
|
||||
if not parsed.get("description"):
|
||||
issues.append("Missing 'description' in frontmatter")
|
||||
else:
|
||||
issues.append("Frontmatter is not a YAML mapping")
|
||||
except yaml.YAMLError as e:
|
||||
issues.append(f"YAML parse error: {e}")
|
||||
else:
|
||||
issues.append("Frontmatter not properly closed")
|
||||
else:
|
||||
issues.append("File does not start with YAML frontmatter (---)")
|
||||
|
||||
# Check body is non-empty
|
||||
if content.startswith("---"):
|
||||
import re as _re
|
||||
end_match = _re.search(r'\n---\s*\n', content[3:])
|
||||
if end_match:
|
||||
body = content[end_match.end() + 3:].strip()
|
||||
if not body:
|
||||
issues.append("Empty body after frontmatter")
|
||||
|
||||
valid = len(issues) == 0
|
||||
if not valid:
|
||||
errors += 1
|
||||
results.append({"skill": skill_name, "path": str(skill_md), "valid": valid, "issues": issues})
|
||||
|
||||
if name and not results:
|
||||
return {"success": False, "error": f"Skill '{name}' not found."}
|
||||
|
||||
return {
|
||||
"success": errors == 0,
|
||||
"total": len(results),
|
||||
"errors": errors,
|
||||
"results": results,
|
||||
}
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Main entry point
|
||||
# =============================================================================
|
||||
@@ -619,8 +789,11 @@ def skill_manage(
|
||||
return json.dumps({"success": False, "error": "file_path is required for 'remove_file'."}, ensure_ascii=False)
|
||||
result = _remove_file(name, file_path)
|
||||
|
||||
elif action == "validate":
|
||||
result = _validate_skill(name if name else None)
|
||||
|
||||
else:
|
||||
result = {"success": False, "error": f"Unknown action '{action}'. Use: create, edit, patch, delete, write_file, remove_file"}
|
||||
result = {"success": False, "error": f"Unknown action '{action}'. Use: create, edit, patch, delete, write_file, remove_file, validate"}
|
||||
|
||||
if result.get("success"):
|
||||
try:
|
||||
@@ -638,38 +811,40 @@ def skill_manage(
|
||||
|
||||
SKILL_MANAGE_SCHEMA = {
|
||||
"name": "skill_manage",
|
||||
"description": (
|
||||
"Manage skills (create, update, delete). Skills are your procedural "
|
||||
"memory — reusable approaches for recurring task types. "
|
||||
"New skills go to ~/.hermes/skills/; existing skills can be modified wherever they live.\n\n"
|
||||
"Actions: create (full SKILL.md + optional category), "
|
||||
"patch (old_string/new_string — preferred for fixes), "
|
||||
"edit (full SKILL.md rewrite — major overhauls only), "
|
||||
"delete, write_file, remove_file.\n\n"
|
||||
"Create when: complex task succeeded (5+ calls), errors overcome, "
|
||||
"user-corrected approach worked, non-trivial workflow discovered, "
|
||||
"or user asks you to remember a procedure.\n"
|
||||
"Update when: instructions stale/wrong, OS-specific failures, "
|
||||
"missing steps or pitfalls found during use. "
|
||||
"If you used a skill and hit issues not covered by it, patch it immediately.\n\n"
|
||||
"After difficult/iterative tasks, offer to save as a skill. "
|
||||
"Skip for simple one-offs. Confirm with user before creating/deleting.\n\n"
|
||||
"Good skills: trigger conditions, numbered steps with exact commands, "
|
||||
"pitfalls section, verification steps. Use skill_view() to see format examples."
|
||||
),
|
||||
"description": (
|
||||
"Manage skills (create, update, delete, validate). Skills are your procedural "
|
||||
"memory \u2014 reusable approaches for recurring task types. "
|
||||
"New skills go to ~/.hermes/skills/; existing skills can be modified wherever they live.\n\n"
|
||||
"Actions: create (full SKILL.md + optional category), "
|
||||
"patch (old_string/new_string \u2014 preferred for fixes), "
|
||||
"edit (full SKILL.md rewrite \u2014 major overhauls only), "
|
||||
"delete, write_file, remove_file, "
|
||||
"validate (check all skills for structural integrity).\n\n"
|
||||
"Create when: complex task succeeded (5+ calls), errors overcome, "
|
||||
"user-corrected approach worked, non-trivial workflow discovered, "
|
||||
"or user asks you to remember a procedure.\n"
|
||||
"Update when: instructions stale/wrong, OS-specific failures, "
|
||||
"missing steps or pitfalls found during use. "
|
||||
"If you used a skill and hit issues not covered by it, patch it immediately.\n\n"
|
||||
"After difficult/iterative tasks, offer to save as a skill. "
|
||||
"Skip for simple one-offs. Confirm with user before creating/deleting.\n\n"
|
||||
"Good skills: trigger conditions, numbered steps with exact commands, "
|
||||
"pitfalls section, verification steps. Use skill_view() to see format examples."
|
||||
),
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"action": {
|
||||
"type": "string",
|
||||
"enum": ["create", "patch", "edit", "delete", "write_file", "remove_file"],
|
||||
"enum": ["create", "patch", "edit", "delete", "write_file", "remove_file", "validate"],
|
||||
"description": "The action to perform."
|
||||
},
|
||||
"name": {
|
||||
"type": "string",
|
||||
"description": (
|
||||
"Skill name (lowercase, hyphens/underscores, max 64 chars). "
|
||||
"Must match an existing skill for patch/edit/delete/write_file/remove_file."
|
||||
"Required for create/patch/edit/delete/write_file/remove_file. "
|
||||
"Optional for validate: omit to check all skills, provide to check one."
|
||||
)
|
||||
},
|
||||
"content": {
|
||||
|
||||
Reference in New Issue
Block a user