#!/usr/bin/env python3
"""
Knowledge Store Staleness Detector

Checks knowledge entries against their source files to detect staleness.
An entry is stale when its source file has been modified since extraction.

Usage:
    python3 scripts/knowledge_staleness_check.py knowledge/index.json
    python3 scripts/knowledge_staleness_check.py --repo /path/to/repo --index knowledge/index.json
    python3 scripts/knowledge_staleness_check.py --index knowledge/index.json --fix

Expected index.json format:
{
  "version": 1,
  "facts": [
    {
      "fact": "...",
      "category": "fact|pitfall|pattern|tool-quirk",
      "repo": "repo-name",
      "confidence": 0.8,
      "source_file": "path/to/file.py",
      "source_hash": "sha256:abcdef...",
      "extracted_at": "2026-04-13T20:00:00Z"
    }
  ]
}

A stored ``source_hash`` may be written with or without the ``sha256:``
prefix and may be the full digest or a truncated one; only its first
HASH_LENGTH hex digits are compared against the current file.
"""

import argparse
import hashlib
import json
import sys
from pathlib import Path
from typing import Optional

# Number of leading hex digits of the SHA-256 digest used as a fingerprint.
HASH_LENGTH = 16

# Optional algorithm tag that the documented index format puts before the digest.
_HASH_PREFIX = "sha256:"

# Files are hashed in chunks so large sources are never loaded whole.
_READ_CHUNK = 1 << 16

_DEFAULT_INDEX = "knowledge/index.json"


def compute_file_hash(filepath: str) -> Optional[str]:
    """Return the first HASH_LENGTH hex digits of a file's SHA-256 digest.

    Args:
        filepath: Path of the file to fingerprint.

    Returns:
        The truncated hex digest, or None when *filepath* does not point at
        a regular file (missing, or a directory).
    """
    path = Path(filepath)
    if not path.is_file():
        return None

    digest = hashlib.sha256()
    try:
        with path.open("rb") as handle:
            for chunk in iter(lambda: handle.read(_READ_CHUNK), b""):
                digest.update(chunk)
    except FileNotFoundError:
        # The file disappeared between the check above and the read.
        return None
    return digest.hexdigest()[:HASH_LENGTH]


def _normalize_hash(value) -> str:
    """Reduce a stored hash to the form produced by compute_file_hash."""
    text = str(value).strip().lower()
    if text.startswith(_HASH_PREFIX):
        text = text[len(_HASH_PREFIX):]
    return text[:HASH_LENGTH]


def _resolve_root(index: Path, repo_root: Optional[str]) -> Path:
    """Return the directory that ``source_file`` paths are relative to."""
    if repo_root:
        return Path(repo_root)
    # knowledge/index.json -> repo root. Resolve first: the parent of a bare
    # relative name such as "index.json" is "." and so is *its* parent.
    return index.resolve().parent.parent


def _load_index(index: Path) -> dict:
    """Parse the index file, raising ValueError if it is not a JSON object."""
    data = json.loads(index.read_text(encoding="utf-8"))
    if not isinstance(data, dict):
        raise ValueError("top-level JSON value must be an object")
    return data


def _stale_record(position: int, entry: dict, reason: str, **extra) -> dict:
    """Build one ``stale_entries`` item; *extra* keys go before ``reason``."""
    record = {
        "index": position,
        # "fact" may be absent or null in hand-edited indexes.
        "fact": str(entry.get("fact") or "")[:100],
        "source_file": entry.get("source_file"),
    }
    record.update(extra)
    record["reason"] = reason
    return record


def check_staleness(index_path: str, repo_root: Optional[str] = None) -> dict:
    """Check all entries in the knowledge index for staleness.

    Args:
        index_path: Path to the index JSON file.
        repo_root: Directory that ``source_file`` paths are relative to.
            Defaults to the grandparent directory of the index file.

    Returns:
        ``{"error": message}`` when the index is missing or unreadable.
        Otherwise a dict with the counters ``total``, ``stale``, ``fresh``,
        ``no_source`` and ``missing_files`` plus ``stale_entries``, a list
        describing every entry whose reason is ``no_hash``, ``file_missing``
        or ``hash_mismatch``.
    """
    index = Path(index_path)
    if not index.exists():
        return {"error": f"Index not found: {index_path}"}

    try:
        data = _load_index(index)
    except (OSError, ValueError) as exc:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        return {"error": f"Cannot read index {index_path}: {exc}"}

    facts = data.get("facts") or []
    results = {
        "total": len(facts),
        "stale": 0,
        "fresh": 0,
        "no_source": 0,
        "missing_files": 0,
        "stale_entries": [],
    }
    if not facts:
        return results

    root = _resolve_root(index, repo_root)

    for position, entry in enumerate(facts):
        source_file = entry.get("source_file")
        stored_hash = entry.get("source_hash")

        if not source_file:
            results["no_source"] += 1
            continue

        if not stored_hash:
            # Entry has a source file but no hash: nothing to compare against,
            # so it is considered stale.
            results["stale"] += 1
            results["stale_entries"].append(
                _stale_record(position, entry, "no_hash")
            )
            continue

        current_hash = compute_file_hash(str(root / source_file))

        if current_hash is None:
            results["missing_files"] += 1
            results["stale_entries"].append(
                _stale_record(position, entry, "file_missing")
            )
        elif _normalize_hash(stored_hash) != current_hash:
            results["stale"] += 1
            results["stale_entries"].append(
                _stale_record(
                    position,
                    entry,
                    "hash_mismatch",
                    stored_hash=stored_hash,
                    current_hash=current_hash,
                )
            )
        else:
            results["fresh"] += 1

    return results


