343 lines
11 KiB
Python
343 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
skills_audit.py — Skills Drift Detector
|
|
|
|
Compares the skills bundled in the repo against those installed in
|
|
HERMES_HOME/skills/, then reports any drift:
|
|
|
|
- MISSING — skill in repo but not in installed location
|
|
- EXTRA — skill installed but not in repo (local-only)
|
|
- OUTDATED — repo SKILL.md differs from installed SKILL.md
|
|
|
|
Usage:
|
|
python wizard-bootstrap/skills_audit.py
|
|
python wizard-bootstrap/skills_audit.py --fix # copy missing and update outdated skills
|
|
python wizard-bootstrap/skills_audit.py --json
|
|
python wizard-bootstrap/skills_audit.py --repo-root /path/to/hermes-agent
|
|
"""
|
|
|
|
import argparse
|
|
import difflib
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import shutil
|
|
import sys
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Data model
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@dataclass
class SkillDrift:
    """Audit outcome for a single skill.

    One instance records how the repo copy of a skill relates to the
    installed copy, together with whatever hashes and diff were computed
    while comparing them.
    """

    # Skill directory relative to the skills root,
    # e.g. "software-development/code-review".
    skill_path: str

    # One of "MISSING" | "EXTRA" | "OUTDATED" | "OK".
    status: str

    # Hash of the repo-side SKILL.md, when one was computed.
    repo_hash: Optional[str] = None

    # Hash of the installed SKILL.md, when one was computed.
    installed_hash: Optional[str] = None

    # Diff lines between the two copies; a fresh empty list per instance.
    diff_lines: list[str] = field(default_factory=list)
|
|
|
|
|
|
@dataclass
class AuditReport:
    """All drift entries from one audit run, plus the two roots compared."""

    # One entry per skill that was examined.
    drifts: list[SkillDrift] = field(default_factory=list)
    # Repository root; the bundled skills live under its "skills" directory.
    repo_root: Path = Path(".")
    # Directory holding the installed skills.
    installed_root: Path = Path(".")

    @property
    def has_drift(self) -> bool:
        """Tell whether at least one entry has a status other than "OK"."""
        for entry in self.drifts:
            if entry.status != "OK":
                return True
        return False

    def by_status(self, status: str) -> list[SkillDrift]:
        """Return the entries whose status equals *status*, in stored order."""
        return list(filter(lambda entry: entry.status == status, self.drifts))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _sha256_file(path: Path) -> str:
|
|
h = hashlib.sha256()
|
|
h.update(path.read_bytes())
|
|
return h.hexdigest()[:16]
|
|
|
|
|
|
def _find_skills(root: Path) -> dict[str, Path]:
|
|
"""Return {relative_skill_path: SKILL.md path} for every skill under root."""
|
|
skills: dict[str, Path] = {}
|
|
for skill_md in root.rglob("SKILL.md"):
|
|
# skill path is relative to root, e.g. "software-development/code-review"
|
|
rel = skill_md.parent.relative_to(root)
|
|
skills[str(rel)] = skill_md
|
|
return skills
|
|
|
|
|
|
def _diff_skills(repo_md: Path, installed_md: Path) -> list[str]:
|
|
repo_lines = repo_md.read_text(encoding="utf-8", errors="replace").splitlines()
|
|
inst_lines = installed_md.read_text(encoding="utf-8", errors="replace").splitlines()
|
|
diff = list(
|
|
difflib.unified_diff(
|
|
inst_lines,
|
|
repo_lines,
|
|
fromfile="installed",
|
|
tofile="repo",
|
|
lineterm="",
|
|
)
|
|
)
|
|
return diff
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Core audit logic
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _resolve_installed_skills_root() -> Optional[Path]:
|
|
"""Return the installed skills directory, or None if not found."""
|
|
hermes_home = Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes"))
|
|
candidates = [
|
|
hermes_home / "skills",
|
|
hermes_home / "hermes-agent" / "skills",
|
|
]
|
|
for candidate in candidates:
|
|
if candidate.exists():
|
|
return candidate
|
|
return None
|
|
|
|
|
|
def run_audit(repo_root: Path, installed_root: Optional[Path] = None) -> AuditReport:
    """Compare ``<repo_root>/skills`` against the installed skills.

    Args:
        repo_root: Repository root; bundled skills are read from its
            ``skills`` sub-directory.
        installed_root: Installed skills directory. When omitted, the
            location is auto-detected.

    Returns:
        A report with one entry per skill found on either side, in sorted
        skill-path order, each classified as MISSING, EXTRA, OUTDATED or OK.

    Exits the process with status 1 (after printing to stderr) when the
    repo skills directory does not exist.
    """
    repo_skills_dir = repo_root / "skills"
    if not repo_skills_dir.exists():
        print(f"ERROR: Repo skills directory not found: {repo_skills_dir}", file=sys.stderr)
        sys.exit(1)

    target = installed_root or _resolve_installed_skills_root()
    report = AuditReport(
        repo_root=repo_root,
        # Placeholder path recorded when no installed directory was resolved.
        installed_root=target or Path("/not-found"),
    )

    bundled = _find_skills(repo_skills_dir)

    # Without a usable installed directory the installed side is simply
    # empty, so every bundled skill is classified as MISSING below.
    target_usable = target is not None and target.exists()
    installed = _find_skills(target) if target_usable else {}

    for name in sorted(bundled.keys() | installed.keys()):
        repo_md = bundled.get(name)
        installed_md = installed.get(name)

        if installed_md is None:
            entry = SkillDrift(
                skill_path=name,
                status="MISSING",
                repo_hash=_sha256_file(repo_md),
            )
        elif repo_md is None:
            entry = SkillDrift(
                skill_path=name,
                status="EXTRA",
                installed_hash=_sha256_file(installed_md),
            )
        else:
            repo_digest = _sha256_file(repo_md)
            installed_digest = _sha256_file(installed_md)
            if repo_digest == installed_digest:
                entry = SkillDrift(
                    skill_path=name,
                    status="OK",
                    repo_hash=repo_digest,
                    installed_hash=installed_digest,
                )
            else:
                entry = SkillDrift(
                    skill_path=name,
                    status="OUTDATED",
                    repo_hash=repo_digest,
                    installed_hash=installed_digest,
                    diff_lines=_diff_skills(repo_md, installed_md),
                )
        report.drifts.append(entry)

    return report
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fix: copy missing skills into installed location
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def apply_fix(report: AuditReport) -> None:
    """Copy repo SKILL.md files over missing and outdated installed skills.

    MISSING skills get their destination directory created first; OUTDATED
    skills are overwritten in place. One progress line is printed per skill.
    Nothing is copied, and a message goes to stderr, when the report carries
    the "/not-found" placeholder as its installed root.
    """
    if report.installed_root == Path("/not-found"):
        print("Cannot fix: installed skills directory not found.", file=sys.stderr)
        return

    source_root = report.repo_root / "skills"

    def _copy_manifest(skill_path: str, make_parent: bool) -> None:
        # Only the skill's SKILL.md is copied, metadata included (copy2).
        origin = source_root / skill_path / "SKILL.md"
        target = report.installed_root / skill_path / "SKILL.md"
        if make_parent:
            target.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(origin, target)

    for absent in report.by_status("MISSING"):
        _copy_manifest(absent.skill_path, make_parent=True)
        print(f" Installed: {absent.skill_path}")

    for stale in report.by_status("OUTDATED"):
        _copy_manifest(stale.skill_path, make_parent=False)
        print(f" Updated: {stale.skill_path}")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Rendering
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_GREEN = "\033[32m"
|
|
_RED = "\033[31m"
|
|
_YELLOW = "\033[33m"
|
|
_CYAN = "\033[36m"
|
|
_BOLD = "\033[1m"
|
|
_RESET = "\033[0m"
|
|
|
|
_STATUS_COLOR = {
|
|
"OK": _GREEN,
|
|
"MISSING": _RED,
|
|
"EXTRA": _YELLOW,
|
|
"OUTDATED": _CYAN,
|
|
}
|
|
|
|
|
|
def _render_terminal(report: AuditReport, show_diff: bool = False) -> None:
    """Print a coloured, human-readable audit summary to stdout.

    Args:
        report: The audit result to display.
        show_diff: When true, print up to 20 diff lines under each entry
            that has a diff, followed by a count of the lines left out.
    """
    print(f"\n{_BOLD}=== Wizard Skills Audit ==={_RESET}")
    print(f" Repo skills: {report.repo_root / 'skills'}")
    print(f" Installed skills: {report.installed_root}\n")

    if not report.drifts:
        print(f"{_GREEN}No skills found to compare.{_RESET}\n")
        return

    total = len(report.drifts)
    ok, missing, extra, outdated = (
        len(report.by_status(status))
        for status in ("OK", "MISSING", "EXTRA", "OUTDATED")
    )

    # Entries needing attention come first; "OK" entries sort to the end.
    ordered = sorted(report.drifts, key=lambda d: (d.status == "OK", d.skill_path))
    for drift in ordered:
        color = _STATUS_COLOR.get(drift.status, _RESET)
        print(f" {color}{drift.status:8}{_RESET} {drift.skill_path}")
        if not (show_diff and drift.diff_lines):
            continue
        for line in drift.diff_lines[:20]:
            print(f" {line}")
        hidden = len(drift.diff_lines) - 20
        if hidden > 0:
            print(f" ... ({hidden} more lines)")

    summary = (
        f" Total: {total} OK: {_GREEN}{ok}{_RESET} "
        f"Missing: {_RED}{missing}{_RESET} "
        f"Extra: {_YELLOW}{extra}{_RESET} "
        f"Outdated: {_CYAN}{outdated}{_RESET}"
    )
    print()
    print(summary)
    print()

    if report.has_drift:
        print(f"{_YELLOW}{_BOLD}Drift detected. Run with --fix to sync missing/outdated skills.{_RESET}\n")
    else:
        print(f"{_GREEN}{_BOLD}No drift detected. Skills are in sync.{_RESET}\n")
|
|
|
|
|
|
def _render_json(report: AuditReport) -> None:
|
|
out = {
|
|
"has_drift": report.has_drift,
|
|
"repo_skills_dir": str(report.repo_root / "skills"),
|
|
"installed_skills_dir": str(report.installed_root),
|
|
"summary": {
|
|
"total": len(report.drifts),
|
|
"ok": len(report.by_status("OK")),
|
|
"missing": len(report.by_status("MISSING")),
|
|
"extra": len(report.by_status("EXTRA")),
|
|
"outdated": len(report.by_status("OUTDATED")),
|
|
},
|
|
"drifts": [
|
|
{
|
|
"skill_path": d.skill_path,
|
|
"status": d.status,
|
|
"repo_hash": d.repo_hash,
|
|
"installed_hash": d.installed_hash,
|
|
"diff_line_count": len(d.diff_lines),
|
|
}
|
|
for d in report.drifts
|
|
if d.status != "OK"
|
|
],
|
|
}
|
|
print(json.dumps(out, indent=2))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CLI entry point
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def main() -> None:
    """Parse CLI arguments, run the audit, optionally fix, render, and exit.

    Flags:
        --repo-root       repository root (default: parent of this script's directory)
        --installed-root  installed skills directory (default: auto-detected)
        --fix             copy missing/outdated skills, then audit again
        --diff            include diffs for outdated skills in terminal output
        --json            emit the report as JSON instead of terminal text

    Always terminates through ``sys.exit``: status 0 when the final report
    shows no drift, 1 otherwise.
    """
    # Local import: only needed to keep stdout clean for --fix combined with --json.
    import contextlib

    parser = argparse.ArgumentParser(
        description="Audit wizard skills for drift between repo and installed location."
    )
    parser.add_argument(
        "--repo-root",
        default=str(Path(__file__).parent.parent),
        help="Root of the hermes-agent repo (default: parent of this script)",
    )
    parser.add_argument(
        "--installed-root",
        default=None,
        help="Installed skills directory (default: auto-detect from HERMES_HOME)",
    )
    parser.add_argument(
        "--fix",
        action="store_true",
        help="Copy missing/outdated skills from repo to installed location",
    )
    parser.add_argument(
        "--diff",
        action="store_true",
        help="Show diff for outdated skills",
    )
    parser.add_argument(
        "--json",
        action="store_true",
        help="Output results as JSON",
    )
    args = parser.parse_args()

    repo_root = Path(args.repo_root).resolve()
    installed_root = Path(args.installed_root).resolve() if args.installed_root else None

    report = run_audit(repo_root, installed_root)

    if args.fix:
        if args.json:
            # apply_fix() prints progress lines on stdout; in JSON mode they
            # would precede the JSON document and make stdout unparseable,
            # so send them to stderr instead.
            with contextlib.redirect_stdout(sys.stderr):
                apply_fix(report)
        else:
            apply_fix(report)
        # Re-run audit after fix to show updated state
        report = run_audit(repo_root, installed_root)

    if args.json:
        _render_json(report)
    else:
        _render_terminal(report, show_diff=args.diff)

    sys.exit(0 if not report.has_drift else 1)
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|