#!/usr/bin/env python3 """ skills_audit.py — Skills Drift Detector Compares the skills bundled in the repo against those installed in HERMES_HOME/skills/, then reports any drift: - MISSING — skill in repo but not in installed location - EXTRA — skill installed but not in repo (local-only) - OUTDATED — repo skill.md differs from installed skill.md Usage: python wizard-bootstrap/skills_audit.py python wizard-bootstrap/skills_audit.py --fix # copy missing skills python wizard-bootstrap/skills_audit.py --json python wizard-bootstrap/skills_audit.py --repo-root /path/to/hermes-agent """ import argparse import difflib import hashlib import json import os import shutil import sys from dataclasses import dataclass, field from pathlib import Path from typing import Optional # --------------------------------------------------------------------------- # Data model # --------------------------------------------------------------------------- @dataclass class SkillDrift: skill_path: str # e.g. "software-development/code-review" status: str # "MISSING" | "EXTRA" | "OUTDATED" | "OK" repo_hash: Optional[str] = None installed_hash: Optional[str] = None diff_lines: list[str] = field(default_factory=list) @dataclass class AuditReport: drifts: list[SkillDrift] = field(default_factory=list) repo_root: Path = Path(".") installed_root: Path = Path(".") @property def has_drift(self) -> bool: return any(d.status != "OK" for d in self.drifts) def by_status(self, status: str) -> list[SkillDrift]: return [d for d in self.drifts if d.status == status] # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _sha256_file(path: Path) -> str: h = hashlib.sha256() h.update(path.read_bytes()) return h.hexdigest()[:16] def _find_skills(root: Path) -> dict[str, Path]: """Return {relative_skill_path: SKILL.md path} for every skill under root.""" skills: dict[str, Path] = {} for skill_md in root.rglob("SKILL.md"): # skill path is relative to root, e.g. "software-development/code-review" rel = skill_md.parent.relative_to(root) skills[str(rel)] = skill_md return skills def _diff_skills(repo_md: Path, installed_md: Path) -> list[str]: repo_lines = repo_md.read_text(encoding="utf-8", errors="replace").splitlines() inst_lines = installed_md.read_text(encoding="utf-8", errors="replace").splitlines() diff = list( difflib.unified_diff( inst_lines, repo_lines, fromfile="installed", tofile="repo", lineterm="", ) ) return diff # --------------------------------------------------------------------------- # Core audit logic # --------------------------------------------------------------------------- def _resolve_installed_skills_root() -> Optional[Path]: """Return the installed skills directory, or None if not found.""" hermes_home = Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes")) candidates = [ hermes_home / "skills", hermes_home / "hermes-agent" / "skills", ] for candidate in candidates: if candidate.exists(): return candidate return None def run_audit(repo_root: Path, installed_root: Optional[Path] = None) -> AuditReport: repo_skills_dir = repo_root / "skills" if not repo_skills_dir.exists(): print(f"ERROR: Repo skills directory not found: {repo_skills_dir}", file=sys.stderr) sys.exit(1) resolved_installed = installed_root or _resolve_installed_skills_root() report = AuditReport( repo_root=repo_root, installed_root=resolved_installed or Path("/not-found"), ) repo_map = _find_skills(repo_skills_dir) if resolved_installed is None or not resolved_installed.exists(): # All repo skills are "MISSING" from the installation for skill_path in sorted(repo_map): report.drifts.append( SkillDrift( skill_path=skill_path, status="MISSING", repo_hash=_sha256_file(repo_map[skill_path]), ) ) return report installed_map = _find_skills(resolved_installed) all_paths = sorted(set(repo_map) | set(installed_map)) for skill_path in all_paths: in_repo = skill_path in repo_map in_installed = skill_path in installed_map if in_repo and not in_installed: report.drifts.append( SkillDrift( skill_path=skill_path, status="MISSING", repo_hash=_sha256_file(repo_map[skill_path]), ) ) elif in_installed and not in_repo: report.drifts.append( SkillDrift( skill_path=skill_path, status="EXTRA", installed_hash=_sha256_file(installed_map[skill_path]), ) ) else: rh = _sha256_file(repo_map[skill_path]) ih = _sha256_file(installed_map[skill_path]) if rh != ih: diff = _diff_skills(repo_map[skill_path], installed_map[skill_path]) report.drifts.append( SkillDrift( skill_path=skill_path, status="OUTDATED", repo_hash=rh, installed_hash=ih, diff_lines=diff, ) ) else: report.drifts.append( SkillDrift(skill_path=skill_path, status="OK", repo_hash=rh, installed_hash=ih) ) return report # --------------------------------------------------------------------------- # Fix: copy missing skills into installed location # --------------------------------------------------------------------------- def apply_fix(report: AuditReport) -> None: if report.installed_root == Path("/not-found"): print("Cannot fix: installed skills directory not found.", file=sys.stderr) return repo_skills_dir = report.repo_root / "skills" for drift in report.by_status("MISSING"): src = repo_skills_dir / drift.skill_path / "SKILL.md" dst = report.installed_root / drift.skill_path / "SKILL.md" dst.parent.mkdir(parents=True, exist_ok=True) shutil.copy2(src, dst) print(f" Installed: {drift.skill_path}") for drift in report.by_status("OUTDATED"): src = repo_skills_dir / drift.skill_path / "SKILL.md" dst = report.installed_root / drift.skill_path / "SKILL.md" shutil.copy2(src, dst) print(f" Updated: {drift.skill_path}") # --------------------------------------------------------------------------- # Rendering # --------------------------------------------------------------------------- _GREEN = "\033[32m" _RED = "\033[31m" _YELLOW = "\033[33m" _CYAN = "\033[36m" _BOLD = "\033[1m" _RESET = "\033[0m" _STATUS_COLOR = { "OK": _GREEN, "MISSING": _RED, "EXTRA": _YELLOW, "OUTDATED": _CYAN, } def _render_terminal(report: AuditReport, show_diff: bool = False) -> None: print(f"\n{_BOLD}=== Wizard Skills Audit ==={_RESET}") print(f" Repo skills: {report.repo_root / 'skills'}") print(f" Installed skills: {report.installed_root}\n") if not report.drifts: print(f"{_GREEN}No skills found to compare.{_RESET}\n") return total = len(report.drifts) ok = len(report.by_status("OK")) missing = len(report.by_status("MISSING")) extra = len(report.by_status("EXTRA")) outdated = len(report.by_status("OUTDATED")) for drift in sorted(report.drifts, key=lambda d: (d.status == "OK", d.skill_path)): color = _STATUS_COLOR.get(drift.status, _RESET) print(f" {color}{drift.status:8}{_RESET} {drift.skill_path}") if show_diff and drift.diff_lines: for line in drift.diff_lines[:20]: print(f" {line}") if len(drift.diff_lines) > 20: print(f" ... ({len(drift.diff_lines) - 20} more lines)") print() print(f" Total: {total} OK: {_GREEN}{ok}{_RESET} " f"Missing: {_RED}{missing}{_RESET} " f"Extra: {_YELLOW}{extra}{_RESET} " f"Outdated: {_CYAN}{outdated}{_RESET}") print() if not report.has_drift: print(f"{_GREEN}{_BOLD}No drift detected. Skills are in sync.{_RESET}\n") else: print(f"{_YELLOW}{_BOLD}Drift detected. Run with --fix to sync missing/outdated skills.{_RESET}\n") def _render_json(report: AuditReport) -> None: out = { "has_drift": report.has_drift, "repo_skills_dir": str(report.repo_root / "skills"), "installed_skills_dir": str(report.installed_root), "summary": { "total": len(report.drifts), "ok": len(report.by_status("OK")), "missing": len(report.by_status("MISSING")), "extra": len(report.by_status("EXTRA")), "outdated": len(report.by_status("OUTDATED")), }, "drifts": [ { "skill_path": d.skill_path, "status": d.status, "repo_hash": d.repo_hash, "installed_hash": d.installed_hash, "diff_line_count": len(d.diff_lines), } for d in report.drifts if d.status != "OK" ], } print(json.dumps(out, indent=2)) # --------------------------------------------------------------------------- # CLI entry point # --------------------------------------------------------------------------- def main() -> None: parser = argparse.ArgumentParser( description="Audit wizard skills for drift between repo and installed location." ) parser.add_argument( "--repo-root", default=str(Path(__file__).parent.parent), help="Root of the hermes-agent repo (default: parent of this script)", ) parser.add_argument( "--installed-root", default=None, help="Installed skills directory (default: auto-detect from HERMES_HOME)", ) parser.add_argument( "--fix", action="store_true", help="Copy missing/outdated skills from repo to installed location", ) parser.add_argument( "--diff", action="store_true", help="Show diff for outdated skills", ) parser.add_argument( "--json", action="store_true", help="Output results as JSON", ) args = parser.parse_args() repo_root = Path(args.repo_root).resolve() installed_root = Path(args.installed_root).resolve() if args.installed_root else None report = run_audit(repo_root, installed_root) if args.fix: apply_fix(report) # Re-run audit after fix to show updated state report = run_audit(repo_root, installed_root) if args.json: _render_json(report) else: _render_terminal(report, show_diff=args.diff) sys.exit(0 if not report.has_drift else 1) if __name__ == "__main__": main()