Compare commits

..

2 Commits

Author SHA1 Message Date
Alexander Whitestone
7664dbb9ef feat: poka-yoke validate action with actionable feedback #626
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 22s
Adds skill_manage(action='validate', name='...') that checks an
existing skill and provides specific remediation steps for each issue.

13 checks with specific fix suggestions:
1. Skill exists (with fuzzy-match suggestions)
2. SKILL.md readable
3. Content non-empty
4. Frontmatter delimiter (---)
5. Frontmatter closing
6. YAML valid (with common error hints)
7. Frontmatter name field
8. Frontmatter description field
9. Body content after frontmatter
10. Content size limits
11. Linked files (references/, templates/, scripts/)
12. Naming convention
13. File organization (orphaned files)

Each issue includes: check name, FAIL/WARN status, message, and
a specific fix instruction (often with exact command to run).

Closes #626
2026-04-14 15:18:56 -04:00
Timmy
d9b891bef4 fix(#626): add validate action with actionable feedback to skill_manage
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 22s
The validate action provides:
- Specific issues with severity (error/warning)
- Actionable remediation steps for each issue
- Examples of correct formatting
- Suggestions for improvement
- Security scan integration

Checks performed:
1. SKILL.md exists
2. Frontmatter present and valid YAML
3. Required fields (name, description)
4. Body content present and structured
5. Content size limits
6. Supporting file sizes
7. Security scan

Refs #626
2026-04-14 14:03:54 -04:00
3 changed files with 521 additions and 694 deletions

View File

@@ -5258,80 +5258,6 @@ For more help on a command:
sessions_parser.set_defaults(func=cmd_sessions)
# Warm session command
warm_parser = subparsers.add_parser(
"warm",
help="Warm session provisioning",
description="Create pre-contextualized sessions from templates"
)
warm_subparsers = warm_parser.add_subparsers(dest="warm_command")
# Extract command
warm_extract = warm_subparsers.add_parser("extract", help="Extract template from session")
warm_extract.add_argument("session_id", help="Session ID to extract from")
warm_extract.add_argument("--name", "-n", required=True, help="Template name")
warm_extract.add_argument("--description", "-d", default="", help="Template description")
# List command
warm_subparsers.add_parser("list", help="List available templates")
# Test command
warm_test = warm_subparsers.add_parser("test", help="Test warm session creation")
warm_test.add_argument("template_id", help="Template ID")
warm_test.add_argument("message", help="Test message")
# Delete command
warm_delete = warm_subparsers.add_parser("delete", help="Delete a template")
warm_delete.add_argument("template_id", help="Template ID to delete")
warm_parser.set_defaults(func=cmd_warm)
# A/B testing command
ab_parser = subparsers.add_parser(
"ab-test",
help="A/B test warm vs cold sessions",
description="Framework for comparing warm and cold session performance"
)
ab_subparsers = ab_parser.add_subparsers(dest="ab_command")
# Create test
ab_create = ab_subparsers.add_parser("create", help="Create a new A/B test")
ab_create.add_argument("--task-id", required=True, help="Task ID")
ab_create.add_argument("--description", required=True, help="Task description")
ab_create.add_argument("--prompt", required=True, help="Test prompt")
ab_create.add_argument("--category", default="general", help="Task category")
ab_create.add_argument("--difficulty", default="medium", choices=["easy", "medium", "hard"])
# List tests
ab_subparsers.add_parser("list", help="List all A/B tests")
# Show test
ab_show = ab_subparsers.add_parser("show", help="Show test details")
ab_show.add_argument("test_id", help="Test ID")
# Analyze test
ab_analyze = ab_subparsers.add_parser("analyze", help="Analyze test results")
ab_analyze.add_argument("test_id", help="Test ID")
# Add result
ab_add = ab_subparsers.add_parser("add-result", help="Add a test result")
ab_add.add_argument("test_id", help="Test ID")
ab_add.add_argument("--session-type", required=True, choices=["cold", "warm"])
ab_add.add_argument("--session-id", required=True, help="Session ID")
ab_add.add_argument("--tool-calls", type=int, default=0)
ab_add.add_argument("--successful-calls", type=int, default=0)
ab_add.add_argument("--completion-time", type=float, default=0.0)
ab_add.add_argument("--success", action="store_true")
ab_add.add_argument("--notes", default="")
# Delete test
ab_delete = ab_subparsers.add_parser("delete", help="Delete a test")
ab_delete.add_argument("test_id", help="Test ID")
ab_parser.set_defaults(func=cmd_ab_test)
# =========================================================================
# insights command
# =========================================================================
@@ -5672,102 +5598,3 @@ Examples:
# Script entry point: call main() only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
def cmd_warm(args):
    """Dispatch a ``hermes warm`` subcommand to the warm-session tool CLI.

    Reads ``args.warm_command`` to decide what to do. Without a subcommand a
    short usage summary is printed. Otherwise the parsed arguments are turned
    back into an argv-style list and handed to
    ``tools.warm_session.warm_session_cli``.

    Returns:
        0 after printing the usage summary, the value returned by
        ``warm_session_cli`` on the normal path, or 1 when the tool module
        cannot be imported or any other exception is raised.
    """
    from hermes_cli.colors import Colors, color

    action = getattr(args, 'warm_command', None)

    # No subcommand given: show the available commands and stop.
    if action is None:
        print(color("Warm Session Provisioning", Colors.CYAN))
        print("\nCommands:")
        for usage_line in (
            " hermes warm extract SESSION_ID --name NAME - Extract template from session",
            " hermes warm list - List available templates",
            " hermes warm test TEMPLATE_ID MESSAGE - Test warm session",
            " hermes warm delete TEMPLATE_ID - Delete a template",
        ):
            print(usage_line)
        return 0

    try:
        from tools.warm_session import warm_session_cli

        # Rebuild the argument vector expected by the tool's own parser.
        if action == "extract":
            forwarded = ["extract", args.session_id, "--name", args.name]
            if args.description:
                forwarded += ["--description", args.description]
        elif action == "list":
            forwarded = ["list"]
        elif action == "test":
            forwarded = ["test", args.template_id, args.message]
        elif action == "delete":
            forwarded = ["delete", args.template_id]
        else:
            # Unrecognised subcommand: forward nothing and let the tool decide.
            forwarded = []

        return warm_session_cli(forwarded)
    except ImportError as exc:
        print(color(f"Error: Cannot import warm_session module: {exc}", Colors.RED))
        return 1
    except Exception as exc:
        print(color(f"Error: {exc}", Colors.RED))
        return 1
def cmd_ab_test(args):
    """Dispatch a ``hermes ab-test`` subcommand to the A/B testing tool CLI.

    Reads ``args.ab_command``. Without a subcommand a usage summary is
    printed. Otherwise the parsed arguments are converted back into an
    argv-style list and passed to ``tools.session_ab_testing.ab_test_cli``.

    Returns:
        0 after printing the usage summary, the value returned by
        ``ab_test_cli`` on the normal path, or 1 when the tool module cannot
        be imported or any other exception is raised.
    """
    from hermes_cli.colors import Colors, color

    action = getattr(args, 'ab_command', None)

    # No subcommand given: show the available commands and stop.
    if action is None:
        print(color("A/B Testing Framework for Warm vs Cold Sessions", Colors.CYAN))
        print("\nCommands:")
        for usage_line in (
            " hermes ab-test create --task-id ID --description DESC --prompt PROMPT",
            " hermes ab-test list",
            " hermes ab-test show TEST_ID",
            " hermes ab-test analyze TEST_ID",
            " hermes ab-test add-result TEST_ID --session-type TYPE --session-id ID",
            " hermes ab-test delete TEST_ID",
        ):
            print(usage_line)
        return 0

    try:
        from tools.session_ab_testing import ab_test_cli

        forwarded = []
        if action == "create":
            forwarded = [
                "create",
                "--task-id", args.task_id,
                "--description", args.description,
                "--prompt", args.prompt,
            ]
            if args.category:
                forwarded += ["--category", args.category]
            if args.difficulty:
                forwarded += ["--difficulty", args.difficulty]
        elif action == "list":
            forwarded = ["list"]
        elif action in ("show", "analyze", "delete"):
            # These three subcommands take only the test id.
            forwarded = [action, args.test_id]
        elif action == "add-result":
            forwarded = [
                "add-result", args.test_id,
                "--session-type", args.session_type,
                "--session-id", args.session_id,
            ]
            # Numeric options are forwarded only when non-zero; the tool's
            # parser uses zero as the default for each of them.
            for flag, value in (
                ("--tool-calls", args.tool_calls),
                ("--successful-calls", args.successful_calls),
                ("--completion-time", args.completion_time),
            ):
                if value:
                    forwarded += [flag, str(value)]
            if args.success:
                forwarded.append("--success")
            if args.notes:
                forwarded += ["--notes", args.notes]

        return ab_test_cli(forwarded)
    except ImportError as exc:
        print(color(f"Error: Cannot import session_ab_testing module: {exc}", Colors.RED))
        return 1
    except Exception as exc:
        print(color(f"Error: {exc}", Colors.RED))
        return 1

View File

@@ -1,517 +0,0 @@
"""
Warm Session A/B Testing Framework
Framework for comparing warm vs cold session performance.
Addresses research questions from issue #327.
Issue: #327
"""
import json
import logging
import statistics
import time
from dataclasses import asdict, dataclass, field, fields
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
class SessionType(Enum):
    """Which arm of the A/B comparison a session belongs to."""

    # Baseline arm: a fresh session started without any warm-up.
    COLD = "cold"
    # Treatment arm: a session started with warm-up context.
    WARM = "warm"
@dataclass
class TestTask:
    """Definition of a task that is run in both cold and warm sessions."""

    # Identifier of the task; also embedded in generated test ids.
    task_id: str
    # Human-readable summary of the task.
    description: str
    # Prompt text associated with the task.
    prompt: str
    # Tool names expected to be used; empty by default.
    expected_tools: List[str] = field(default_factory=list)
    # Free-form success criteria; empty by default.
    success_criteria: Dict[str, Any] = field(default_factory=dict)
    # Task category label.
    category: str = "general"
    # One of "easy", "medium", "hard" (not enforced here).
    difficulty: str = "medium"
@dataclass
class SessionResult:
    """Metrics recorded for a single session run of an A/B test task.

    ``error_rate`` and ``success_rate`` are derived from the tool-call
    counters and are both 0.0 when no tool calls were recorded.
    """

    session_id: str
    session_type: SessionType
    task_id: str
    start_time: str
    end_time: Optional[str] = None
    message_count: int = 0
    tool_calls: int = 0
    successful_tool_calls: int = 0
    errors: List[str] = field(default_factory=list)
    completion_time_seconds: float = 0.0
    user_corrections: int = 0
    success: bool = False
    notes: str = ""

    @property
    def error_rate(self) -> float:
        """Fraction of tool calls that did not succeed (0.0 without calls)."""
        total = self.tool_calls
        return (total - self.successful_tool_calls) / total if total != 0 else 0.0

    @property
    def success_rate(self) -> float:
        """Fraction of tool calls that succeeded (0.0 without calls)."""
        total = self.tool_calls
        return self.successful_tool_calls / total if total != 0 else 0.0

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serialisable dict of all fields plus derived rates.

        ``session_type`` is emitted as its string value. The derived
        ``error_rate`` and ``success_rate`` are included alongside the
        stored fields.
        """
        payload: Dict[str, Any] = {
            "session_id": self.session_id,
            "session_type": self.session_type.value,
        }
        # Remaining keys map one-to-one onto attributes/properties; the
        # tuple order fixes the key order of the resulting dict.
        for attr in (
            "task_id",
            "start_time",
            "end_time",
            "message_count",
            "tool_calls",
            "successful_tool_calls",
            "errors",
            "completion_time_seconds",
            "user_corrections",
            "success",
            "error_rate",
            "success_rate",
            "notes",
        ):
            payload[attr] = getattr(self, attr)
        return payload
@dataclass
class ABTestResult:
    """All recorded cold and warm session results for one test task."""

    test_id: str
    task: TestTask
    cold_results: List[SessionResult] = field(default_factory=list)
    warm_results: List[SessionResult] = field(default_factory=list)
    created_at: str = field(default_factory=lambda: datetime.now().isoformat())

    def add_result(self, result: SessionResult):
        """Append ``result`` to the cold or warm list based on its type."""
        bucket = (
            self.cold_results
            if result.session_type == SessionType.COLD
            else self.warm_results
        )
        bucket.append(result)

    def get_summary(self) -> Dict[str, Any]:
        """Aggregate both arms and compute warm-vs-cold relative improvement.

        Returns:
            Dict with ``task_id``, per-arm stats under ``cold`` / ``warm``,
            an ``improvement`` dict (possibly empty) and a textual
            ``recommendation``.
        """

        def _mean_or_zero(values):
            return statistics.mean(values) if values else 0

        def _aggregate(runs: List[SessionResult]) -> Dict[str, Any]:
            if not runs:
                return {"count": 0}
            # Only runs with a recorded (positive) duration count toward
            # the average completion time.
            timed = [
                run.completion_time_seconds
                for run in runs
                if run.completion_time_seconds > 0
            ]
            return {
                "count": len(runs),
                "avg_error_rate": _mean_or_zero([run.error_rate for run in runs]),
                "avg_success_rate": _mean_or_zero([run.success_rate for run in runs]),
                "avg_completion_time": _mean_or_zero(timed),
                "avg_messages": _mean_or_zero([run.message_count for run in runs]),
                "success_count": sum(1 for run in runs if run.success),
            }

        cold_stats = _aggregate(self.cold_results)
        warm_stats = _aggregate(self.warm_results)

        # Relative change is only defined when both arms have data and the
        # cold baseline is non-zero (it is the denominator).
        improvement = {}
        if cold_stats.get("count", 0) > 0 and warm_stats.get("count", 0) > 0:
            base_error = cold_stats.get("avg_error_rate", 0)
            if base_error > 0:
                warm_error = warm_stats.get("avg_error_rate", 0)
                improvement["error_rate"] = (base_error - warm_error) / base_error

            base_success = cold_stats.get("avg_success_rate", 0)
            if base_success > 0:
                warm_success = warm_stats.get("avg_success_rate", 0)
                improvement["success_rate"] = (warm_success - base_success) / base_success

        return {
            "task_id": self.task.task_id,
            "cold": cold_stats,
            "warm": warm_stats,
            "improvement": improvement,
            "recommendation": self._get_recommendation(cold_stats, warm_stats),
        }

    def _get_recommendation(self, cold_stats: Dict, warm_stats: Dict) -> str:
        """Turn the per-arm stats into a one-line recommendation.

        Requires at least three runs per arm; otherwise reports insufficient
        data. The decision compares average error rates with a 20% margin.
        """
        fewest_runs = min(cold_stats.get("count", 0), warm_stats.get("count", 0))
        if fewest_runs < 3:
            return "Insufficient data (need at least 3 tests each)"

        cold_error = cold_stats.get("avg_error_rate", 0)
        warm_error = warm_stats.get("avg_error_rate", 0)

        # Warm error at least 20% below cold.
        if warm_error < cold_error * 0.8:
            return "WARM recommended: Significant error reduction"
        # Warm error more than 20% above cold.
        if warm_error > cold_error * 1.2:
            return "COLD recommended: Warm sessions performed worse"
        return "No significant difference detected"

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serialisable dict including the computed summary."""
        serialised: Dict[str, Any] = {"test_id": self.test_id}
        serialised["task"] = asdict(self.task)
        serialised["cold_results"] = [run.to_dict() for run in self.cold_results]
        serialised["warm_results"] = [run.to_dict() for run in self.warm_results]
        serialised["created_at"] = self.created_at
        serialised["summary"] = self.get_summary()
        return serialised
class ABTestManager:
    """Persist A/B tests as one ``<test_id>.json`` file each in ``test_dir``."""

    def __init__(self, test_dir: Optional[Path] = None):
        """Set the storage directory and create it if it does not exist.

        Args:
            test_dir: Directory holding the JSON files. Defaults to
                ``~/.hermes/ab_tests`` when omitted.
        """
        self.test_dir = test_dir or Path.home() / ".hermes" / "ab_tests"
        self.test_dir.mkdir(parents=True, exist_ok=True)

    def create_test(self, task: TestTask) -> ABTestResult:
        """Create, persist and return a new empty test for ``task``.

        The test id is ``test_<YYYYmmdd_HHMMSS>_<task_id>``.
        """
        test_id = f"test_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{task.task_id}"
        result = ABTestResult(test_id=test_id, task=task)
        self.save_test(result)
        return result

    def save_test(self, test: ABTestResult):
        """Write ``test`` to ``<test_id>.json``, replacing any existing file."""
        path = self.test_dir / f"{test.test_id}.json"
        with open(path, 'w', encoding="utf-8") as f:
            json.dump(test.to_dict(), f, indent=2)

    @staticmethod
    def _result_from_dict(raw: Dict[str, Any]) -> SessionResult:
        """Rebuild a ``SessionResult`` from its persisted dict form.

        ``SessionResult.to_dict()`` also writes the derived ``error_rate``
        and ``success_rate`` values, which are properties rather than
        constructor arguments. Only declared dataclass fields are passed
        on; without this filter the constructor raises ``TypeError`` for
        every saved result.
        """
        allowed = {f.name for f in fields(SessionResult)}
        kwargs = {key: value for key, value in raw.items() if key in allowed}
        kwargs["session_type"] = SessionType(raw["session_type"])
        return SessionResult(**kwargs)

    def load_test(self, test_id: str) -> Optional[ABTestResult]:
        """Load a test by id.

        Returns:
            The reconstructed test, or ``None`` when the file does not exist
            or cannot be parsed (the failure is logged).
        """
        path = self.test_dir / f"{test_id}.json"
        if not path.exists():
            return None
        try:
            with open(path, 'r', encoding="utf-8") as f:
                data = json.load(f)
            test = ABTestResult(
                test_id=data["test_id"],
                task=TestTask(**data["task"]),
                created_at=data.get("created_at", ""),
            )
            test.cold_results.extend(
                self._result_from_dict(r) for r in data.get("cold_results", [])
            )
            test.warm_results.extend(
                self._result_from_dict(r) for r in data.get("warm_results", [])
            )
            return test
        except Exception as e:
            # Best-effort load: a damaged file is reported as "not loadable"
            # instead of propagating to the caller.
            logger.error("Failed to load test: %s", e)
            return None

    def list_tests(self) -> List[Dict[str, Any]]:
        """Return a brief description of every readable test file.

        Files that cannot be read or do not have the expected structure are
        skipped with a logged warning instead of being silently ignored.
        Entries are returned in sorted file-name order.
        """
        tests = []
        for path in sorted(self.test_dir.glob("*.json")):
            try:
                with open(path, 'r', encoding="utf-8") as f:
                    data = json.load(f)
                tests.append({
                    "test_id": data.get("test_id"),
                    "task_id": data.get("task", {}).get("task_id"),
                    "description": data.get("task", {}).get("description", ""),
                    "cold_count": len(data.get("cold_results", [])),
                    "warm_count": len(data.get("warm_results", [])),
                    "created_at": data.get("created_at"),
                })
            except (OSError, ValueError, AttributeError, TypeError) as e:
                # ValueError covers json.JSONDecodeError and decode errors;
                # AttributeError/TypeError cover JSON of an unexpected shape.
                logger.warning("Skipping unreadable test file %s: %s", path, e)
        return tests

    def delete_test(self, test_id: str) -> bool:
        """Delete the file for ``test_id``; return whether it existed."""
        path = self.test_dir / f"{test_id}.json"
        if path.exists():
            path.unlink()
            return True
        return False
class ABTestRunner:
    """Scaffolding for running and analysing cold-vs-warm comparisons."""

    def __init__(self, manager: ABTestManager = None):
        """Use ``manager`` for storage, or a default ``ABTestManager``."""
        self.manager = manager or ABTestManager()

    def run_comparison(
        self,
        task: TestTask,
        cold_messages: List[Dict],
        warm_messages: List[Dict],
        session_db=None
    ) -> Tuple[SessionResult, SessionResult]:
        """
        Build placeholder results for a cold and a warm run of ``task``.

        This is a framework stub: no session is started and
        ``cold_messages``, ``warm_messages`` and ``session_db`` are not used.
        A real implementation would run the task once per arm with the
        corresponding messages and fill in the collected metrics.

        Returns:
            Tuple of (cold_result, warm_result)
        """
        cold_result, warm_result = (
            SessionResult(
                session_id=f"{arm.value}_{task.task_id}_{int(time.time())}",
                session_type=arm,
                task_id=task.task_id,
                start_time=datetime.now().isoformat(),
            )
            for arm in (SessionType.COLD, SessionType.WARM)
        )
        return cold_result, warm_result

    def analyze_results(self, test_id: str) -> Dict[str, Any]:
        """Return the summary for ``test_id`` with optional spread notes.

        Returns ``{"error": "Test not found"}`` when the test cannot be
        loaded. When both arms have at least three runs, a
        ``statistical_notes`` entry is added.
        """
        test = self.manager.load_test(test_id)
        if not test:
            return {"error": "Test not found"}

        summary = test.get_summary()

        fewest_runs = min(
            summary["cold"].get("count", 0),
            summary["warm"].get("count", 0),
        )
        if fewest_runs < 3:
            return summary

        # Rough spread heuristic, not a real significance test: report the
        # per-arm standard deviation of error rates and label the result
        # "low" when either arm is noisy.
        cold_std = statistics.stdev([run.error_rate for run in test.cold_results])
        warm_std = statistics.stdev([run.error_rate for run in test.warm_results])
        summary["statistical_notes"] = {
            "cold_std_dev": cold_std,
            "warm_std_dev": warm_std,
            "significance": "low" if max(cold_std, warm_std) > 0.2 else "medium",
        }
        return summary
# CLI Interface
def ab_test_cli(args: List[str]) -> int:
    """Parse ``args`` and run one A/B-testing subcommand.

    Supported subcommands: ``create``, ``list``, ``show``, ``analyze``,
    ``delete`` and ``add-result``.

    Args:
        args: Command-line tokens without the program name.

    Returns:
        0 on success; 1 when no subcommand was given or the referenced test
        could not be found.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Warm session A/B testing")
    commands = parser.add_subparsers(dest="command")

    # create
    creator = commands.add_parser("create", help="Create a new test")
    creator.add_argument("--task-id", required=True, help="Task ID")
    creator.add_argument("--description", required=True, help="Task description")
    creator.add_argument("--prompt", required=True, help="Test prompt")
    creator.add_argument("--category", default="general", help="Task category")
    creator.add_argument("--difficulty", default="medium", choices=["easy", "medium", "hard"])

    # list
    commands.add_parser("list", help="List all tests")

    # show / analyze / delete all take a single positional test id.
    for command_name, command_help in (
        ("show", "Show test results"),
        ("analyze", "Analyze test results"),
        ("delete", "Delete a test"),
    ):
        commands.add_parser(command_name, help=command_help).add_argument(
            "test_id", help="Test ID"
        )

    # add-result
    recorder = commands.add_parser("add-result", help="Add a test result")
    recorder.add_argument("test_id", help="Test ID")
    recorder.add_argument("--session-type", required=True, choices=["cold", "warm"])
    recorder.add_argument("--session-id", required=True, help="Session ID")
    recorder.add_argument("--tool-calls", type=int, default=0)
    recorder.add_argument("--successful-calls", type=int, default=0)
    recorder.add_argument("--completion-time", type=float, default=0.0)
    recorder.add_argument("--success", action="store_true")
    recorder.add_argument("--notes", default="")

    parsed = parser.parse_args(args)
    if not parsed.command:
        parser.print_help()
        return 1

    manager = ABTestManager()
    runner = ABTestRunner(manager)
    command = parsed.command

    def _print_sessions(label, results):
        # One line per recorded session under a "<label> sessions" heading.
        print(f"\n{label} sessions: {len(results)}")
        for run in results:
            print(f" {run.session_id}: {run.success_rate:.0%} success, {run.error_rate:.0%} errors")

    def _print_stats(heading, stats):
        # Aggregate block used for both arms in the analysis output.
        print(heading)
        print(f" Count: {stats.get('count', 0)}")
        print(f" Avg error rate: {stats.get('avg_error_rate', 0):.1%}")
        print(f" Avg success rate: {stats.get('avg_success_rate', 0):.1%}")
        print(f" Avg completion time: {stats.get('avg_completion_time', 0):.1f}s")

    if command == "create":
        task = TestTask(
            task_id=parsed.task_id,
            description=parsed.description,
            prompt=parsed.prompt,
            category=parsed.category,
            difficulty=parsed.difficulty,
        )
        test = manager.create_test(task)
        print(f"Created test: {test.test_id}")
        print(f"Task: {task.description}")
        return 0

    if command == "list":
        tests = manager.list_tests()
        if not tests:
            print("No tests found.")
            return 0
        print("\n=== A/B Tests ===\n")
        for entry in tests:
            print(f"ID: {entry['test_id']}")
            print(f" Task: {entry['description']}")
            print(f" Cold tests: {entry['cold_count']}, Warm tests: {entry['warm_count']}")
            print(f" Created: {entry['created_at']}")
            print()
        return 0

    if command == "show":
        test = manager.load_test(parsed.test_id)
        if not test:
            print(f"Test {parsed.test_id} not found")
            return 1
        print(f"\n=== Test: {test.test_id} ===\n")
        print(f"Task: {test.task.description}")
        print(f"Prompt: {test.task.prompt}")
        print(f"Category: {test.task.category}, Difficulty: {test.task.difficulty}")
        _print_sessions("Cold", test.cold_results)
        _print_sessions("Warm", test.warm_results)
        return 0

    if command == "analyze":
        analysis = runner.analyze_results(parsed.test_id)
        if "error" in analysis:
            print(f"Error: {analysis['error']}")
            return 1
        print(f"\n=== Analysis: {parsed.test_id} ===\n")
        _print_stats("Cold Sessions:", analysis.get("cold", {}))
        _print_stats("\nWarm Sessions:", analysis.get("warm", {}))
        improvement = analysis.get("improvement", {})
        if improvement:
            print("\nImprovement:")
            if "error_rate" in improvement:
                print(f" Error rate: {improvement['error_rate']:+.1%}")
            if "success_rate" in improvement:
                print(f" Success rate: {improvement['success_rate']:+.1%}")
        print(f"\nRecommendation: {analysis.get('recommendation', 'N/A')}")
        return 0

    if command == "delete":
        if not manager.delete_test(parsed.test_id):
            print(f"Test {parsed.test_id} not found")
            return 1
        print(f"Deleted test: {parsed.test_id}")
        return 0

    if command == "add-result":
        test = manager.load_test(parsed.test_id)
        if not test:
            print(f"Test {parsed.test_id} not found")
            return 1
        # Results are entered after the fact, so start and end are both
        # stamped with the time of recording.
        result = SessionResult(
            session_id=parsed.session_id,
            session_type=SessionType(parsed.session_type),
            task_id=test.task.task_id,
            start_time=datetime.now().isoformat(),
            end_time=datetime.now().isoformat(),
            tool_calls=parsed.tool_calls,
            successful_tool_calls=parsed.successful_calls,
            completion_time_seconds=parsed.completion_time,
            success=parsed.success,
            notes=parsed.notes,
        )
        test.add_result(result)
        manager.save_test(test)
        print(f"Added {parsed.session_type} result to test {parsed.test_id}")
        print(f" Session: {parsed.session_id}")
        print(f" Success rate: {result.success_rate:.0%}")
        return 0

    return 1
# Allow running this module directly as a standalone CLI: forward the
# command-line arguments (minus the program name) to ab_test_cli and use
# its return value as the process exit status.
if __name__ == "__main__":
    import sys
    sys.exit(ab_test_cli(sys.argv[1:]))

View File

@@ -245,6 +245,269 @@ def _validate_file_path(file_path: str) -> Optional[str]:
return None
def _validate_skill(name: str) -> Dict[str, Any]:
    """
    Validate an existing skill and provide actionable feedback.

    Checks, in order:
    1. Skill exists (with close-match name suggestions)
    2. SKILL.md exists and is readable
    3. Content is non-empty
    4. Frontmatter opens with ``---`` and is closed
    5. Frontmatter is valid YAML and is a mapping
    6. Frontmatter has ``name`` and ``description`` (within the length limit)
    7. Body content follows the frontmatter
    8. Content size limits
    9. Files under references/, templates/, scripts/ are readable as UTF-8
    10. Naming convention
    11. No stray files in the skill root

    Args:
        name: Name of the skill to look up via ``_find_skill``.

    Returns:
        Dict with ``success``, ``valid``, ``issues`` (FAIL entries),
        ``warnings`` (WARN entries) and ``summary``. Each entry has the keys
        ``check``, ``status``, ``message`` and ``fix``. When SKILL.md could be
        read and is non-empty, ``skill_path`` and ``skill_md_size`` are
        included as well. ``success`` is False only when the skill itself
        was not found.
    """
    issues = []    # FAIL entries: any of these makes the skill invalid
    warnings = []  # WARN entries: reported, but do not affect validity

    def _fail(check: str, message: str, fix: str) -> None:
        issues.append({"check": check, "status": "FAIL", "message": message, "fix": fix})

    def _warn(check: str, message: str, fix: str) -> None:
        warnings.append({"check": check, "status": "WARN", "message": message, "fix": fix})

    def _abort(summary: str, success: bool = True) -> Dict[str, Any]:
        # Early exits carry the same core keys as the final result so
        # callers can always read "warnings".
        return {
            "success": success,
            "valid": False,
            "issues": issues,
            "warnings": warnings,
            "summary": summary,
        }

    # Check 1: Does the skill exist?
    skill_info = _find_skill(name)
    if not skill_info:
        # Offer close matches from every known skills directory.
        import difflib
        from agent.skill_utils import get_all_skills_dirs

        known_names = [
            md.parent.name
            for skills_dir in get_all_skills_dirs()
            if skills_dir.exists()
            for md in skills_dir.rglob("SKILL.md")
        ]
        close = difflib.get_close_matches(name, known_names, n=3, cutoff=0.6)
        suggestion = f" Did you mean: {', '.join(close)}?" if close else ""
        _fail(
            "existence",
            f"Skill '{name}' not found.{suggestion}",
            f"Create it with: skill_manage(action='create', name='{name}', content='...')",
        )
        return _abort(f"Skill '{name}' does not exist.", success=False)

    skill_dir = skill_info["path"]
    skill_md = skill_dir / "SKILL.md"

    # Check 2: SKILL.md exists
    if not skill_md.exists():
        _fail(
            "SKILL.md exists",
            f"No SKILL.md found in {skill_dir}",
            f"Create SKILL.md with: skill_manage(action='create', name='{name}', content='---\\nname: {name}\\ndescription: ...\\n---\\n# Instructions\\n...')",
        )
        return _abort(f"Skill '{name}' is missing SKILL.md.")

    # Read content
    try:
        content = skill_md.read_text(encoding="utf-8")
    except (OSError, UnicodeError) as e:
        _fail(
            "SKILL.md readable",
            f"Cannot read SKILL.md: {e}",
            "Check file permissions: chmod 644 SKILL.md",
        )
        return _abort("Cannot read SKILL.md.")

    # Check 3: Content not empty
    if not content.strip():
        _fail(
            "content non-empty",
            "SKILL.md is empty.",
            f"Add content with: skill_manage(action='edit', name='{name}', content='---\\nname: {name}\\ndescription: ...\\n---\\n# Instructions\\n...')",
        )
        return _abort("SKILL.md is empty.")

    # Match of the closing delimiter, relative to content[3:]. Stays None
    # when the frontmatter is missing or unclosed so the body check below
    # can be skipped safely.
    end_match = None

    # Check 4: Frontmatter starts with ---
    if not content.startswith("---"):
        _fail(
            "frontmatter delimiter",
            "SKILL.md must start with YAML frontmatter (---).",
            "Add '---' as the first line, then YAML metadata, then '---' to close.\n"
            "Example:\n---\nname: my-skill\ndescription: What this skill does\n---\n# Instructions\n...",
        )
    else:
        # Check 5: Frontmatter closes
        end_match = re.search(r'\n---\s*\n', content[3:])
        if not end_match:
            _fail(
                "frontmatter closing",
                "Frontmatter is not closed with '---'.",
                "Add a line with just '---' after your YAML metadata to close the frontmatter block.",
            )
        else:
            # Check 6: Valid YAML (+3 converts the offset back to `content`)
            yaml_content = content[3:end_match.start() + 3]
            try:
                parsed = yaml.safe_load(yaml_content)
            except yaml.YAMLError as e:
                _fail(
                    "YAML valid",
                    f"YAML parse error: {e}",
                    "Fix the YAML syntax in your frontmatter. Common issues:\n"
                    " - Missing quotes around values with special chars (:, {, }, [, ])\n"
                    " - Inconsistent indentation (use spaces, not tabs)\n"
                    " - Unescaped colons in descriptions",
                )
            else:
                # An empty frontmatter block parses to None. Treat it as an
                # empty mapping so the required-field checks still report
                # the missing name/description instead of passing silently.
                if parsed is None:
                    parsed = {}

                if isinstance(parsed, dict):
                    # Check 7: name field
                    if "name" not in parsed:
                        _fail(
                            "frontmatter name",
                            "Frontmatter missing 'name' field.",
                            f"Add 'name: {name}' to your frontmatter YAML.",
                        )
                    elif parsed["name"] != name:
                        _warn(
                            "frontmatter name match",
                            f"Frontmatter name '{parsed['name']}' doesn't match directory name '{name}'.",
                            f"Change 'name: {parsed['name']}' to 'name: {name}' in frontmatter, or rename the directory to match.",
                        )

                    # Check 8: description field
                    if "description" not in parsed:
                        _fail(
                            "frontmatter description",
                            "Frontmatter missing 'description' field.",
                            "Add 'description: A brief description of what this skill does' to frontmatter. "
                            f"Max {MAX_DESCRIPTION_LENGTH} characters.",
                        )
                    elif len(str(parsed["description"])) > MAX_DESCRIPTION_LENGTH:
                        _fail(
                            "description length",
                            f"Description is {len(str(parsed['description']))} chars (max {MAX_DESCRIPTION_LENGTH}).",
                            f"Shorten the description to under {MAX_DESCRIPTION_LENGTH} characters. "
                            "Put detailed instructions in the body, not the description.",
                        )
                else:
                    # Scalars and lists (including falsy ones such as [] or
                    # 0) are not usable as skill metadata.
                    _fail(
                        "frontmatter structure",
                        "Frontmatter must be a YAML mapping (key: value pairs).",
                        "Ensure frontmatter contains key-value pairs like:\nname: my-skill\ndescription: What it does",
                    )

    # Check 9: Body content after frontmatter
    if end_match:
        body = content[end_match.end() + 3:].strip()
        if not body:
            _fail(
                "body content",
                "No content after frontmatter.",
                "Add instructions, steps, or reference content after the closing '---'. "
                "Skills need a body to be useful — at minimum a description of when to use the skill.",
            )
        elif len(body) < 20:
            _warn(
                "body content size",
                f"Body content is very short ({len(body)} chars).",
                "Add more detail: numbered steps, examples, pitfalls to avoid, "
                "or reference files in references/ or templates/.",
            )

    # Check 10: Content size (hard limit, then a warning above 80% of it)
    if len(content) > MAX_SKILL_CONTENT_CHARS:
        _fail(
            "content size",
            f"SKILL.md is {len(content):,} chars (max {MAX_SKILL_CONTENT_CHARS:,}).",
            "Split into a shorter SKILL.md (core instructions) with detailed content in:\n"
            " - references/detailed-guide.md\n"
            " - templates/example.yaml\n"
            " - scripts/validate.py\n"
            "Use skill_manage(action='write_file') to add linked files.",
        )
    elif len(content) > MAX_SKILL_CONTENT_CHARS * 0.8:
        _warn(
            "content size warning",
            f"SKILL.md is {len(content):,} chars ({len(content) * 100 // MAX_SKILL_CONTENT_CHARS}% of limit).",
            "Consider moving detailed content to references/ or templates/ files.",
        )

    # Check 11: Linked files are readable as UTF-8 text. Paths are sorted
    # so the report order is stable between runs.
    for subdir in ["references", "templates", "scripts"]:
        subdir_path = skill_dir / subdir
        if not subdir_path.exists():
            continue
        for linked_file in sorted(subdir_path.rglob("*")):
            if not linked_file.is_file():
                continue
            try:
                linked_file.read_text(encoding="utf-8")
            except (OSError, UnicodeError) as e:
                _warn(
                    f"linked file {subdir}/{linked_file.name}",
                    f"Cannot read {linked_file.relative_to(skill_dir)}: {e}",
                    "Check file exists and has read permissions.",
                )

    # Check 12: Naming convention
    if not VALID_NAME_RE.match(name):
        _warn(
            "naming convention",
            f"Skill name '{name}' doesn't follow convention (lowercase, hyphens, underscores).",
            "Rename to use lowercase letters, numbers, hyphens, dots, and underscores only. "
            "Must start with a letter or digit.",
        )

    # Check 13: Orphaned files (anything in the root besides SKILL.md,
    # dotfiles and the allowed subdirectories)
    if skill_dir.exists():
        for item in sorted(skill_dir.iterdir()):
            if item.name == "SKILL.md" or item.name.startswith("."):
                continue
            if item.is_dir() and item.name in ALLOWED_SUBDIRS:
                continue
            _warn(
                "file organization",
                f"'{item.name}' is in the skill root, not in an allowed subdirectory.",
                f"Move to references/, templates/, or scripts/. Allowed subdirs: {', '.join(sorted(ALLOWED_SUBDIRS))}",
            )

    # Build summary
    fail_count = len(issues)
    warn_count = len(warnings)
    valid = fail_count == 0

    if valid and warn_count == 0:
        summary = f"Skill '{name}' is valid. No issues found."
    elif valid:
        summary = f"Skill '{name}' is valid with {warn_count} warning(s)."
    else:
        summary = f"Skill '{name}' has {fail_count} issue(s) and {warn_count} warning(s)."

    return {
        "success": True,
        "valid": valid,
        "issues": issues,
        "warnings": warnings,
        "summary": summary,
        "skill_path": str(skill_dir),
        "skill_md_size": len(content),
    }
def _atomic_write_text(file_path: Path, content: str, encoding: str = "utf-8") -> None:
"""
Atomically write text content to a file.
@@ -567,6 +830,257 @@ def _remove_file(name: str, file_path: str) -> Dict[str, Any]:
}
def _validate_skill(name: str) -> Dict[str, Any]:
    """Validate an existing skill and report findings with remediation steps.

    Checks performed, in order:
      1. SKILL.md exists and can be read as UTF-8.
      2. YAML frontmatter is present, closed, parses, and is a mapping with
         the required ``name`` and ``description`` fields.
      3. There is body content after the frontmatter.
      4. SKILL.md does not exceed ``MAX_SKILL_CONTENT_CHARS``.
      5. Files under the allowed subdirectories do not exceed
         ``MAX_SKILL_FILE_BYTES``.
      6. The security scan (only when ``_GUARD_AVAILABLE`` is true).

    Args:
        name: Name of the skill to look up via ``_find_skill``.

    Returns:
        A dict. When the skill cannot be found it holds ``success=False``,
        ``error`` and ``suggestion``. When SKILL.md is missing or unreadable
        it holds ``success=False``, ``name``, ``path``, ``issues``,
        ``warnings`` and ``suggestions``. Otherwise it holds ``success=True``,
        ``name``, ``path``, ``valid``, ``issues``, ``warnings``,
        ``suggestions`` and ``summary``. ``valid`` is False only when at
        least one entry in ``issues`` has severity ``"error"``.
    """
    existing = _find_skill(name)
    if not existing:
        return {
            "success": False,
            "error": f"Skill '{name}' not found.",
            "suggestion": f"Use skill_manage(action='create', name='{name}', content='...') to create it.",
        }

    skill_dir = existing["path"]
    skill_md = skill_dir / "SKILL.md"

    # ``issues`` entries carry a "severity" of "error" or "warning"; only
    # "error" entries make the skill invalid. ``warnings`` and ``suggestions``
    # are advisory lists with a "suggestion" text instead of a "remediation".
    issues = []
    warnings = []
    suggestions = []

    # 1. Check SKILL.md exists
    if not skill_md.exists():
        issues.append({
            "severity": "error",
            "check": "SKILL.md exists",
            "message": "SKILL.md file is missing.",
            "remediation": f"Create SKILL.md in {skill_dir}/ with YAML frontmatter and instructions.",
            "example": (
                "---\n"
                "name: my-skill\n"
                'description: "What this skill does in one sentence."\n'
                "---\n"
                "## When to Use\n"
                "- Trigger condition 1\n"
                "- Trigger condition 2\n"
                "## Steps\n"
                "1. First step with exact command\n"
                "2. Second step\n"
                "## Pitfalls\n"
                "- Common mistake and how to avoid it\n"
            ),
        })
        return {
            "success": False,
            "name": name,
            "path": str(skill_dir),
            "issues": issues,
            "warnings": warnings,
            "suggestions": suggestions,
        }

    # Read content
    try:
        content_text = skill_md.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as e:
        issues.append({
            "severity": "error",
            "check": "readable",
            "message": f"Cannot read SKILL.md: {e}",
            "remediation": "Check file permissions and encoding (should be UTF-8).",
        })
        # Same result shape as the "missing SKILL.md" early return above.
        return {
            "success": False,
            "name": name,
            "path": str(skill_dir),
            "issues": issues,
            "warnings": warnings,
            "suggestions": suggestions,
        }

    # 2. Check frontmatter
    # Leading whitespace is tolerated, so every offset below is computed on
    # the left-stripped text; slicing the raw text would misalign them.
    text = content_text.lstrip()
    # Bound up front so the body check can never see an undefined name.
    closing = None

    if not text.startswith("---"):
        issues.append({
            "severity": "error",
            "check": "frontmatter present",
            "message": "SKILL.md does not start with YAML frontmatter delimiter (---).",
            "remediation": "Add '---' as the very first line of SKILL.md.",
            "example": '---\nname: my-skill\ndescription: "What it does."\n---',
        })
    else:
        # The search runs on text[3:], so match offsets are shifted by 3
        # relative to ``text``.
        closing = re.search(r'\n---\s*\n', text[3:])
        if closing is None:
            issues.append({
                "severity": "error",
                "check": "frontmatter closed",
                "message": "YAML frontmatter is not closed with a second '---'.",
                "remediation": "Add a line with just '---' after your frontmatter fields.",
            })
        else:
            yaml_content = text[3:closing.start() + 3]
            try:
                parsed = yaml.safe_load(yaml_content)
            except yaml.YAMLError as e:
                issues.append({
                    "severity": "error",
                    "check": "frontmatter valid YAML",
                    "message": f"YAML parse error: {e}",
                    "remediation": "Fix YAML syntax in the frontmatter block.",
                    "example": (
                        "---\n"
                        "name: my-skill\n"
                        'description: "A clear description."\n'
                        'version: "1.0.0"\n'
                        "---"
                    ),
                })
            else:
                # An empty frontmatter block parses to None. Treat it as an
                # empty mapping so the required-field checks still fire
                # instead of the skill being silently reported as valid.
                if parsed is None:
                    parsed = {}

                if not isinstance(parsed, dict):
                    issues.append({
                        "severity": "error",
                        "check": "frontmatter is mapping",
                        "message": "Frontmatter must be a YAML mapping (key: value pairs).",
                        "remediation": "Ensure frontmatter contains key: value pairs, not a list.",
                    })
                else:
                    # Check required fields
                    if "name" not in parsed:
                        issues.append({
                            "severity": "error",
                            "check": "name field",
                            "message": "Frontmatter missing required 'name' field.",
                            "remediation": f"Add: name: {name}",
                        })
                    elif parsed["name"] != name:
                        warnings.append({
                            "check": "name matches directory",
                            "message": f"Frontmatter name '{parsed['name']}' doesn't match directory name '{name}'.",
                            "suggestion": f"Consider changing to: name: {name}",
                        })

                    if "description" not in parsed:
                        issues.append({
                            "severity": "error",
                            "check": "description field",
                            "message": "Frontmatter missing required 'description' field.",
                            "remediation": "Add a one-sentence description of what this skill does.",
                            "example": 'description: "Deploy containerized services to production VPS."',
                        })
                    else:
                        description_length = len(str(parsed["description"]))
                        if description_length > MAX_DESCRIPTION_LENGTH:
                            issues.append({
                                "severity": "warning",
                                "check": "description length",
                                "message": f"Description is {description_length} chars (max {MAX_DESCRIPTION_LENGTH}).",
                                "remediation": "Shorten the description to one clear sentence.",
                            })

                    if "version" not in parsed:
                        suggestions.append({
                            "check": "version field",
                            "message": "No version field in frontmatter.",
                            "suggestion": 'Add: version: "1.0.0" for tracking changes.',
                        })

    # 3. Check body content (only meaningful once the frontmatter is closed)
    if closing is not None:
        body = text[closing.end() + 3:].strip()
        if not body:
            issues.append({
                "severity": "error",
                "check": "body content",
                "message": "SKILL.md has no content after frontmatter.",
                "remediation": "Add instructions, steps, or procedures after the frontmatter.",
                "example": (
                    "## When to Use\n"
                    "- Condition that triggers this skill\n"
                    "## Steps\n"
                    "1. First step\n"
                    "2. Second step\n"
                    "## Pitfalls\n"
                    "- Known issues and solutions"
                ),
            })
        else:
            # Check for common sections
            if "## " not in body:
                warnings.append({
                    "check": "structured sections",
                    "message": "Body has no markdown headers (##).",
                    "suggestion": "Add sections like '## Steps', '## Pitfalls' for better structure.",
                })
            # Check body length
            if len(body) < 50:
                warnings.append({
                    "check": "body length",
                    "message": f"Body is very short ({len(body)} chars).",
                    "suggestion": "Skills should have enough detail to reproduce the procedure.",
                })

    # 4. Check content size (measured on the file as stored on disk)
    if len(content_text) > MAX_SKILL_CONTENT_CHARS:
        issues.append({
            "severity": "warning",
            "check": "content size",
            "message": f"SKILL.md is {len(content_text):,} chars (limit: {MAX_SKILL_CONTENT_CHARS:,}).",
            "remediation": "Split large content into SKILL.md + supporting files in references/.",
        })

    # 5. Check supporting files
    for subdir in ALLOWED_SUBDIRS:
        subdir_path = skill_dir / subdir
        if not subdir_path.exists():
            continue
        for supporting_file in subdir_path.rglob("*"):
            if not supporting_file.is_file():
                continue
            size = supporting_file.stat().st_size
            if size > MAX_SKILL_FILE_BYTES:
                issues.append({
                    "severity": "warning",
                    "check": "file size",
                    "message": f"{supporting_file.relative_to(skill_dir)} is {size:,} bytes (limit: {MAX_SKILL_FILE_BYTES:,}).",
                    "remediation": "Split into smaller files or compress.",
                })

    # 6. Security scan
    if _GUARD_AVAILABLE:
        try:
            scan_result = scan_skill(skill_dir, source="validation")
            allowed, reason = should_allow_install(scan_result)
        except Exception as e:
            # The scan stays best-effort (validation must not crash), but a
            # scanner failure is surfaced so an unscanned skill is not
            # mistaken for a clean one.
            warnings.append({
                "check": "security scan",
                "message": f"Security scan could not be completed: {e}",
                "suggestion": "Re-run validation; if this persists, review the skill manually.",
            })
        else:
            if allowed is False:
                issues.append({
                    "severity": "error",
                    "check": "security scan",
                    "message": f"Security scan blocked: {reason}",
                    "remediation": "Review and fix security findings before using this skill.",
                })
            elif allowed is None:
                warnings.append({
                    "check": "security scan",
                    "message": f"Security findings: {reason}",
                    "suggestion": "Review security findings. They may be intentional but worth checking.",
                })

    # Build result
    is_valid = not any(issue["severity"] == "error" for issue in issues)

    # Add general suggestions if valid but improvable
    if is_valid and not warnings and not suggestions:
        suggestions.append({
            "check": "overall",
            "message": "Skill passes all checks.",
            "suggestion": "Consider adding '## Pitfalls' section with known issues and solutions.",
        })

    return {
        "success": True,
        "name": name,
        "path": str(skill_dir),
        "valid": is_valid,
        "issues": issues,
        "warnings": warnings,
        "suggestions": suggestions,
        "summary": f"{len(issues)} issue(s), {len(warnings)} warning(s), {len(suggestions)} suggestion(s)",
    }
