Compare commits

..

2 Commits

Author SHA1 Message Date
f4a75baca0 test(templates): Add tests for session templates
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 33s
Resolves #329
2026-04-14 11:41:25 +00:00
33d80f27ff feat(templates): Session templates for code-first seeding (#329)
Implement session templates based on research: code-heavy sessions improve over time.

Features:
- Task type classification (code/file/research/mixed)
- Template extraction from sessions
- Template storage in ~/.hermes/session-templates/
- Template injection into new sessions
- Tags support
- Usage tracking
- CLI: list/create/delete/stats

Resolves #329
2026-04-14 11:39:32 +00:00
5 changed files with 434 additions and 386 deletions

View File

@@ -0,0 +1,92 @@
"""Tests for session templates."""
import json
import pytest
import tempfile
from pathlib import Path
from tools.session_templates import Templates, Template, ToolExample, TaskType
class TestClassification:
def test_code(self):
t = Templates()
assert t.classify([{"tool_name": "execute_code"}] * 4 + [{"tool_name": "read_file"}]) == TaskType.CODE
def test_file(self):
t = Templates()
assert t.classify([{"tool_name": "read_file"}] * 4 + [{"tool_name": "execute_code"}]) == TaskType.FILE
def test_mixed(self):
t = Templates()
assert t.classify([{"tool_name": "execute_code"}, {"tool_name": "read_file"}]) == TaskType.MIXED
class TestToolExample:
def test_roundtrip(self):
e = ToolExample("execute_code", {"code": "print('hi')"}, "hi", True, 0)
d = e.to_dict()
e2 = ToolExample.from_dict(d)
assert e2.tool_name == "execute_code"
assert e2.result == "hi"
class TestTemplate:
def test_roundtrip(self):
examples = [ToolExample("execute_code", {}, "result", True)]
t = Template("test", TaskType.CODE, examples)
d = t.to_dict()
t2 = Template.from_dict(d)
assert t2.name == "test"
assert t2.task_type == TaskType.CODE
class TestTemplates:
def test_create_list(self):
with tempfile.TemporaryDirectory() as d:
ts = Templates(Path(d))
examples = [ToolExample("execute_code", {"code": "print('hi')"}, "hi", True)]
t = Template("test", TaskType.CODE, examples)
ts.templates["test"] = t
ts._save(t)
assert len(ts.list()) == 1
def test_get(self):
with tempfile.TemporaryDirectory() as d:
ts = Templates(Path(d))
ts.templates["code"] = Template("code", TaskType.CODE, [])
ts.templates["file"] = Template("file", TaskType.FILE, [])
assert ts.get(TaskType.CODE).name == "code"
assert ts.get(TaskType.FILE).name == "file"
assert ts.get(TaskType.RESEARCH) is None
def test_inject(self):
with tempfile.TemporaryDirectory() as d:
ts = Templates(Path(d))
examples = [ToolExample("execute_code", {"code": "print('hi')"}, "hi", True)]
t = Template("test", TaskType.CODE, examples)
ts.templates["test"] = t
messages = [{"role": "system", "content": "You are helpful."}]
result = ts.inject(t, messages)
assert len(result) > len(messages)
assert t.used == 1
def test_delete(self):
with tempfile.TemporaryDirectory() as d:
ts = Templates(Path(d))
t = Template("test", TaskType.CODE, [])
ts.templates["test"] = t
ts._save(t)
assert ts.delete("test")
assert "test" not in ts.templates
def test_stats(self):
with tempfile.TemporaryDirectory() as d:
ts = Templates(Path(d))
ts.templates["a"] = Template("a", TaskType.CODE, [ToolExample("x", {}, "", True)])
ts.templates["b"] = Template("b", TaskType.FILE, [ToolExample("y", {}, "", True)])
s = ts.stats()
assert s["total"] == 2
assert s["examples"] == 2
if __name__ == "__main__":
pytest.main([__file__])

View File

@@ -1,152 +0,0 @@
"""
Tests for improved error messages in skill_manager_tool (issue #624).
Verifies that error messages include file paths, context, and suggestions.
"""
import pytest
from pathlib import Path
from unittest.mock import patch, MagicMock
from tools.skill_manager_tool import _format_error, _edit_skill, _patch_skill, skill_manage
class TestFormatError:
"""Test the _format_error helper function."""
def test_basic_error(self):
"""Test basic error formatting."""
result = _format_error("Something went wrong")
assert result["success"] is False
assert "Something went wrong" in result["error"]
assert result["skill_name"] is None
assert result["file_path"] is None
def test_with_skill_name(self):
"""Test error with skill name."""
result = _format_error("Failed", skill_name="test-skill")
assert "test-skill" in result["error"]
assert result["skill_name"] == "test-skill"
def test_with_file_path(self):
"""Test error with file path."""
result = _format_error("Failed", file_path="/path/to/SKILL.md")
assert "/path/to/SKILL.md" in result["error"]
assert result["file_path"] == "/path/to/SKILL.md"
def test_with_suggestion(self):
"""Test error with suggestion."""
result = _format_error("Failed", suggestion="Try again")
assert "Suggestion: Try again" in result["error"]
assert result["suggestion"] == "Try again"
def test_with_context(self):
"""Test error with context dict."""
result = _format_error("Failed", context={"line": 5, "found": "x"})
assert "line: 5" in result["error"]
assert "found: x" in result["error"]
def test_all_fields(self):
"""Test error with all fields."""
result = _format_error(
"Pattern match failed",
skill_name="my-skill",
file_path="/skills/my-skill/SKILL.md",
suggestion="Check whitespace",
context={"expected": "foo", "found": "bar"}
)
assert "Pattern match failed" in result["error"]
assert "Skill: my-skill" in result["error"]
assert "File: /skills/my-skill/SKILL.md" in result["error"]
assert "Suggestion: Check whitespace" in result["error"]
assert "expected: foo" in result["error"]
class TestEditSkillErrors:
"""Test improved error messages in _edit_skill."""
@patch('tools.skill_manager_tool._find_skill')
def test_skill_not_found(self, mock_find):
"""Test skill not found error includes suggestion."""
mock_find.return_value = None
# Provide valid content with frontmatter so it passes validation
valid_content = """---
name: test
description: Test skill
---
Body content here.
"""
result = _edit_skill("nonexistent", valid_content)
assert result["success"] is False
assert "nonexistent" in result["error"]
assert "skills_list()" in result.get("suggestion", "")
@patch('tools.skill_manager_tool._find_skill')
def test_yaml_parse_error_includes_file_path_and_line_number(self, mock_find, tmp_path):
"""Invalid YAML should report target file path and parser line information."""
skill_dir = tmp_path / "my-skill"
skill_dir.mkdir()
(skill_dir / "SKILL.md").write_text("old", encoding="utf-8")
mock_find.return_value = {"path": skill_dir}
bad_content = """---
name: my-skill
description: [broken
---
Body.
"""
result = _edit_skill("my-skill", bad_content)
assert result["success"] is False
assert str(skill_dir / "SKILL.md") in result["error"]
assert "line" in result["error"].lower()
class TestPatchSkillErrors:
"""Test improved error messages in _patch_skill."""
def test_old_string_required(self):
"""Test old_string required error includes suggestion."""
result = _patch_skill("test-skill", None, "new")
assert result["success"] is False
assert "old_string is required" in result["error"]
assert "suggestion" in result
def test_new_string_required(self):
"""Test new_string required error includes suggestion."""
result = _patch_skill("test-skill", "old", None)
assert result["success"] is False
assert "new_string is required" in result["error"]
assert "suggestion" in result
@patch('tools.skill_manager_tool._find_skill')
def test_skill_not_found(self, mock_find):
"""Test skill not found error includes suggestion."""
mock_find.return_value = None
result = _patch_skill("nonexistent", "old", "new")
assert result["success"] is False
assert "nonexistent" in result["error"]
assert "skills_list()" in result.get("suggestion", "")
@patch('tools.skill_manager_tool._find_skill')
def test_pattern_match_error_includes_state_info(self, mock_find, tmp_path):
"""Patch failures should include file path and patch state info."""
skill_dir = tmp_path / "state-skill"
skill_dir.mkdir()
target = skill_dir / "SKILL.md"
target.write_text("---\nname: state-skill\ndescription: desc\n---\n\nBody content here.\n", encoding="utf-8")
mock_find.return_value = {"path": skill_dir}
result = _patch_skill("state-skill", "missing pattern", "new text", replace_all=False)
assert result["success"] is False
assert str(target) in result["error"]
assert "replace_all" in result["error"]
assert "False" in result["error"]
class TestSkillManageEntryPoint:
def test_patch_missing_old_string_returns_json_error(self):
result = skill_manage(action="patch", name="demo-skill", old_string="", new_string="x")
assert isinstance(result, str)
assert "old_string is required" in result
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@@ -21,18 +21,6 @@ from typing import Callable, Dict, List, Optional, Set
logger = logging.getLogger(__name__)
def tool_error(message: str, success: bool = False, **extra) -> str:
"""Return a standardized JSON error payload for tool handlers.
Many tools import this helper directly from the registry module.
Keeping it here avoids circular helper imports and ensures a consistent
error envelope across tools.
"""
payload = {"success": success, "error": message}
payload.update(extra)
return json.dumps(payload, ensure_ascii=False)
class ToolEntry:
"""Metadata for a single registered tool."""

275
tools/session_templates.py Normal file
View File

@@ -0,0 +1,275 @@
"""
Session templates for code-first seeding.
Research: Code-heavy sessions (execute_code dominant in first 30 turns) improve over time.
File-heavy sessions degrade. Key is deterministic feedback loops.
"""
import json
import logging
import sqlite3
import time
from pathlib import Path
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, asdict, field
from enum import Enum
logger = logging.getLogger(__name__)
TEMPLATE_DIR = Path.home() / ".hermes" / "session-templates"
class TaskType(Enum):
CODE = "code"
FILE = "file"
RESEARCH = "research"
MIXED = "mixed"
@dataclass
class ToolExample:
tool_name: str
arguments: Dict[str, Any]
result: str
success: bool
turn: int = 0
def to_dict(self):
return asdict(self)
@classmethod
def from_dict(cls, data):
return cls(**data)
@dataclass
class Template:
name: str
task_type: TaskType
examples: List[ToolExample]
desc: str = ""
created: float = 0.0
used: int = 0
session_id: Optional[str] = None
tags: List[str] = field(default_factory=list)
def __post_init__(self):
if self.created == 0.0:
self.created = time.time()
def to_dict(self):
d = asdict(self)
d['task_type'] = self.task_type.value
return d
@classmethod
def from_dict(cls, data):
data['task_type'] = TaskType(data['task_type'])
data['examples'] = [ToolExample.from_dict(e) for e in data.get('examples', [])]
return cls(**data)
class Templates:
def __init__(self, dir=None):
self.dir = dir or TEMPLATE_DIR
self.dir.mkdir(parents=True, exist_ok=True)
self.templates = {}
self._load()
def _load(self):
for f in self.dir.glob("*.json"):
try:
with open(f) as fh:
t = Template.from_dict(json.load(fh))
self.templates[t.name] = t
except Exception as e:
logger.warning(f"Load failed {f}: {e}")
def _save(self, t):
with open(self.dir / f"{t.name}.json", 'w') as f:
json.dump(t.to_dict(), f, indent=2)
def classify(self, calls):
if not calls:
return TaskType.MIXED
code = {'execute_code', 'code_execution'}
file_ops = {'read_file', 'write_file', 'patch', 'search_files'}
research = {'web_search', 'web_fetch', 'browser_navigate'}
names = [c.get('tool_name', '') for c in calls]
total = len(names)
if sum(1 for n in names if n in code) / total > 0.6:
return TaskType.CODE
if sum(1 for n in names if n in file_ops) / total > 0.6:
return TaskType.FILE
if sum(1 for n in names if n in research) / total > 0.6:
return TaskType.RESEARCH
return TaskType.MIXED
def extract(self, session_id, max_n=10):
db = Path.home() / ".hermes" / "state.db"
if not db.exists():
return []
try:
conn = sqlite3.connect(str(db))
conn.row_factory = sqlite3.Row
rows = conn.execute(
"SELECT role, content, tool_calls FROM messages WHERE session_id=? ORDER BY timestamp LIMIT 100",
(session_id,)
).fetchall()
conn.close()
examples = []
turn = 0
for r in rows:
if len(examples) >= max_n:
break
if r['role'] == 'assistant' and r['tool_calls']:
try:
for tc in json.loads(r['tool_calls']):
if len(examples) >= max_n:
break
name = tc.get('function', {}).get('name')
if not name:
continue
try:
args = json.loads(tc.get('function', {}).get('arguments', '{}'))
except:
args = {}
examples.append(ToolExample(name, args, "", True, turn))
turn += 1
except:
continue
elif r['role'] == 'tool' and examples and examples[-1].result == "":
examples[-1].result = r['content'] or ""
return examples
except Exception as e:
logger.error(f"Extract failed: {e}")
return []
def create(self, session_id, name=None, task_type=None, max_n=10, desc="", tags=None):
examples = self.extract(session_id, max_n)
if not examples:
return None
if task_type is None:
task_type = self.classify([{'tool_name': e.tool_name} for e in examples])
if name is None:
name = f"{task_type.value}_{session_id[:8]}_{int(time.time())}"
t = Template(name, task_type, examples, desc or f"{len(examples)} examples", time.time(), 0, session_id, tags or [])
self.templates[name] = t
self._save(t)
logger.info(f"Created {name} with {len(examples)} examples")
return t
def get(self, task_type, tags=None):
matching = [t for t in self.templates.values() if t.task_type == task_type]
if tags:
matching = [t for t in matching if any(tag in t.tags for tag in tags)]
if not matching:
return None
matching.sort(key=lambda t: t.used)
return matching[0]
def inject(self, template, messages):
if not template.examples:
return messages
injection = [{
"role": "system",
"content": f"Template: {template.name} ({template.task_type.value})\n{template.desc}"
}]
for i, ex in enumerate(template.examples):
injection.append({
"role": "assistant",
"content": None,
"tool_calls": [{
"id": f"tpl_{i}",
"type": "function",
"function": {"name": ex.tool_name, "arguments": json.dumps(ex.arguments)}
}]
})
injection.append({
"role": "tool",
"tool_call_id": f"tpl_{i}",
"content": ex.result
})
idx = 0
for i, m in enumerate(messages):
if m.get("role") != "system":
break
idx = i + 1
for i, m in enumerate(injection):
messages.insert(idx + i, m)
template.used += 1
self._save(template)
return messages
def list(self, task_type=None, tags=None):
ts = list(self.templates.values())
if task_type:
ts = [t for t in ts if t.task_type == task_type]
if tags:
ts = [t for t in ts if any(tag in t.tags for tag in tags)]
ts.sort(key=lambda t: t.created, reverse=True)
return ts
def delete(self, name):
if name not in self.templates:
return False
del self.templates[name]
p = self.dir / f"{name}.json"
if p.exists():
p.unlink()
return True
def stats(self):
if not self.templates:
return {"total": 0, "by_type": {}, "examples": 0, "usage": 0}
by_type = {}
total_ex = 0
total_use = 0
for t in self.templates.values():
by_type[t.task_type.value] = by_type.get(t.task_type.value, 0) + 1
total_ex += len(t.examples)
total_use += t.used
return {"total": len(self.templates), "by_type": by_type, "examples": total_ex, "usage": total_use}
if __name__ == "__main__":
import argparse
p = argparse.ArgumentParser()
s = p.add_subparsers(dest="cmd")
lp = s.add_parser("list")
lp.add_argument("--type", choices=["code", "file", "research", "mixed"])
lp.add_argument("--tags")
cp = s.add_parser("create")
cp.add_argument("session_id")
cp.add_argument("--name")
cp.add_argument("--type", choices=["code", "file", "research", "mixed"])
cp.add_argument("--max", type=int, default=10)
cp.add_argument("--desc")
cp.add_argument("--tags")
dp = s.add_parser("delete")
dp.add_argument("name")
sp = s.add_parser("stats")
args = p.parse_args()
ts = Templates()
if args.cmd == "list":
tt = TaskType(args.type) if args.type else None
tags = args.tags.split(",") if args.tags else None
for t in ts.list(tt, tags):
print(f"{t.name}: {t.task_type.value} ({len(t.examples)} ex, used {t.used}x)")
elif args.cmd == "create":
tt = TaskType(args.type) if args.type else None
tags = args.tags.split(",") if args.tags else None
t = ts.create(args.session_id, args.name, tt, args.max, args.desc or "", tags)
if t:
print(f"Created: {t.name} ({len(t.examples)} examples)")
else:
print("Failed")
elif args.cmd == "delete":
print("Deleted" if ts.delete(args.name) else "Not found")
elif args.cmd == "stats":
s = ts.stats()
print(f"Total: {s['total']}, Examples: {s['examples']}, Usage: {s['usage']}")
for k, v in s['by_type'].items():
print(f" {k}: {v}")
else:
p.print_help()

View File

@@ -40,7 +40,7 @@ import shutil
import tempfile
from pathlib import Path
from hermes_constants import get_hermes_home
from typing import Dict, Any, Optional, Tuple
from typing import Dict, Any, Optional
logger = logging.getLogger(__name__)
@@ -53,57 +53,6 @@ except ImportError:
_GUARD_AVAILABLE = False
def _format_error(error_msg: str, skill_name: str = None, file_path: str = None,
suggestion: str = None, context: dict = None) -> dict:
"""Format an error response with rich context for debugging.
Args:
error_msg: The primary error message
skill_name: Name of the skill being operated on
file_path: Path to the file that failed
suggestion: Suggested action to fix the issue
context: Additional context dict (e.g., {'line': 5, 'found': 'x', 'expected': 'y'})
Returns:
Formatted error dict with success=False
"""
parts = [error_msg]
if skill_name:
parts.append(f"Skill: {skill_name}")
if file_path:
parts.append(f"File: {file_path}")
if context:
for key, value in context.items():
parts.append(f"{key}: {value}")
if suggestion:
parts.append(f"Suggestion: {suggestion}")
return {
"success": False,
"error": " | ".join(parts),
"skill_name": skill_name,
"file_path": file_path,
"suggestion": suggestion,
}
def _get_skill_file_path(skill_name: str, file_path: str = None) -> str:
"""Get the full file path for error messages."""
existing = _find_skill(skill_name)
if not existing:
return f"(skill '{skill_name}' not found)"
skill_dir = existing["path"]
if file_path:
return str(skill_dir / file_path)
return str(skill_dir / "SKILL.md")
def _security_scan_skill(skill_dir: Path) -> Optional[str]:
"""Scan a skill directory after write. Returns error string if blocked, else None."""
if not _GUARD_AVAILABLE:
@@ -143,6 +92,11 @@ VALID_NAME_RE = re.compile(r'^[a-z0-9][a-z0-9._-]*$')
ALLOWED_SUBDIRS = {"references", "templates", "scripts", "assets"}
def check_skill_manage_requirements() -> bool:
"""Skill management has no external requirements -- always available."""
return True
# =============================================================================
# Validation helpers
# =============================================================================
@@ -186,58 +140,43 @@ def _validate_category(category: Optional[str]) -> Optional[str]:
return None
def _validate_frontmatter_details(content: str) -> Tuple[Optional[str], Optional[dict]]:
"""Validate SKILL.md frontmatter and return optional structured context.
Returns:
(error_message, context_dict) where both are None when valid.
def _validate_frontmatter(content: str) -> Optional[str]:
"""
Validate that SKILL.md content has proper frontmatter with required fields.
Returns error message or None if valid.
"""
if not content.strip():
return "Content cannot be empty.", None
return "Content cannot be empty."
if not content.startswith("---"):
return "SKILL.md must start with YAML frontmatter (---). See existing skills for format.", None
return "SKILL.md must start with YAML frontmatter (---). See existing skills for format."
end_match = re.search(r'\n---\s*\n', content[3:])
if not end_match:
return "SKILL.md frontmatter is not closed. Ensure you have a closing '---' line.", None
return "SKILL.md frontmatter is not closed. Ensure you have a closing '---' line."
yaml_content = content[3:end_match.start() + 3]
try:
parsed = yaml.safe_load(yaml_content)
except yaml.YAMLError as e:
context = {}
problem_mark = getattr(e, "problem_mark", None)
if problem_mark is not None:
context["line"] = problem_mark.line + 1
context["column"] = problem_mark.column + 1
return f"YAML frontmatter parse error: {e}", (context or None)
return f"YAML frontmatter parse error: {e}"
if not isinstance(parsed, dict):
return "Frontmatter must be a YAML mapping (key: value pairs). Check for syntax errors in the YAML.", None
return "Frontmatter must be a YAML mapping (key: value pairs)."
if "name" not in parsed:
return "Frontmatter must include 'name' field.", None
return "Frontmatter must include 'name' field."
if "description" not in parsed:
return "Frontmatter must include 'description' field.", None
return "Frontmatter must include 'description' field."
if len(str(parsed["description"])) > MAX_DESCRIPTION_LENGTH:
return f"Description exceeds {MAX_DESCRIPTION_LENGTH} characters.", None
return f"Description exceeds {MAX_DESCRIPTION_LENGTH} characters."
body = content[end_match.end() + 3:].strip()
if not body:
return "SKILL.md must have content after the frontmatter (instructions, procedures, etc.).", None
return "SKILL.md must have content after the frontmatter (instructions, procedures, etc.)."
return None, None
def _validate_frontmatter(content: str) -> Optional[str]:
"""
Validate that SKILL.md content has proper frontmatter with required fields.
Returns error message or None if valid.
"""
err, _context = _validate_frontmatter_details(content)
return err
return None
def _validate_content_size(content: str, label: str = "SKILL.md") -> Optional[str]:
@@ -271,15 +210,7 @@ def _find_skill(name: str) -> Optional[Dict[str, Any]]:
{"path": Path} or None.
"""
from agent.skill_utils import get_all_skills_dirs
candidate_dirs = []
if isinstance(SKILLS_DIR, Path):
candidate_dirs.append(SKILLS_DIR)
for extra_dir in get_all_skills_dirs():
if extra_dir not in candidate_dirs:
candidate_dirs.append(extra_dir)
for skills_dir in candidate_dirs:
for skills_dir in get_all_skills_dirs():
if not skills_dir.exists():
continue
for skill_md in skills_dir.rglob("SKILL.md"):
@@ -293,15 +224,13 @@ def _validate_file_path(file_path: str) -> Optional[str]:
Validate a file path for write_file/remove_file.
Must be under an allowed subdirectory and not escape the skill dir.
"""
from tools.path_security import has_traversal_component
if not file_path:
return "file_path is required."
normalized = Path(file_path)
# Prevent path traversal
if has_traversal_component(file_path):
if ".." in normalized.parts:
return "Path traversal ('..') is not allowed."
# Must be under an allowed subdirectory
@@ -316,17 +245,6 @@ def _validate_file_path(file_path: str) -> Optional[str]:
return None
def _resolve_skill_target(skill_dir: Path, file_path: str) -> Tuple[Optional[Path], Optional[str]]:
"""Resolve a supporting-file path and ensure it stays within the skill directory."""
from tools.path_security import validate_within_dir
target = skill_dir / file_path
error = validate_within_dir(target, skill_dir)
if error:
return None, error
return target, None
def _atomic_write_text(file_path: Path, content: str, encoding: str = "utf-8") -> None:
"""
Atomically write text content to a file.
@@ -374,19 +292,10 @@ def _create_skill(name: str, content: str, category: str = None) -> Dict[str, An
if err:
return {"success": False, "error": err}
skill_dir = _resolve_skill_dir(name, category)
skill_md = skill_dir / "SKILL.md"
# Validate content
err, context = _validate_frontmatter_details(content)
err = _validate_frontmatter(content)
if err:
return _format_error(
err,
skill_name=name,
file_path=str(skill_md),
context=context,
suggestion="Fix the YAML frontmatter before creating the skill."
)
return {"success": False, "error": err}
err = _validate_content_size(content)
if err:
@@ -395,29 +304,24 @@ def _create_skill(name: str, content: str, category: str = None) -> Dict[str, An
# Check for name collisions across all directories
existing = _find_skill(name)
if existing:
return _format_error(
f"A skill named '{name}' already exists",
skill_name=name,
file_path=str(existing['path']),
suggestion="Use skill_manage(action='edit') to update the existing skill or choose a different name"
)
return {
"success": False,
"error": f"A skill named '{name}' already exists at {existing['path']}."
}
# Create the skill directory
skill_dir = _resolve_skill_dir(name, category)
skill_dir.mkdir(parents=True, exist_ok=True)
# Write SKILL.md atomically
skill_md = skill_dir / "SKILL.md"
_atomic_write_text(skill_md, content)
# Security scan — roll back on block
scan_error = _security_scan_skill(skill_dir)
if scan_error:
shutil.rmtree(skill_dir, ignore_errors=True)
return _format_error(
scan_error,
skill_name=name,
file_path=str(skill_dir),
suggestion="Review the security scan report and fix flagged issues"
)
return {"success": False, "error": scan_error}
result = {
"success": True,
@@ -436,30 +340,19 @@ def _create_skill(name: str, content: str, category: str = None) -> Dict[str, An
def _edit_skill(name: str, content: str) -> Dict[str, Any]:
"""Replace the SKILL.md of any existing skill (full rewrite)."""
existing = _find_skill(name)
if not existing:
return _format_error(
f"Skill '{name}' not found",
skill_name=name,
suggestion="Use skills_list() to see available skills or skill_manage(action='create') to create it"
)
skill_md = existing["path"] / "SKILL.md"
err, context = _validate_frontmatter_details(content)
err = _validate_frontmatter(content)
if err:
return _format_error(
err,
skill_name=name,
file_path=str(skill_md),
context=context,
suggestion="Fix the YAML frontmatter before updating the skill."
)
return {"success": False, "error": err}
err = _validate_content_size(content)
if err:
return {"success": False, "error": err}
existing = _find_skill(name)
if not existing:
return {"success": False, "error": f"Skill '{name}' not found. Use skills_list() to see available skills."}
skill_md = existing["path"] / "SKILL.md"
# Back up original content for rollback
original_content = skill_md.read_text(encoding="utf-8") if skill_md.exists() else None
_atomic_write_text(skill_md, content)
@@ -469,12 +362,7 @@ def _edit_skill(name: str, content: str) -> Dict[str, Any]:
if scan_error:
if original_content is not None:
_atomic_write_text(skill_md, original_content)
return _format_error(
scan_error,
skill_name=name,
file_path=str(skill_md),
suggestion="Review the security scan report and fix flagged issues"
)
return {"success": False, "error": scan_error}
return {
"success": True,
@@ -496,23 +384,13 @@ def _patch_skill(
Requires a unique match unless replace_all is True.
"""
if not old_string:
return _format_error(
"old_string is required for 'patch'",
suggestion="Provide the text to find in the skill file. Use skill_manage(action='edit') for full rewrites."
)
return {"success": False, "error": "old_string is required for 'patch'."}
if new_string is None:
return _format_error(
"new_string is required for 'patch'",
suggestion="Provide the replacement text. Use empty string '' to delete the matched text."
)
return {"success": False, "error": "new_string is required for 'patch'. Use an empty string to delete matched text."}
existing = _find_skill(name)
if not existing:
return _format_error(
f"Skill '{name}' not found",
skill_name=name,
suggestion="Use skills_list() to see available skills"
)
return {"success": False, "error": f"Skill '{name}' not found."}
skill_dir = existing["path"]
@@ -521,20 +399,13 @@ def _patch_skill(
err = _validate_file_path(file_path)
if err:
return {"success": False, "error": err}
target, err = _resolve_skill_target(skill_dir, file_path)
if err:
return {"success": False, "error": err}
target = skill_dir / file_path
else:
# Patching SKILL.md
target = skill_dir / "SKILL.md"
if not target.exists():
return _format_error(
f"File not found: {target.relative_to(skill_dir)}",
skill_name=name,
file_path=str(target),
suggestion=f"Check the file path. Available files in skill: {list(skill_dir.glob('**/*'))}"
)
return {"success": False, "error": f"File not found: {target.relative_to(skill_dir)}"}
content = target.read_text(encoding="utf-8")
@@ -550,18 +421,11 @@ def _patch_skill(
if match_error:
# Show a short preview of the file so the model can self-correct
preview = content[:500] + ("..." if len(content) > 500 else "")
return _format_error(
f"Pattern match failed: {match_error}",
skill_name=name,
file_path=str(target),
context={
"replace_all": replace_all,
"target_exists": target.exists(),
"content_chars": len(content),
"file_preview": preview[:200] + "..." if len(preview) > 200 else preview,
},
suggestion="Check for whitespace differences, indentation, or escaping issues in old_string"
)
return {
"success": False,
"error": match_error,
"file_preview": preview,
}
# Check size limit on the result
target_label = "SKILL.md" if not file_path else file_path
@@ -571,15 +435,12 @@ def _patch_skill(
# If patching SKILL.md, validate frontmatter is still intact
if not file_path:
err, validation_context = _validate_frontmatter_details(new_content)
err = _validate_frontmatter(new_content)
if err:
return _format_error(
f"Patch would break SKILL.md structure: {err}",
skill_name=name,
file_path=str(target),
context=validation_context,
suggestion="Ensure the patch doesn't corrupt YAML frontmatter (--- delimiters and key: value format)"
)
return {
"success": False,
"error": f"Patch would break SKILL.md structure: {err}",
}
original_content = content # for rollback
_atomic_write_text(target, new_content)
@@ -600,11 +461,7 @@ def _delete_skill(name: str) -> Dict[str, Any]:
"""Delete a skill."""
existing = _find_skill(name)
if not existing:
return _format_error(
f"Skill '{name}' not found",
skill_name=name,
suggestion="Use skills_list() to see available skills"
)
return {"success": False, "error": f"Skill '{name}' not found."}
skill_dir = existing["path"]
shutil.rmtree(skill_dir)
@@ -646,15 +503,9 @@ def _write_file(name: str, file_path: str, file_content: str) -> Dict[str, Any]:
existing = _find_skill(name)
if not existing:
return _format_error(
f"Skill '{name}' not found",
skill_name=name,
suggestion="Use skills_list() to see available skills"
)
return {"success": False, "error": f"Skill '{name}' not found. Create it first with action='create'."}
target, err = _resolve_skill_target(existing["path"], file_path)
if err:
return {"success": False, "error": err}
target = existing["path"] / file_path
target.parent.mkdir(parents=True, exist_ok=True)
# Back up for rollback
original_content = target.read_text(encoding="utf-8") if target.exists() else None
@@ -684,16 +535,10 @@ def _remove_file(name: str, file_path: str) -> Dict[str, Any]:
existing = _find_skill(name)
if not existing:
return _format_error(
f"Skill '{name}' not found",
skill_name=name,
suggestion="Use skills_list() to see available skills"
)
return {"success": False, "error": f"Skill '{name}' not found."}
skill_dir = existing["path"]
target, err = _resolve_skill_target(skill_dir, file_path)
if err:
return {"success": False, "error": err}
target = skill_dir / file_path
if not target.exists():
# List what's actually there for the model to see
available = []
@@ -744,19 +589,19 @@ def skill_manage(
"""
if action == "create":
if not content:
return tool_error("content is required for 'create'. Provide the full SKILL.md text (frontmatter + body).", success=False)
return json.dumps({"success": False, "error": "content is required for 'create'. Provide the full SKILL.md text (frontmatter + body)."}, ensure_ascii=False)
result = _create_skill(name, content, category)
elif action == "edit":
if not content:
return tool_error("content is required for 'edit'. Provide the full updated SKILL.md text.", success=False)
return json.dumps({"success": False, "error": "content is required for 'edit'. Provide the full updated SKILL.md text."}, ensure_ascii=False)
result = _edit_skill(name, content)
elif action == "patch":
if not old_string:
return tool_error("old_string is required for 'patch'. Provide the text to find.", success=False)
return json.dumps({"success": False, "error": "old_string is required for 'patch'. Provide the text to find."}, ensure_ascii=False)
if new_string is None:
return tool_error("new_string is required for 'patch'. Use empty string to delete matched text.", success=False)
return json.dumps({"success": False, "error": "new_string is required for 'patch'. Use empty string to delete matched text."}, ensure_ascii=False)
result = _patch_skill(name, old_string, new_string, file_path, replace_all)
elif action == "delete":
@@ -764,14 +609,14 @@ def skill_manage(
elif action == "write_file":
if not file_path:
return tool_error("file_path is required for 'write_file'. Example: 'references/api-guide.md'", success=False)
return json.dumps({"success": False, "error": "file_path is required for 'write_file'. Example: 'references/api-guide.md'"}, ensure_ascii=False)
if file_content is None:
return tool_error("file_content is required for 'write_file'.", success=False)
return json.dumps({"success": False, "error": "file_content is required for 'write_file'."}, ensure_ascii=False)
result = _write_file(name, file_path, file_content)
elif action == "remove_file":
if not file_path:
return tool_error("file_path is required for 'remove_file'.", success=False)
return json.dumps({"success": False, "error": "file_path is required for 'remove_file'."}, ensure_ascii=False)
result = _remove_file(name, file_path)
else:
@@ -882,7 +727,7 @@ SKILL_MANAGE_SCHEMA = {
# --- Registry ---
from tools.registry import registry, tool_error
from tools.registry import registry
registry.register(
name="skill_manage",