def add_hashes_to_index(index_path: str, repo_root: Optional[str] = None) -> dict:
    """Add source hashes to entries that are missing them.

    The index file is rewritten only when at least one entry was updated.

    Args:
        index_path: Path to the index JSON file.
        repo_root: Directory that ``source_file`` paths are relative to.
            Defaults to the grandparent directory of the index file.

    Returns:
        A dict with ``updated`` (hashes added), ``skipped`` (entries without
        a source file or that already had a hash), ``missing`` (entries whose
        source file could not be hashed) and ``total``.

    Raises:
        OSError: The index cannot be read or written.
        ValueError: The index is not valid JSON or not a JSON object.
    """
    index = Path(index_path)
    data = _load_index(index)
    facts = data.get("facts") or []
    root = _resolve_root(index, repo_root)

    updated = 0
    skipped = 0
    missing = 0

    for entry in facts:
        source_file = entry.get("source_file")
        if not source_file or entry.get("source_hash"):
            skipped += 1
            continue

        file_hash = compute_file_hash(str(root / source_file))
        if file_hash:
            entry["source_hash"] = file_hash
            updated += 1
        else:
            missing += 1

    if updated > 0:
        index.write_text(json.dumps(data, indent=2) + "\n", encoding="utf-8")

    return {
        "updated": updated,
        "skipped": skipped,
        "total": len(facts),
        "missing": missing,
    }


def report_staleness(results: dict) -> str:
    """Format staleness check results as a report.

    Args:
        results: A successful (non-error) result of check_staleness.

    Returns:
        The multi-line, human-readable report.
    """
    lines = [
        "=" * 50,
        "KNOWLEDGE STORE STALENESS REPORT",
        "=" * 50,
        f"Total entries:  {results['total']}",
        f"Fresh:          {results['fresh']}",
        f"Stale:          {results['stale']}",
        f"No source:      {results['no_source']}",
        f"Missing files:  {results['missing_files']}",
        "",
    ]

    if results["stale_entries"]:
        lines.append("STALE ENTRIES:")
        lines.append("-" * 50)
        for entry in results["stale_entries"]:
            lines.append(f"  [{entry['reason']}] {entry['source_file']}")
            lines.append(f"    {entry['fact']}")
            # Only hash_mismatch records carry both hashes.
            if entry.get("stored_hash") and entry.get("current_hash"):
                lines.append(f"    stored:  {entry['stored_hash']}")
                lines.append(f"    current: {entry['current_hash']}")
            lines.append("")

    if results["total"] > 0:
        staleness_pct = results["stale"] / results["total"] * 100
        lines.append(f"Staleness rate: {staleness_pct:.1f}%")
    else:
        lines.append("No entries to check.")

    return "\n".join(lines)


def main():
    """Command-line entry point; exits with status 1 on an unusable index."""
    parser = argparse.ArgumentParser(description="Check knowledge store for stale entries")
    # The usage examples pass the index positionally, so accept that form too.
    parser.add_argument(
        "index_path",
        nargs="?",
        default=None,
        help="Path to index.json (same as --index)",
    )
    parser.add_argument(
        "--index",
        default=None,
        help=f"Path to index.json (default: {_DEFAULT_INDEX})",
    )
    parser.add_argument("--repo", help="Repository root (default: auto-detect from index path)")
    parser.add_argument("--fix", action="store_true", help="Add missing hashes to index")
    parser.add_argument("--json", action="store_true", help="Output JSON instead of report")
    args = parser.parse_args()

    # An explicit --index wins over the positional form.
    index_path = args.index or args.index_path or _DEFAULT_INDEX

    if args.fix:
        try:
            result = add_hashes_to_index(index_path, args.repo)
        except (OSError, ValueError) as exc:
            print(f"Error: {exc}", file=sys.stderr)
            sys.exit(1)

        if args.json:
            print(json.dumps(result, indent=2))
        else:
            print(f"Updated {result['updated']} entries with source hashes.")
            print(f"Skipped {result['skipped']} (already had hashes or no source file).")
            if result["missing"]:
                print(f"Could not hash {result['missing']} (source file missing).")
    else:
        results = check_staleness(index_path, args.repo)
        if "error" in results:
            print(f"Error: {results['error']}", file=sys.stderr)
            sys.exit(1)

        if args.json:
            print(json.dumps(results, indent=2))
        else:
            print(report_staleness(results))


if __name__ == "__main__":
    main()