#!/usr/bin/env python3
"""
Knowledge Store Staleness Detector — Detect stale knowledge entries by comparing source file hashes.

Usage:
    python3 scripts/knowledge_staleness_check.py --index knowledge/index.json
    python3 scripts/knowledge_staleness_check.py --index knowledge/index.json --json
    python3 scripts/knowledge_staleness_check.py --index knowledge/index.json --fix
"""

import argparse
import hashlib
import json
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Any, Optional

# Source files are hashed in fixed-size chunks so that a large file never has
# to be held in memory all at once.
_HASH_CHUNK_SIZE = 1024 * 1024


def _read_index(index_path: str) -> Dict[str, Any]:
    """Load the knowledge index JSON document from *index_path*."""
    # JSON is UTF-8 by specification; do not depend on the platform locale.
    with open(index_path, encoding="utf-8") as f:
        return json.load(f)


def compute_file_hash(filepath: str) -> Optional[str]:
    """Compute the SHA-256 hash of a file.

    Returns the digest as ``"sha256:<hex>"``, or None when the file cannot be
    read (it does not exist, is a directory, is not accessible, or any other
    OS-level error occurs while opening/reading it).
    """
    digest = hashlib.sha256()
    try:
        with open(filepath, "rb") as f:
            for chunk in iter(lambda: f.read(_HASH_CHUNK_SIZE), b""):
                digest.update(chunk)
    except OSError:
        # Covers FileNotFoundError, IsADirectoryError, PermissionError and
        # also NotADirectoryError (a path component is a regular file), which
        # would otherwise abort the whole staleness check.
        return None
    return "sha256:" + digest.hexdigest()


def check_staleness(index_path: str, repo_root: str = ".") -> List[Dict[str, Any]]:
    """Check all entries in knowledge index for staleness.

    Each entry under the index's "facts" key is copied and extended with:
      - status: "fresh" | "stale" | "missing_source" | "no_hash" | "no_source"
      - current_hash: computed hash, or None when there is no readable source

    The stored hash is read from the entry's "source_hash" key and the source
    path from "source_file" (resolved relative to *repo_root*).
    """
    data = _read_index(index_path)
    results = []

    # `or []` tolerates an explicit `"facts": null` in the index.
    for entry in data.get("facts") or []:
        source_file = entry.get("source_file")
        stored_hash = entry.get("source_hash")
        current_hash = None

        if not source_file:
            status = "no_source"
        else:
            current_hash = compute_file_hash(os.path.join(repo_root, source_file))
            if current_hash is None:
                status = "missing_source"
            elif not stored_hash:
                status = "no_hash"
            elif current_hash != stored_hash:
                status = "stale"
            else:
                status = "fresh"

        results.append({**entry, "status": status, "current_hash": current_hash})

    return results


def fix_hashes(index_path: str, repo_root: str = ".") -> int:
    """Add hashes to entries missing them. Returns count of fixed entries.

    Only entries that have a "source_file" whose content can be hashed and no
    existing "source_hash" are updated; existing hashes are never overwritten.
    The index file is rewritten only when at least one entry was fixed.
    """
    data = _read_index(index_path)

    fixed = 0
    for entry in data.get("facts") or []:
        if entry.get("source_hash"):
            continue
        source_file = entry.get("source_file")
        if not source_file:
            continue
        new_hash = compute_file_hash(os.path.join(repo_root, source_file))
        if new_hash:
            entry["source_hash"] = new_hash
            fixed += 1

    # Leave the index untouched (content and formatting) when nothing changed.
    if fixed:
        with open(index_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)

    return fixed


def main():
    """Command-line entry point: report staleness, or add hashes with --fix."""
    parser = argparse.ArgumentParser(description="Check knowledge store staleness")
    parser.add_argument("--index", required=True, help="Path to knowledge/index.json")
    parser.add_argument("--repo", default=".", help="Repo root for source file resolution")
    parser.add_argument("--json", action="store_true", help="Output as JSON")
    parser.add_argument("--fix", action="store_true", help="Add hashes to entries missing them")

    args = parser.parse_args()

    if args.fix:
        fixed = fix_hashes(args.index, args.repo)
        print(f"Fixed {fixed} entries with missing hashes.")
        return

    results = check_staleness(args.index, args.repo)

    if args.json:
        print(json.dumps(results, indent=2))
        return

    stale = [r for r in results if r["status"] != "fresh"]
    fresh = [r for r in results if r["status"] == "fresh"]

    print("Knowledge Store Staleness Check")
    print(f"  Total entries: {len(results)}")
    print(f"  Fresh: {len(fresh)}")
    print(f"  Stale/Issues: {len(stale)}")
    print()

    if not stale:
        print("All entries are fresh!")
        return

    print("Issues found:")
    for r in stale:
        status = r["status"]
        # "fact" may be present but null or non-string in the index; coerce
        # before slicing so one odd entry cannot crash the report.
        raw_fact = r.get("fact")
        fact = ("?" if raw_fact is None else str(raw_fact))[:60]
        source = r.get("source_file") or "?"
        print(f"  [{status}] {source}: {fact}")


if __name__ == "__main__":
    main()