Compare commits
2 Commits
dawn/327-1
...
fix/626-va
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7664dbb9ef | ||
|
|
d9b891bef4 |
@@ -5258,55 +5258,6 @@ For more help on a command:
|
||||
|
||||
sessions_parser.set_defaults(func=cmd_sessions)
|
||||
|
||||
|
||||
# Unified warm session framework command
|
||||
unified_parser = subparsers.add_parser(
|
||||
"warm-session",
|
||||
help="Unified warm session framework",
|
||||
description="Comprehensive framework for warm session provisioning, quality analysis, and A/B testing"
|
||||
)
|
||||
unified_subparsers = unified_parser.add_subparsers(dest="unified_command")
|
||||
|
||||
# Extract template
|
||||
unified_extract = unified_subparsers.add_parser("extract", help="Extract template from session")
|
||||
unified_extract.add_argument("session_id", help="Session ID")
|
||||
unified_extract.add_argument("--name", "-n", required=True, help="Template name")
|
||||
unified_extract.add_argument("--description", "-d", default="", help="Description")
|
||||
|
||||
# List templates
|
||||
unified_subparsers.add_parser("list", help="List templates")
|
||||
|
||||
# Test warm session
|
||||
unified_test = unified_subparsers.add_parser("test", help="Test warm session")
|
||||
unified_test.add_argument("template_id", help="Template ID")
|
||||
unified_test.add_argument("message", help="Test message")
|
||||
|
||||
# Analyze session
|
||||
unified_analyze = unified_subparsers.add_parser("analyze", help="Analyze session quality")
|
||||
unified_analyze.add_argument("session_id", help="Session ID")
|
||||
|
||||
# Create A/B test
|
||||
unified_create_test = unified_subparsers.add_parser("create-test", help="Create A/B test")
|
||||
unified_create_test.add_argument("--task-id", required=True, help="Task ID")
|
||||
unified_create_test.add_argument("--description", required=True, help="Task description")
|
||||
unified_create_test.add_argument("--prompt", required=True, help="Test prompt")
|
||||
|
||||
# Add test result
|
||||
unified_add_result = unified_subparsers.add_parser("add-result", help="Add test result")
|
||||
unified_add_result.add_argument("test_id", help="Test ID")
|
||||
unified_add_result.add_argument("--session-type", required=True, choices=["cold", "warm"])
|
||||
unified_add_result.add_argument("--session-id", required=True, help="Session ID")
|
||||
unified_add_result.add_argument("--tool-calls", type=int, default=0)
|
||||
unified_add_result.add_argument("--successful-calls", type=int, default=0)
|
||||
unified_add_result.add_argument("--success", action="store_true")
|
||||
|
||||
# Analyze test
|
||||
unified_analyze_test = unified_subparsers.add_parser("analyze-test", help="Analyze A/B test")
|
||||
unified_analyze_test.add_argument("test_id", help="Test ID")
|
||||
|
||||
unified_parser.set_defaults(func=cmd_unified)
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# insights command
|
||||
# =========================================================================
|
||||
@@ -5647,59 +5598,3 @@ Examples:
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
|
||||
def cmd_unified(args):
|
||||
"""Handle unified warm session framework commands."""
|
||||
from hermes_cli.colors import Colors, color
|
||||
|
||||
subcmd = getattr(args, 'unified_command', None)
|
||||
|
||||
if subcmd is None:
|
||||
print(color("Unified Warm Session Framework", Colors.CYAN))
|
||||
print("\nCommands:")
|
||||
print(" hermes warm-session extract SESSION_ID --name NAME - Extract template")
|
||||
print(" hermes warm-session list - List templates")
|
||||
print(" hermes warm-session test TEMPLATE_ID MESSAGE - Test warm session")
|
||||
print(" hermes warm-session analyze SESSION_ID - Analyze session quality")
|
||||
print(" hermes warm-session create-test --task-id ID --description DESC --prompt PROMPT")
|
||||
print(" hermes warm-session add-result TEST_ID --session-type TYPE --session-id ID")
|
||||
print(" hermes warm-session analyze-test TEST_ID - Analyze A/B test")
|
||||
return 0
|
||||
|
||||
try:
|
||||
from tools.unified_warm_session import unified_cli
|
||||
|
||||
args_list = []
|
||||
if subcmd == "extract":
|
||||
args_list = ["extract", args.session_id, "--name", args.name]
|
||||
if args.description:
|
||||
args_list.extend(["--description", args.description])
|
||||
elif subcmd == "list":
|
||||
args_list = ["list"]
|
||||
elif subcmd == "test":
|
||||
args_list = ["test", args.template_id, args.message]
|
||||
elif subcmd == "analyze":
|
||||
args_list = ["analyze", args.session_id]
|
||||
elif subcmd == "create-test":
|
||||
args_list = ["create-test", "--task-id", args.task_id, "--description", args.description, "--prompt", args.prompt]
|
||||
elif subcmd == "add-result":
|
||||
args_list = ["add-result", args.test_id, "--session-type", args.session_type, "--session-id", args.session_id]
|
||||
if args.tool_calls:
|
||||
args_list.extend(["--tool-calls", str(args.tool_calls)])
|
||||
if args.successful_calls:
|
||||
args_list.extend(["--successful-calls", str(args.successful_calls)])
|
||||
if args.success:
|
||||
args_list.append("--success")
|
||||
elif subcmd == "analyze-test":
|
||||
args_list = ["analyze-test", args.test_id]
|
||||
|
||||
return unified_cli(args_list)
|
||||
|
||||
except ImportError as e:
|
||||
print(color(f"Error: Cannot import unified_warm_session module: {e}", Colors.RED))
|
||||
return 1
|
||||
except Exception as e:
|
||||
print(color(f"Error: {e}", Colors.RED))
|
||||
return 1
|
||||
|
||||
|
||||
@@ -245,6 +245,269 @@ def _validate_file_path(file_path: str) -> Optional[str]:
|
||||
return None
|
||||
|
||||
|
||||
def _validate_skill(name: str) -> Dict[str, Any]:
|
||||
"""
|
||||
Validate an existing skill and provide actionable feedback.
|
||||
|
||||
Checks:
|
||||
1. Skill exists
|
||||
2. SKILL.md frontmatter (name, description, valid YAML)
|
||||
3. Content structure (body after frontmatter)
|
||||
4. Content size limits
|
||||
5. Linked files (references/, templates/, scripts/) exist
|
||||
6. Naming conventions
|
||||
|
||||
Returns dict with success, issues (list of {check, status, message, fix}),
|
||||
and summary.
|
||||
"""
|
||||
issues = []
|
||||
warnings = []
|
||||
|
||||
# Check 1: Does the skill exist?
|
||||
skill_info = _find_skill(name)
|
||||
if not skill_info:
|
||||
# Try to find similar names for the suggestion
|
||||
from agent.skill_utils import get_all_skills_dirs
|
||||
all_names = []
|
||||
for skills_dir in get_all_skills_dirs():
|
||||
if skills_dir.exists():
|
||||
for md in skills_dir.rglob("SKILL.md"):
|
||||
all_names.append(md.parent.name)
|
||||
suggestion = ""
|
||||
if all_names:
|
||||
import difflib
|
||||
close = difflib.get_close_matches(name, all_names, n=3, cutoff=0.6)
|
||||
if close:
|
||||
suggestion = f" Did you mean: {', '.join(close)}?"
|
||||
|
||||
return {
|
||||
"success": False,
|
||||
"valid": False,
|
||||
"issues": [{"check": "existence", "status": "FAIL",
|
||||
"message": f"Skill '{name}' not found.{suggestion}",
|
||||
"fix": f"Create it with: skill_manage(action='create', name='{name}', content='...')"}],
|
||||
"summary": f"Skill '{name}' does not exist."
|
||||
}
|
||||
|
||||
skill_dir = skill_info["path"]
|
||||
skill_md = skill_dir / "SKILL.md"
|
||||
|
||||
# Check 2: SKILL.md exists
|
||||
if not skill_md.exists():
|
||||
issues.append({
|
||||
"check": "SKILL.md exists",
|
||||
"status": "FAIL",
|
||||
"message": f"No SKILL.md found in {skill_dir}",
|
||||
"fix": f"Create SKILL.md with: skill_manage(action='create', name='{name}', content='---\\nname: {name}\\ndescription: ...\\n---\\n# Instructions\\n...')"
|
||||
})
|
||||
return {"success": True, "valid": False, "issues": issues, "summary": f"Skill '{name}' is missing SKILL.md."}
|
||||
|
||||
# Read content
|
||||
try:
|
||||
content = skill_md.read_text(encoding="utf-8")
|
||||
except Exception as e:
|
||||
issues.append({
|
||||
"check": "SKILL.md readable",
|
||||
"status": "FAIL",
|
||||
"message": f"Cannot read SKILL.md: {e}",
|
||||
"fix": "Check file permissions: chmod 644 SKILL.md"
|
||||
})
|
||||
return {"success": True, "valid": False, "issues": issues, "summary": f"Cannot read SKILL.md."}
|
||||
|
||||
# Check 3: Content not empty
|
||||
if not content.strip():
|
||||
issues.append({
|
||||
"check": "content non-empty",
|
||||
"status": "FAIL",
|
||||
"message": "SKILL.md is empty.",
|
||||
"fix": f"Add content with: skill_manage(action='edit', name='{name}', content='---\\nname: {name}\\ndescription: ...\\n---\\n# Instructions\\n...')"
|
||||
})
|
||||
return {"success": True, "valid": False, "issues": issues, "summary": "SKILL.md is empty."}
|
||||
|
||||
# Check 4: Frontmatter starts with ---
|
||||
if not content.startswith("---"):
|
||||
issues.append({
|
||||
"check": "frontmatter delimiter",
|
||||
"status": "FAIL",
|
||||
"message": "SKILL.md must start with YAML frontmatter (---).",
|
||||
"fix": "Add '---' as the first line, then YAML metadata, then '---' to close.\n"
|
||||
"Example:\n---\nname: my-skill\ndescription: What this skill does\n---\n# Instructions\n..."
|
||||
})
|
||||
else:
|
||||
# Check 5: Frontmatter closes
|
||||
end_match = re.search(r'\n---\s*\n', content[3:])
|
||||
if not end_match:
|
||||
issues.append({
|
||||
"check": "frontmatter closing",
|
||||
"status": "FAIL",
|
||||
"message": "Frontmatter is not closed with '---'.",
|
||||
"fix": "Add a line with just '---' after your YAML metadata to close the frontmatter block."
|
||||
})
|
||||
else:
|
||||
# Check 6: Valid YAML
|
||||
yaml_content = content[3:end_match.start() + 3]
|
||||
try:
|
||||
parsed = yaml.safe_load(yaml_content)
|
||||
except yaml.YAMLError as e:
|
||||
issues.append({
|
||||
"check": "YAML valid",
|
||||
"status": "FAIL",
|
||||
"message": f"YAML parse error: {e}",
|
||||
"fix": "Fix the YAML syntax in your frontmatter. Common issues:\n"
|
||||
" - Missing quotes around values with special chars (:, {, }, [, ])\n"
|
||||
" - Inconsistent indentation (use spaces, not tabs)\n"
|
||||
" - Unescaped colons in descriptions"
|
||||
})
|
||||
parsed = None
|
||||
|
||||
if parsed and isinstance(parsed, dict):
|
||||
# Check 7: name field
|
||||
if "name" not in parsed:
|
||||
issues.append({
|
||||
"check": "frontmatter name",
|
||||
"status": "FAIL",
|
||||
"message": "Frontmatter missing 'name' field.",
|
||||
"fix": f"Add 'name: {name}' to your frontmatter YAML."
|
||||
})
|
||||
elif parsed["name"] != name:
|
||||
warnings.append({
|
||||
"check": "frontmatter name match",
|
||||
"status": "WARN",
|
||||
"message": f"Frontmatter name '{parsed['name']}' doesn't match directory name '{name}'.",
|
||||
"fix": "Change 'name: " + str(parsed.get("name", "")) + "' to 'name: " + name + "' in frontmatter, or rename the directory to match."
|
||||
})
|
||||
|
||||
# Check 8: description field
|
||||
if "description" not in parsed:
|
||||
issues.append({
|
||||
"check": "frontmatter description",
|
||||
"status": "FAIL",
|
||||
"message": "Frontmatter missing 'description' field.",
|
||||
"fix": "Add 'description: A brief description of what this skill does' to frontmatter. "
|
||||
f"Max {MAX_DESCRIPTION_LENGTH} characters."
|
||||
})
|
||||
elif len(str(parsed["description"])) > MAX_DESCRIPTION_LENGTH:
|
||||
issues.append({
|
||||
"check": "description length",
|
||||
"status": "FAIL",
|
||||
"message": f"Description is {len(str(parsed['description']))} chars (max {MAX_DESCRIPTION_LENGTH}).",
|
||||
"fix": f"Shorten the description to under {MAX_DESCRIPTION_LENGTH} characters. "
|
||||
"Put detailed instructions in the body, not the description."
|
||||
})
|
||||
|
||||
elif parsed and not isinstance(parsed, dict):
|
||||
issues.append({
|
||||
"check": "frontmatter structure",
|
||||
"status": "FAIL",
|
||||
"message": "Frontmatter must be a YAML mapping (key: value pairs).",
|
||||
"fix": "Ensure frontmatter contains key-value pairs like:\nname: my-skill\ndescription: What it does"
|
||||
})
|
||||
|
||||
# Check 9: Body content after frontmatter
|
||||
if end_match:
|
||||
body = content[end_match.end() + 3:].strip()
|
||||
if not body:
|
||||
issues.append({
|
||||
"check": "body content",
|
||||
"status": "FAIL",
|
||||
"message": "No content after frontmatter.",
|
||||
"fix": "Add instructions, steps, or reference content after the closing '---'. "
|
||||
"Skills need a body to be useful — at minimum a description of when to use the skill."
|
||||
})
|
||||
elif len(body) < 20:
|
||||
warnings.append({
|
||||
"check": "body content size",
|
||||
"status": "WARN",
|
||||
"message": f"Body content is very short ({len(body)} chars).",
|
||||
"fix": "Add more detail: numbered steps, examples, pitfalls to avoid, "
|
||||
"or reference files in references/ or templates/."
|
||||
})
|
||||
|
||||
# Check 10: Content size
|
||||
if len(content) > MAX_SKILL_CONTENT_CHARS:
|
||||
issues.append({
|
||||
"check": "content size",
|
||||
"status": "FAIL",
|
||||
"message": f"SKILL.md is {len(content):,} chars (max {MAX_SKILL_CONTENT_CHARS:,}).",
|
||||
"fix": f"Split into a shorter SKILL.md (core instructions) with detailed content in:\n"
|
||||
f" - references/detailed-guide.md\n"
|
||||
f" - templates/example.yaml\n"
|
||||
f" - scripts/validate.py\n"
|
||||
f"Use skill_manage(action='write_file') to add linked files."
|
||||
})
|
||||
elif len(content) > MAX_SKILL_CONTENT_CHARS * 0.8:
|
||||
warnings.append({
|
||||
"check": "content size warning",
|
||||
"status": "WARN",
|
||||
"message": f"SKILL.md is {len(content):,} chars ({len(content) * 100 // MAX_SKILL_CONTENT_CHARS}% of limit).",
|
||||
"fix": "Consider moving detailed content to references/ or templates/ files."
|
||||
})
|
||||
|
||||
# Check 11: Linked files exist
|
||||
for subdir in ["references", "templates", "scripts"]:
|
||||
subdir_path = skill_dir / subdir
|
||||
if subdir_path.exists():
|
||||
for linked_file in subdir_path.rglob("*"):
|
||||
if linked_file.is_file():
|
||||
try:
|
||||
linked_file.read_text(encoding="utf-8")
|
||||
except Exception as e:
|
||||
warnings.append({
|
||||
"check": f"linked file {subdir}/{linked_file.name}",
|
||||
"status": "WARN",
|
||||
"message": f"Cannot read {linked_file.relative_to(skill_dir)}: {e}",
|
||||
"fix": f"Check file exists and has read permissions."
|
||||
})
|
||||
|
||||
# Check 12: Naming convention
|
||||
if not VALID_NAME_RE.match(name):
|
||||
warnings.append({
|
||||
"check": "naming convention",
|
||||
"status": "WARN",
|
||||
"message": f"Skill name '{name}' doesn't follow convention (lowercase, hyphens, underscores).",
|
||||
"fix": "Rename to use lowercase letters, numbers, hyphens, dots, and underscores only. "
|
||||
"Must start with a letter or digit."
|
||||
})
|
||||
|
||||
# Check 13: Orphaned files (files not in allowed subdirs)
|
||||
if skill_dir.exists():
|
||||
for item in skill_dir.iterdir():
|
||||
if item.name == "SKILL.md":
|
||||
continue
|
||||
if item.name.startswith("."):
|
||||
continue
|
||||
if item.is_dir() and item.name in ALLOWED_SUBDIRS:
|
||||
continue
|
||||
warnings.append({
|
||||
"check": "file organization",
|
||||
"status": "WARN",
|
||||
"message": f"'{item.name}' is in the skill root, not in an allowed subdirectory.",
|
||||
"fix": f"Move to references/, templates/, or scripts/. Allowed subdirs: {', '.join(sorted(ALLOWED_SUBDIRS))}"
|
||||
})
|
||||
|
||||
# Build summary
|
||||
fail_count = sum(1 for i in issues if i["status"] == "FAIL")
|
||||
warn_count = len(warnings)
|
||||
valid = fail_count == 0
|
||||
|
||||
if valid and warn_count == 0:
|
||||
summary = f"Skill '{name}' is valid. No issues found."
|
||||
elif valid:
|
||||
summary = f"Skill '{name}' is valid with {warn_count} warning(s)."
|
||||
else:
|
||||
summary = f"Skill '{name}' has {fail_count} issue(s) and {warn_count} warning(s)."
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"valid": valid,
|
||||
"issues": issues,
|
||||
"warnings": warnings,
|
||||
"summary": summary,
|
||||
"skill_path": str(skill_dir),
|
||||
"skill_md_size": len(content),
|
||||
}
|
||||
|
||||
|
||||
def _atomic_write_text(file_path: Path, content: str, encoding: str = "utf-8") -> None:
|
||||
"""
|
||||
Atomically write text content to a file.
|
||||
@@ -567,6 +830,257 @@ def _remove_file(name: str, file_path: str) -> Dict[str, Any]:
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
def _validate_skill(name: str) -> Dict[str, Any]:
|
||||
"""Validate a skill and provide actionable feedback with specific remediation steps.
|
||||
|
||||
Returns detailed validation results with:
|
||||
- Specific issues found
|
||||
- Actionable suggestions for each issue
|
||||
- Examples of correct formatting
|
||||
- Overall pass/fail status
|
||||
"""
|
||||
existing = _find_skill(name)
|
||||
if not existing:
|
||||
return {
|
||||
"success": False,
|
||||
"error": f"Skill '{name}' not found.",
|
||||
"suggestion": f"Use skill_manage(action='create', name='{name}', content='...') to create it.",
|
||||
}
|
||||
|
||||
skill_dir = existing["path"]
|
||||
skill_md = skill_dir / "SKILL.md"
|
||||
|
||||
issues = []
|
||||
warnings = []
|
||||
suggestions = []
|
||||
|
||||
# 1. Check SKILL.md exists
|
||||
if not skill_md.exists():
|
||||
issues.append({
|
||||
"severity": "error",
|
||||
"check": "SKILL.md exists",
|
||||
"message": "SKILL.md file is missing.",
|
||||
"remediation": f"Create SKILL.md in {skill_dir}/ with YAML frontmatter and instructions.",
|
||||
"example": """---
|
||||
name: my-skill
|
||||
description: "What this skill does in one sentence."
|
||||
---
|
||||
|
||||
## When to Use
|
||||
- Trigger condition 1
|
||||
- Trigger condition 2
|
||||
|
||||
## Steps
|
||||
1. First step with exact command
|
||||
2. Second step
|
||||
|
||||
## Pitfalls
|
||||
- Common mistake and how to avoid it
|
||||
""",
|
||||
})
|
||||
return {"success": False, "name": name, "path": str(skill_dir), "issues": issues, "warnings": warnings, "suggestions": suggestions}
|
||||
|
||||
# Read content
|
||||
try:
|
||||
content_text = skill_md.read_text(encoding="utf-8")
|
||||
except Exception as e:
|
||||
issues.append({
|
||||
"severity": "error",
|
||||
"check": "readable",
|
||||
"message": f"Cannot read SKILL.md: {e}",
|
||||
"remediation": "Check file permissions and encoding (should be UTF-8).",
|
||||
})
|
||||
return {"success": False, "name": name, "path": str(skill_dir), "issues": issues}
|
||||
|
||||
# 2. Check frontmatter
|
||||
if not content_text.strip().startswith("---"):
|
||||
issues.append({
|
||||
"severity": "error",
|
||||
"check": "frontmatter present",
|
||||
"message": "SKILL.md does not start with YAML frontmatter delimiter (---).",
|
||||
"remediation": "Add '---' as the very first line of SKILL.md.",
|
||||
"example": "---\nname: my-skill\ndescription: "What it does."\n---",
|
||||
})
|
||||
else:
|
||||
# Parse frontmatter
|
||||
end_match = re.search(r'\n---\s*\n', content_text[3:])
|
||||
if not end_match:
|
||||
issues.append({
|
||||
"severity": "error",
|
||||
"check": "frontmatter closed",
|
||||
"message": "YAML frontmatter is not closed with a second '---'.",
|
||||
"remediation": "Add a line with just '---' after your frontmatter fields.",
|
||||
})
|
||||
else:
|
||||
yaml_content = content_text[3:end_match.start() + 3]
|
||||
try:
|
||||
parsed = yaml.safe_load(yaml_content)
|
||||
except yaml.YAMLError as e:
|
||||
issues.append({
|
||||
"severity": "error",
|
||||
"check": "frontmatter valid YAML",
|
||||
"message": f"YAML parse error: {e}",
|
||||
"remediation": "Fix YAML syntax in the frontmatter block.",
|
||||
"example": """---
|
||||
name: my-skill
|
||||
description: "A clear description."
|
||||
version: "1.0.0"
|
||||
---""",
|
||||
})
|
||||
parsed = None
|
||||
|
||||
if parsed and isinstance(parsed, dict):
|
||||
# Check required fields
|
||||
if "name" not in parsed:
|
||||
issues.append({
|
||||
"severity": "error",
|
||||
"check": "name field",
|
||||
"message": "Frontmatter missing required 'name' field.",
|
||||
"remediation": f"Add: name: {name}",
|
||||
})
|
||||
elif parsed["name"] != name:
|
||||
warnings.append({
|
||||
"check": "name matches directory",
|
||||
"message": f"Frontmatter name '{parsed['name']}' doesn't match directory name '{name}'.",
|
||||
"suggestion": f"Consider changing to: name: {name}",
|
||||
})
|
||||
|
||||
if "description" not in parsed:
|
||||
issues.append({
|
||||
"severity": "error",
|
||||
"check": "description field",
|
||||
"message": "Frontmatter missing required 'description' field.",
|
||||
"remediation": "Add a one-sentence description of what this skill does.",
|
||||
"example": 'description: "Deploy containerized services to production VPS."',
|
||||
})
|
||||
elif len(str(parsed.get("description", ""))) > MAX_DESCRIPTION_LENGTH:
|
||||
issues.append({
|
||||
"severity": "warning",
|
||||
"check": "description length",
|
||||
"message": f"Description is {len(str(parsed['description']))} chars (max {MAX_DESCRIPTION_LENGTH}).",
|
||||
"remediation": "Shorten the description to one clear sentence.",
|
||||
})
|
||||
|
||||
if "version" not in parsed:
|
||||
suggestions.append({
|
||||
"check": "version field",
|
||||
"message": "No version field in frontmatter.",
|
||||
"suggestion": "Add: version: "1.0.0" for tracking changes.",
|
||||
})
|
||||
elif parsed is not None:
|
||||
issues.append({
|
||||
"severity": "error",
|
||||
"check": "frontmatter is mapping",
|
||||
"message": "Frontmatter must be a YAML mapping (key: value pairs).",
|
||||
"remediation": "Ensure frontmatter contains key: value pairs, not a list.",
|
||||
})
|
||||
|
||||
# 3. Check body content
|
||||
if end_match:
|
||||
body = content_text[end_match.end() + 3:].strip()
|
||||
if not body:
|
||||
issues.append({
|
||||
"severity": "error",
|
||||
"check": "body content",
|
||||
"message": "SKILL.md has no content after frontmatter.",
|
||||
"remediation": "Add instructions, steps, or procedures after the frontmatter.",
|
||||
"example": """## When to Use
|
||||
- Condition that triggers this skill
|
||||
|
||||
## Steps
|
||||
1. First step
|
||||
2. Second step
|
||||
|
||||
## Pitfalls
|
||||
- Known issues and solutions""",
|
||||
})
|
||||
else:
|
||||
# Check for common sections
|
||||
if "## " not in body:
|
||||
warnings.append({
|
||||
"check": "structured sections",
|
||||
"message": "Body has no markdown headers (##).",
|
||||
"suggestion": "Add sections like '## Steps', '## Pitfalls' for better structure.",
|
||||
})
|
||||
|
||||
# Check body length
|
||||
if len(body) < 50:
|
||||
warnings.append({
|
||||
"check": "body length",
|
||||
"message": f"Body is very short ({len(body)} chars).",
|
||||
"suggestion": "Skills should have enough detail to reproduce the procedure.",
|
||||
})
|
||||
|
||||
# 4. Check content size
|
||||
if len(content_text) > MAX_SKILL_CONTENT_CHARS:
|
||||
issues.append({
|
||||
"severity": "warning",
|
||||
"check": "content size",
|
||||
"message": f"SKILL.md is {len(content_text):,} chars (limit: {MAX_SKILL_CONTENT_CHARS:,}).",
|
||||
"remediation": "Split large content into SKILL.md + supporting files in references/.",
|
||||
})
|
||||
|
||||
# 5. Check supporting files
|
||||
for subdir in ALLOWED_SUBDIRS:
|
||||
subdir_path = skill_dir / subdir
|
||||
if subdir_path.exists():
|
||||
for f in subdir_path.rglob("*"):
|
||||
if f.is_file():
|
||||
size = f.stat().st_size
|
||||
if size > MAX_SKILL_FILE_BYTES:
|
||||
issues.append({
|
||||
"severity": "warning",
|
||||
"check": "file size",
|
||||
"message": f"{f.relative_to(skill_dir)} is {size:,} bytes (limit: {MAX_SKILL_FILE_BYTES:,}).",
|
||||
"remediation": "Split into smaller files or compress.",
|
||||
})
|
||||
|
||||
# 6. Security scan
|
||||
if _GUARD_AVAILABLE:
|
||||
try:
|
||||
scan_result = scan_skill(skill_dir, source="validation")
|
||||
allowed, reason = should_allow_install(scan_result)
|
||||
if allowed is False:
|
||||
issues.append({
|
||||
"severity": "error",
|
||||
"check": "security scan",
|
||||
"message": f"Security scan blocked: {reason}",
|
||||
"remediation": "Review and fix security findings before using this skill.",
|
||||
})
|
||||
elif allowed is None:
|
||||
warnings.append({
|
||||
"check": "security scan",
|
||||
"message": f"Security findings: {reason}",
|
||||
"suggestion": "Review security findings. They may be intentional but worth checking.",
|
||||
})
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Build result
|
||||
is_valid = not any(i["severity"] == "error" for i in issues)
|
||||
|
||||
# Add general suggestions if valid but improvable
|
||||
if is_valid and not warnings and not suggestions:
|
||||
suggestions.append({
|
||||
"check": "overall",
|
||||
"message": "Skill passes all checks.",
|
||||
"suggestion": "Consider adding '## Pitfalls' section with known issues and solutions.",
|
||||
})
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"name": name,
|
||||
"path": str(skill_dir),
|
||||
"valid": is_valid,
|
||||
"issues": issues,
|
||||
"warnings": warnings,
|
||||
"suggestions": suggestions,
|
||||
"summary": f"{len(issues)} issue(s), {len(warnings)} warning(s), {len(suggestions)} suggestion(s)",
|
||||
}
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Main entry point
|
||||
# =============================================================================
|
||||
@@ -619,8 +1133,11 @@ def skill_manage(
|
||||
return json.dumps({"success": False, "error": "file_path is required for 'remove_file'."}, ensure_ascii=False)
|
||||
result = _remove_file(name, file_path)
|
||||
|
||||
elif action == "validate":
|
||||
result = _validate_skill(name)
|
||||
|
||||
else:
|
||||
result = {"success": False, "error": f"Unknown action '{action}'. Use: create, edit, patch, delete, write_file, remove_file"}
|
||||
result = {"success": False, "error": f"Unknown action '{action}'. Use: create, edit, patch, delete, write_file, remove_file, validate"}
|
||||
|
||||
if result.get("success"):
|
||||
try:
|
||||
@@ -642,10 +1159,10 @@ SKILL_MANAGE_SCHEMA = {
|
||||
"Manage skills (create, update, delete). Skills are your procedural "
|
||||
"memory — reusable approaches for recurring task types. "
|
||||
"New skills go to ~/.hermes/skills/; existing skills can be modified wherever they live.\n\n"
|
||||
"Actions: create (full SKILL.md + optional category), "
|
||||
"Actions: create (full SKILL.md + optional category), validate (check skill with actionable feedback), "
|
||||
"patch (old_string/new_string — preferred for fixes), "
|
||||
"edit (full SKILL.md rewrite — major overhauls only), "
|
||||
"delete, write_file, remove_file.\n\n"
|
||||
"delete, write_file, remove_file, validate (check skill with actionable feedback).\n\n"
|
||||
"Create when: complex task succeeded (5+ calls), errors overcome, "
|
||||
"user-corrected approach worked, non-trivial workflow discovered, "
|
||||
"or user asks you to remember a procedure.\n"
|
||||
@@ -662,7 +1179,7 @@ SKILL_MANAGE_SCHEMA = {
|
||||
"properties": {
|
||||
"action": {
|
||||
"type": "string",
|
||||
"enum": ["create", "patch", "edit", "delete", "write_file", "remove_file"],
|
||||
"enum": ["create", "patch", "edit", "delete", "write_file", "remove_file", "validate"],
|
||||
"description": "The action to perform."
|
||||
},
|
||||
"name": {
|
||||
|
||||
@@ -1,808 +0,0 @@
|
||||
"""
|
||||
Unified Warm Session Framework
|
||||
|
||||
Comprehensive framework for warm session provisioning, quality analysis,
|
||||
and A/B testing. Combines all components from issue #327 research.
|
||||
|
||||
Issue: #327
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
from dataclasses import dataclass, asdict, field
|
||||
from enum import Enum
|
||||
import statistics
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Core Data Structures
|
||||
# ============================================================================
|
||||
|
||||
class SessionType(Enum):
|
||||
"""Type of session."""
|
||||
COLD = "cold" # Fresh session, no warm-up
|
||||
WARM = "warm" # Session with warm-up context
|
||||
|
||||
|
||||
@dataclass
|
||||
class SessionSeed:
|
||||
"""Seed data for warming up a new session."""
|
||||
system_context: str = ""
|
||||
tool_examples: List[Dict[str, Any]] = field(default_factory=list)
|
||||
user_patterns: Dict[str, Any] = field(default_factory=dict)
|
||||
context_markers: List[str] = field(default_factory=list)
|
||||
version: str = "1.0"
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return asdict(self)
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: Dict[str, Any]) -> 'SessionSeed':
|
||||
return cls(**data)
|
||||
|
||||
|
||||
@dataclass
|
||||
class WarmTemplate:
|
||||
"""Template for creating warm sessions."""
|
||||
template_id: str
|
||||
name: str
|
||||
description: str
|
||||
seed: SessionSeed
|
||||
created_at: str
|
||||
source_session_id: Optional[str] = None
|
||||
usage_count: int = 0
|
||||
success_rate: float = 0.0
|
||||
version: str = "1.0"
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"template_id": self.template_id,
|
||||
"name": self.name,
|
||||
"description": self.description,
|
||||
"seed": self.seed.to_dict(),
|
||||
"created_at": self.created_at,
|
||||
"source_session_id": self.source_session_id,
|
||||
"usage_count": self.usage_count,
|
||||
"success_rate": self.success_rate,
|
||||
"version": self.version
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: Dict[str, Any]) -> 'WarmTemplate':
|
||||
seed = SessionSeed.from_dict(data.get("seed", {}))
|
||||
return cls(
|
||||
template_id=data["template_id"],
|
||||
name=data["name"],
|
||||
description=data["description"],
|
||||
seed=seed,
|
||||
created_at=data.get("created_at", datetime.now().isoformat()),
|
||||
source_session_id=data.get("source_session_id"),
|
||||
usage_count=data.get("usage_count", 0),
|
||||
success_rate=data.get("success_rate", 0.0),
|
||||
version=data.get("version", "1.0")
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class QualityMetrics:
|
||||
"""Quality metrics for a session."""
|
||||
session_id: str
|
||||
session_type: SessionType
|
||||
message_count: int = 0
|
||||
tool_calls: int = 0
|
||||
successful_tool_calls: int = 0
|
||||
error_count: int = 0
|
||||
user_corrections: int = 0
|
||||
completion_time_seconds: float = 0.0
|
||||
token_usage: int = 0
|
||||
|
||||
@property
|
||||
def error_rate(self) -> float:
|
||||
if self.tool_calls == 0:
|
||||
return 0.0
|
||||
return self.error_count / self.tool_calls
|
||||
|
||||
@property
|
||||
def success_rate(self) -> float:
|
||||
if self.tool_calls == 0:
|
||||
return 0.0
|
||||
return self.successful_tool_calls / self.tool_calls
|
||||
|
||||
@property
|
||||
def correction_rate(self) -> float:
|
||||
if self.message_count == 0:
|
||||
return 0.0
|
||||
return self.user_corrections / self.message_count
|
||||
|
||||
@property
|
||||
def efficiency_score(self) -> float:
|
||||
if self.message_count == 0:
|
||||
return 0.0
|
||||
|
||||
# Weighted score
|
||||
success_score = self.success_rate * 0.4
|
||||
error_score = (1 - self.error_rate) * 0.3
|
||||
correction_score = (1 - min(1.0, self.correction_rate * 5)) * 0.2
|
||||
msg_score = 0.1 if self.message_count <= 50 else 0.05
|
||||
|
||||
return success_score + error_score + correction_score + msg_score
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"session_id": self.session_id,
|
||||
"session_type": self.session_type.value,
|
||||
"message_count": self.message_count,
|
||||
"tool_calls": self.tool_calls,
|
||||
"successful_tool_calls": self.successful_tool_calls,
|
||||
"error_count": self.error_count,
|
||||
"user_corrections": self.user_corrections,
|
||||
"completion_time_seconds": self.completion_time_seconds,
|
||||
"token_usage": self.token_usage,
|
||||
"error_rate": self.error_rate,
|
||||
"success_rate": self.success_rate,
|
||||
"correction_rate": self.correction_rate,
|
||||
"efficiency_score": self.efficiency_score
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class TestTask:
|
||||
"""A task for A/B testing."""
|
||||
task_id: str
|
||||
description: str
|
||||
prompt: str
|
||||
category: str = "general"
|
||||
difficulty: str = "medium"
|
||||
expected_tools: List[str] = field(default_factory=list)
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return asdict(self)
|
||||
|
||||
|
||||
@dataclass
|
||||
class TestResult:
|
||||
"""Result from a test session."""
|
||||
test_id: str
|
||||
task_id: str
|
||||
session_id: str
|
||||
session_type: SessionType
|
||||
metrics: QualityMetrics
|
||||
success: bool = False
|
||||
notes: str = ""
|
||||
created_at: str = field(default_factory=lambda: datetime.now().isoformat())
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"test_id": self.test_id,
|
||||
"task_id": self.task_id,
|
||||
"session_id": self.session_id,
|
||||
"session_type": self.session_type.value,
|
||||
"metrics": self.metrics.to_dict(),
|
||||
"success": self.success,
|
||||
"notes": self.notes,
|
||||
"created_at": self.created_at
|
||||
}
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Session Extraction
|
||||
# ============================================================================
|
||||
|
||||
class SessionExtractor:
|
||||
"""Extract seed data from existing sessions."""
|
||||
|
||||
def __init__(self, session_db=None):
|
||||
self.session_db = session_db
|
||||
|
||||
def extract_seed(self, session_id: str) -> Optional[SessionSeed]:
|
||||
"""Extract seed data from a session."""
|
||||
if not self.session_db:
|
||||
return None
|
||||
|
||||
try:
|
||||
messages = self.session_db.get_messages(session_id)
|
||||
if not messages:
|
||||
return None
|
||||
|
||||
system_context = self._extract_system_context(messages)
|
||||
tool_examples = self._extract_tool_examples(messages)
|
||||
user_patterns = self._extract_user_patterns(messages)
|
||||
context_markers = self._extract_context_markers(messages)
|
||||
|
||||
return SessionSeed(
|
||||
system_context=system_context,
|
||||
tool_examples=tool_examples,
|
||||
user_patterns=user_patterns,
|
||||
context_markers=context_markers,
|
||||
version="1.0"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to extract seed: {e}")
|
||||
return None
|
||||
|
||||
def _extract_system_context(self, messages: List[Dict]) -> str:
|
||||
context_parts = []
|
||||
for msg in messages:
|
||||
if msg.get("role") == "system":
|
||||
content = msg.get("content", "")
|
||||
if content:
|
||||
context_parts.append(content[:500])
|
||||
break
|
||||
return "\n".join(context_parts)[:1000]
|
||||
|
||||
def _extract_tool_examples(self, messages: List[Dict]) -> List[Dict]:
|
||||
examples = []
|
||||
for i, msg in enumerate(messages):
|
||||
if msg.get("role") == "assistant" and msg.get("tool_calls"):
|
||||
for j in range(i + 1, min(i + 3, len(messages))):
|
||||
if messages[j].get("role") == "tool":
|
||||
content = messages[j].get("content", "")
|
||||
if content and "error" not in content.lower()[:100]:
|
||||
for tool_call in msg["tool_calls"]:
|
||||
func = tool_call.get("function", {})
|
||||
examples.append({
|
||||
"tool": func.get("name"),
|
||||
"arguments": func.get("arguments", "{}"),
|
||||
"result_preview": content[:200]
|
||||
})
|
||||
if len(examples) >= 5:
|
||||
break
|
||||
break
|
||||
if len(examples) >= 5:
|
||||
break
|
||||
return examples
|
||||
|
||||
def _extract_user_patterns(self, messages: List[Dict]) -> Dict:
|
||||
user_messages = [m for m in messages if m.get("role") == "user"]
|
||||
if not user_messages:
|
||||
return {}
|
||||
|
||||
lengths = [len(m.get("content", "")) for m in user_messages]
|
||||
questions = sum(1 for m in user_messages if "?" in m.get("content", ""))
|
||||
|
||||
return {
|
||||
"message_count": len(user_messages),
|
||||
"avg_length": sum(lengths) / len(lengths),
|
||||
"question_ratio": questions / len(user_messages),
|
||||
"preferred_style": "conversational" if questions > len(user_messages) * 0.3 else "direct"
|
||||
}
|
||||
|
||||
def _extract_context_markers(self, messages: List[Dict]) -> List[str]:
|
||||
markers = set()
|
||||
for msg in messages:
|
||||
content = msg.get("content", "")
|
||||
import re
|
||||
paths = re.findall(r'[\w/\.]+\.[\w]+', content)
|
||||
markers.update(p for p in paths if len(p) < 50)
|
||||
if len(markers) > 20:
|
||||
break
|
||||
return list(markers)[:20]
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Quality Analysis
|
||||
# ============================================================================
|
||||
|
||||
class QualityAnalyzer:
|
||||
"""Analyze session quality."""
|
||||
|
||||
def __init__(self, session_db=None):
|
||||
self.session_db = session_db
|
||||
|
||||
def analyze_session(self, session_id: str, session_type: SessionType = SessionType.COLD) -> Optional[QualityMetrics]:
|
||||
"""Analyze a session."""
|
||||
if not self.session_db:
|
||||
return None
|
||||
|
||||
try:
|
||||
messages = self.session_db.get_messages(session_id)
|
||||
if not messages:
|
||||
return None
|
||||
|
||||
tool_calls = 0
|
||||
successful_tool_calls = 0
|
||||
error_count = 0
|
||||
user_corrections = 0
|
||||
|
||||
for i, msg in enumerate(messages):
|
||||
if msg.get("role") == "assistant" and msg.get("tool_calls"):
|
||||
tool_calls += len(msg["tool_calls"])
|
||||
|
||||
if msg.get("role") == "tool":
|
||||
content = msg.get("content", "").lower()
|
||||
if "error" in content or "failed" in content:
|
||||
error_count += 1
|
||||
else:
|
||||
successful_tool_calls += 1
|
||||
|
||||
if (msg.get("role") == "user" and i > 0 and
|
||||
messages[i-1].get("role") == "tool" and
|
||||
("error" in messages[i-1].get("content", "").lower() or
|
||||
"failed" in messages[i-1].get("content", "").lower())):
|
||||
user_corrections += 1
|
||||
|
||||
return QualityMetrics(
|
||||
session_id=session_id,
|
||||
session_type=session_type,
|
||||
message_count=len(messages),
|
||||
tool_calls=tool_calls,
|
||||
successful_tool_calls=successful_tool_calls,
|
||||
error_count=error_count,
|
||||
user_corrections=user_corrections
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to analyze session: {e}")
|
||||
return None
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# A/B Testing
|
||||
# ============================================================================
|
||||
|
||||
class ABTestManager:
|
||||
"""Manage A/B tests."""
|
||||
|
||||
def __init__(self, test_dir: Path = None):
|
||||
self.test_dir = test_dir or Path.home() / ".hermes" / "ab_tests"
|
||||
self.test_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
def create_test(self, task: TestTask) -> str:
|
||||
"""Create a new test."""
|
||||
test_id = f"test_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{task.task_id}"
|
||||
test_path = self.test_dir / f"{test_id}.json"
|
||||
|
||||
with open(test_path, 'w') as f:
|
||||
json.dump({
|
||||
"test_id": test_id,
|
||||
"task": task.to_dict(),
|
||||
"results": [],
|
||||
"created_at": datetime.now().isoformat()
|
||||
}, f, indent=2)
|
||||
|
||||
return test_id
|
||||
|
||||
def add_result(self, test_id: str, result: TestResult):
|
||||
"""Add a test result."""
|
||||
test_path = self.test_dir / f"{test_id}.json"
|
||||
if not test_path.exists():
|
||||
logger.error(f"Test {test_id} not found")
|
||||
return
|
||||
|
||||
try:
|
||||
with open(test_path, 'r') as f:
|
||||
data = json.load(f)
|
||||
|
||||
data["results"].append(result.to_dict())
|
||||
|
||||
with open(test_path, 'w') as f:
|
||||
json.dump(data, f, indent=2)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to add result: {e}")
|
||||
|
||||
def analyze_test(self, test_id: str) -> Dict[str, Any]:
|
||||
"""Analyze test results."""
|
||||
test_path = self.test_dir / f"{test_id}.json"
|
||||
if not test_path.exists():
|
||||
return {"error": "Test not found"}
|
||||
|
||||
try:
|
||||
with open(test_path, 'r') as f:
|
||||
data = json.load(f)
|
||||
|
||||
results = data.get("results", [])
|
||||
if not results:
|
||||
return {"error": "No results yet"}
|
||||
|
||||
cold_results = [r for r in results if r["session_type"] == "cold"]
|
||||
warm_results = [r for r in results if r["session_type"] == "warm"]
|
||||
|
||||
def calc_stats(result_list):
|
||||
if not result_list:
|
||||
return {"count": 0}
|
||||
|
||||
error_rates = [r["metrics"]["error_rate"] for r in result_list]
|
||||
success_rates = [r["metrics"]["success_rate"] for r in result_list]
|
||||
|
||||
return {
|
||||
"count": len(result_list),
|
||||
"avg_error_rate": statistics.mean(error_rates) if error_rates else 0,
|
||||
"avg_success_rate": statistics.mean(success_rates) if success_rates else 0,
|
||||
"success_count": sum(1 for r in result_list if r["success"])
|
||||
}
|
||||
|
||||
cold_stats = calc_stats(cold_results)
|
||||
warm_stats = calc_stats(warm_results)
|
||||
|
||||
improvement = {}
|
||||
if cold_stats.get("count", 0) > 0 and warm_stats.get("count", 0) > 0:
|
||||
cold_error = cold_stats.get("avg_error_rate", 0)
|
||||
warm_error = warm_stats.get("avg_error_rate", 0)
|
||||
if cold_error > 0:
|
||||
improvement["error_rate"] = (cold_error - warm_error) / cold_error
|
||||
|
||||
return {
|
||||
"test_id": test_id,
|
||||
"task": data.get("task", {}),
|
||||
"cold": cold_stats,
|
||||
"warm": warm_stats,
|
||||
"improvement": improvement,
|
||||
"recommendation": self._get_recommendation(cold_stats, warm_stats)
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to analyze test: {e}")
|
||||
return {"error": str(e)}
|
||||
|
||||
def _get_recommendation(self, cold_stats: Dict, warm_stats: Dict) -> str:
|
||||
if cold_stats.get("count", 0) < 3 or warm_stats.get("count", 0) < 3:
|
||||
return "Insufficient data (need at least 3 tests each)"
|
||||
|
||||
cold_error = cold_stats.get("avg_error_rate", 0)
|
||||
warm_error = warm_stats.get("avg_error_rate", 0)
|
||||
|
||||
if warm_error < cold_error * 0.8:
|
||||
return "WARM recommended: Significant error reduction"
|
||||
elif warm_error > cold_error * 1.2:
|
||||
return "COLD recommended: Warm sessions performed worse"
|
||||
else:
|
||||
return "No significant difference detected"
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Template Management
|
||||
# ============================================================================
|
||||
|
||||
class TemplateManager:
|
||||
"""Manage warm session templates."""
|
||||
|
||||
def __init__(self, template_dir: Path = None):
|
||||
self.template_dir = template_dir or Path.home() / ".hermes" / "warm_templates"
|
||||
self.template_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
def save_template(self, template: WarmTemplate) -> Path:
|
||||
"""Save a template."""
|
||||
path = self.template_dir / f"{template.template_id}.json"
|
||||
with open(path, 'w') as f:
|
||||
json.dump(template.to_dict(), f, indent=2)
|
||||
return path
|
||||
|
||||
def load_template(self, template_id: str) -> Optional[WarmTemplate]:
|
||||
"""Load a template."""
|
||||
path = self.template_dir / f"{template_id}.json"
|
||||
if not path.exists():
|
||||
return None
|
||||
|
||||
try:
|
||||
with open(path, 'r') as f:
|
||||
data = json.load(f)
|
||||
return WarmTemplate.from_dict(data)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to load template: {e}")
|
||||
return None
|
||||
|
||||
def list_templates(self) -> List[Dict]:
|
||||
"""List all templates."""
|
||||
templates = []
|
||||
for path in self.template_dir.glob("*.json"):
|
||||
try:
|
||||
with open(path, 'r') as f:
|
||||
data = json.load(f)
|
||||
templates.append({
|
||||
"template_id": data.get("template_id"),
|
||||
"name": data.get("name"),
|
||||
"description": data.get("description"),
|
||||
"usage_count": data.get("usage_count", 0),
|
||||
"success_rate": data.get("success_rate", 0.0),
|
||||
"version": data.get("version", "1.0")
|
||||
})
|
||||
except:
|
||||
pass
|
||||
return templates
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Bootstrapper
|
||||
# ============================================================================
|
||||
|
||||
class SessionBootstrapper:
|
||||
"""Bootstrap warm sessions from templates."""
|
||||
|
||||
def __init__(self, template_manager: TemplateManager = None):
|
||||
self.template_manager = template_manager or TemplateManager()
|
||||
|
||||
def prepare_messages(
|
||||
self,
|
||||
template: WarmTemplate,
|
||||
user_message: str,
|
||||
include_examples: bool = True
|
||||
) -> List[Dict]:
|
||||
"""Prepare messages for a warm session."""
|
||||
messages = []
|
||||
|
||||
# Add warm context
|
||||
warm_context = self._build_warm_context(template.seed)
|
||||
if warm_context:
|
||||
messages.append({"role": "system", "content": warm_context})
|
||||
|
||||
# Add tool examples
|
||||
if include_examples and template.seed.tool_examples:
|
||||
example_messages = self._create_example_messages(template.seed.tool_examples)
|
||||
messages.extend(example_messages)
|
||||
|
||||
# Add user message
|
||||
messages.append({"role": "user", "content": user_message})
|
||||
|
||||
return messages
|
||||
|
||||
def _build_warm_context(self, seed: SessionSeed) -> str:
|
||||
parts = []
|
||||
if seed.system_context:
|
||||
parts.append(seed.system_context)
|
||||
if seed.context_markers:
|
||||
parts.append("\nKnown context: " + ", ".join(seed.context_markers[:10]))
|
||||
if seed.user_patterns:
|
||||
style = seed.user_patterns.get("preferred_style", "balanced")
|
||||
parts.append(f"\nUser prefers {style} interactions.")
|
||||
return "\n".join(parts)[:1500]
|
||||
|
||||
def _create_example_messages(self, examples: List[Dict]) -> List[Dict]:
|
||||
messages = []
|
||||
for i, ex in enumerate(examples[:3]):
|
||||
messages.append({"role": "user", "content": f"[Example {i+1}] Use {ex['tool']}"})
|
||||
messages.append({
|
||||
"role": "assistant",
|
||||
"content": f"I'll use {ex['tool']}.",
|
||||
"tool_calls": [{
|
||||
"id": f"example_{i}",
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": ex["tool"],
|
||||
"arguments": ex.get("arguments", "{}")
|
||||
}
|
||||
}]
|
||||
})
|
||||
messages.append({
|
||||
"role": "tool",
|
||||
"tool_call_id": f"example_{i}",
|
||||
"content": ex.get("result_preview", "Success")
|
||||
})
|
||||
return messages
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# CLI Interface
|
||||
# ============================================================================
|
||||
|
||||
def unified_cli(args: List[str]) -> int:
|
||||
"""CLI interface for unified warm session framework."""
|
||||
import argparse
|
||||
|
||||
parser = argparse.ArgumentParser(description="Unified warm session framework")
|
||||
subparsers = parser.add_subparsers(dest="command")
|
||||
|
||||
# Extract template
|
||||
extract_parser = subparsers.add_parser("extract", help="Extract template from session")
|
||||
extract_parser.add_argument("session_id", help="Session ID")
|
||||
extract_parser.add_argument("--name", "-n", required=True, help="Template name")
|
||||
extract_parser.add_argument("--description", "-d", default="", help="Description")
|
||||
|
||||
# List templates
|
||||
subparsers.add_parser("list", help="List templates")
|
||||
|
||||
# Test warm session
|
||||
test_parser = subparsers.add_parser("test", help="Test warm session")
|
||||
test_parser.add_argument("template_id", help="Template ID")
|
||||
test_parser.add_argument("message", help="Test message")
|
||||
|
||||
# Analyze session
|
||||
analyze_parser = subparsers.add_parser("analyze", help="Analyze session quality")
|
||||
analyze_parser.add_argument("session_id", help="Session ID")
|
||||
|
||||
# Create A/B test
|
||||
create_test_parser = subparsers.add_parser("create-test", help="Create A/B test")
|
||||
create_test_parser.add_argument("--task-id", required=True, help="Task ID")
|
||||
create_test_parser.add_argument("--description", required=True, help="Task description")
|
||||
create_test_parser.add_argument("--prompt", required=True, help="Test prompt")
|
||||
|
||||
# Add test result
|
||||
add_result_parser = subparsers.add_parser("add-result", help="Add test result")
|
||||
add_result_parser.add_argument("test_id", help="Test ID")
|
||||
add_result_parser.add_argument("--session-type", required=True, choices=["cold", "warm"])
|
||||
add_result_parser.add_argument("--session-id", required=True, help="Session ID")
|
||||
add_result_parser.add_argument("--tool-calls", type=int, default=0)
|
||||
add_result_parser.add_argument("--successful-calls", type=int, default=0)
|
||||
add_result_parser.add_argument("--success", action="store_true")
|
||||
|
||||
# Analyze test
|
||||
analyze_test_parser = subparsers.add_parser("analyze-test", help="Analyze A/B test")
|
||||
analyze_test_parser.add_argument("test_id", help="Test ID")
|
||||
|
||||
parsed = parser.parse_args(args)
|
||||
|
||||
if not parsed.command:
|
||||
parser.print_help()
|
||||
return 1
|
||||
|
||||
# Import session DB
|
||||
session_db = None
|
||||
try:
|
||||
from hermes_state import SessionDB
|
||||
session_db = SessionDB()
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
if parsed.command == "extract":
|
||||
extractor = SessionExtractor(session_db)
|
||||
seed = extractor.extract_seed(parsed.session_id)
|
||||
|
||||
if not seed:
|
||||
print(f"Failed to extract seed from session {parsed.session_id}")
|
||||
return 1
|
||||
|
||||
template = WarmTemplate(
|
||||
template_id=f"warm_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
|
||||
name=parsed.name,
|
||||
description=parsed.description,
|
||||
seed=seed,
|
||||
created_at=datetime.now().isoformat(),
|
||||
source_session_id=parsed.session_id,
|
||||
version="1.0"
|
||||
)
|
||||
|
||||
manager = TemplateManager()
|
||||
path = manager.save_template(template)
|
||||
print(f"Created template: {template.template_id}")
|
||||
print(f"Saved to: {path}")
|
||||
return 0
|
||||
|
||||
elif parsed.command == "list":
|
||||
manager = TemplateManager()
|
||||
templates = manager.list_templates()
|
||||
|
||||
if not templates:
|
||||
print("No templates found.")
|
||||
return 0
|
||||
|
||||
print("\n=== Warm Session Templates ===\n")
|
||||
for t in templates:
|
||||
print(f"ID: {t['template_id']}")
|
||||
print(f" Name: {t['name']}")
|
||||
print(f" Description: {t['description']}")
|
||||
print(f" Version: {t['version']}")
|
||||
print(f" Usage: {t['usage_count']} times, {t['success_rate']:.0%} success")
|
||||
print()
|
||||
|
||||
return 0
|
||||
|
||||
elif parsed.command == "test":
|
||||
manager = TemplateManager()
|
||||
template = manager.load_template(parsed.template_id)
|
||||
|
||||
if not template:
|
||||
print(f"Template {parsed.template_id} not found")
|
||||
return 1
|
||||
|
||||
bootstrapper = SessionBootstrapper(manager)
|
||||
messages = bootstrapper.prepare_messages(template, parsed.message)
|
||||
|
||||
print(f"\n=== Warm Session Test: {template.name} ===\n")
|
||||
print(f"Generated {len(messages)} messages")
|
||||
|
||||
for i, msg in enumerate(messages):
|
||||
role = msg.get("role", "unknown")
|
||||
if role == "system":
|
||||
print(f"\n[System Context] ({len(msg.get('content', ''))} chars)")
|
||||
elif role == "user":
|
||||
print(f"\n[User]: {msg.get('content', '')}")
|
||||
elif role == "assistant":
|
||||
print(f"[Assistant]: {msg.get('content', '')}")
|
||||
if msg.get("tool_calls"):
|
||||
for tc in msg["tool_calls"]:
|
||||
func = tc.get("function", {})
|
||||
print(f" -> {func.get('name')}()")
|
||||
elif role == "tool":
|
||||
print(f" [Result]: {msg.get('content', '')[:50]}...")
|
||||
|
||||
return 0
|
||||
|
||||
elif parsed.command == "analyze":
|
||||
analyzer = QualityAnalyzer(session_db)
|
||||
metrics = analyzer.analyze_session(parsed.session_id)
|
||||
|
||||
if not metrics:
|
||||
print(f"Failed to analyze session {parsed.session_id}")
|
||||
return 1
|
||||
|
||||
print(f"\n=== Session Quality: {parsed.session_id} ===\n")
|
||||
print(f"Messages: {metrics.message_count}")
|
||||
print(f"Tool calls: {metrics.tool_calls}")
|
||||
print(f"Error rate: {metrics.error_rate:.1%}")
|
||||
print(f"Success rate: {metrics.success_rate:.1%}")
|
||||
print(f"Efficiency score: {metrics.efficiency_score:.2f}")
|
||||
|
||||
return 0
|
||||
|
||||
elif parsed.command == "create-test":
|
||||
task = TestTask(
|
||||
task_id=parsed.task_id,
|
||||
description=parsed.description,
|
||||
prompt=parsed.prompt
|
||||
)
|
||||
|
||||
manager = ABTestManager()
|
||||
test_id = manager.create_test(task)
|
||||
print(f"Created test: {test_id}")
|
||||
return 0
|
||||
|
||||
elif parsed.command == "add-result":
|
||||
analyzer = QualityAnalyzer(session_db)
|
||||
metrics = analyzer.analyze_session(parsed.session_id, SessionType(parsed.session_type))
|
||||
|
||||
if not metrics:
|
||||
print(f"Failed to analyze session {parsed.session_id}")
|
||||
return 1
|
||||
|
||||
metrics.tool_calls = parsed.tool_calls or metrics.tool_calls
|
||||
metrics.successful_tool_calls = parsed.successful_calls or metrics.successful_tool_calls
|
||||
|
||||
result = TestResult(
|
||||
test_id=parsed.test_id,
|
||||
task_id="", # Will be filled from test
|
||||
session_id=parsed.session_id,
|
||||
session_type=SessionType(parsed.session_type),
|
||||
metrics=metrics,
|
||||
success=parsed.success
|
||||
)
|
||||
|
||||
manager = ABTestManager()
|
||||
manager.add_result(parsed.test_id, result)
|
||||
print(f"Added result to test {parsed.test_id}")
|
||||
return 0
|
||||
|
||||
elif parsed.command == "analyze-test":
|
||||
manager = ABTestManager()
|
||||
analysis = manager.analyze_test(parsed.test_id)
|
||||
|
||||
if "error" in analysis:
|
||||
print(f"Error: {analysis['error']}")
|
||||
return 1
|
||||
|
||||
print(f"\n=== A/B Test Analysis: {parsed.test_id} ===\n")
|
||||
print(f"Task: {analysis['task'].get('description', 'N/A')}")
|
||||
|
||||
cold = analysis.get("cold", {})
|
||||
warm = analysis.get("warm", {})
|
||||
|
||||
print(f"\nCold sessions: {cold.get('count', 0)}")
|
||||
print(f" Avg error rate: {cold.get('avg_error_rate', 0):.1%}")
|
||||
print(f" Avg success rate: {cold.get('avg_success_rate', 0):.1%}")
|
||||
|
||||
print(f"\nWarm sessions: {warm.get('count', 0)}")
|
||||
print(f" Avg error rate: {warm.get('avg_error_rate', 0):.1%}")
|
||||
print(f" Avg success rate: {warm.get('avg_success_rate', 0):.1%}")
|
||||
|
||||
improvement = analysis.get("improvement", {})
|
||||
if improvement:
|
||||
print(f"\nImprovement:")
|
||||
if "error_rate" in improvement:
|
||||
print(f" Error rate: {improvement['error_rate']:+.1%}")
|
||||
|
||||
print(f"\nRecommendation: {analysis.get('recommendation', 'N/A')}")
|
||||
|
||||
return 0
|
||||
|
||||
return 1
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
sys.exit(unified_cli(sys.argv[1:]))
|
||||
Reference in New Issue
Block a user