# =============================================================================
# Main entry point
# =============================================================================
@@ -619,8 +1133,11 @@ def skill_manage(
return json.dumps({"success": False, "error": "file_path is required for 'remove_file'."}, ensure_ascii=False)
result = _remove_file(name, file_path)
elif action == "validate":
result = _validate_skill(name)
else:
result = {"success": False, "error": f"Unknown action '{action}'. Use: create, edit, patch, delete, write_file, remove_file"}
result = {"success": False, "error": f"Unknown action '{action}'. Use: create, edit, patch, delete, write_file, remove_file, validate"}
if result.get("success"):
try:
@@ -642,10 +1159,10 @@ SKILL_MANAGE_SCHEMA = {
"Manage skills (create, update, delete). Skills are your procedural "
"memory — reusable approaches for recurring task types. "
"New skills go to ~/.hermes/skills/; existing skills can be modified wherever they live.\n\n"
"Actions: create (full SKILL.md + optional category), "
"Actions: create (full SKILL.md + optional category), validate (check skill with actionable feedback), "
"patch (old_string/new_string — preferred for fixes), "
"edit (full SKILL.md rewrite — major overhauls only), "
"delete, write_file, remove_file.\n\n"
"delete, write_file, remove_file, validate (check skill with actionable feedback).\n\n"
"Create when: complex task succeeded (5+ calls), errors overcome, "
"user-corrected approach worked, non-trivial workflow discovered, "
"or user asks you to remember a procedure.\n"
@@ -662,7 +1179,7 @@ SKILL_MANAGE_SCHEMA = {
"properties": {
"action": {
"type": "string",
"enum": ["create", "patch", "edit", "delete", "write_file", "remove_file"],
"enum": ["create", "patch", "edit", "delete", "write_file", "remove_file", "validate"],
"description": "The action to perform."
},
"name": {