#!/usr/bin/env python3
"""
Knowledge Store Staleness Detector — Detect stale knowledge entries by comparing source file hashes.

Usage:
    python3 scripts/knowledge_staleness_check.py --index knowledge/index.json
    python3 scripts/knowledge_staleness_check.py --index knowledge/index.json --json
    python3 scripts/knowledge_staleness_check.py --index knowledge/index.json --fix
"""

import argparse
import hashlib
import json
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Any, Optional

# Read size used while hashing so large source files are never loaded whole.
_HASH_CHUNK_SIZE = 1024 * 1024


def compute_file_hash(filepath: str) -> Optional[str]:
    """Compute the SHA-256 hash of a file.

    Args:
        filepath: Path of the file to hash.

    Returns:
        The digest formatted as ``"sha256:<hex>"``, or None when the path
        cannot be read as a file (missing, a directory, a path that runs
        through a non-directory, or permission denied).
    """
    digest = hashlib.sha256()
    try:
        with open(filepath, "rb") as f:
            # iter() with a sentinel stops at the empty read that marks EOF.
            for chunk in iter(lambda: f.read(_HASH_CHUNK_SIZE), b""):
                digest.update(chunk)
    except (FileNotFoundError, IsADirectoryError, NotADirectoryError, PermissionError):
        # NotADirectoryError: a parent component of the path is a regular
        # file, so the source does not exist any more than a missing file does.
        return None
    return "sha256:" + digest.hexdigest()


def _load_index(index_path: str) -> Dict[str, Any]:
    """Read and parse the knowledge index JSON file as UTF-8."""
    with open(index_path, encoding="utf-8") as f:
        return json.load(f)


def check_staleness(index_path: str, repo_root: str = ".") -> List[Dict[str, Any]]:
    """Check all entries in the knowledge index for staleness.

    Args:
        index_path: Path to the index JSON file; entries are read from its
            "facts" list.
        repo_root: Directory that each entry's "source_file" is joined onto.

    Returns:
        One dict per entry, in index order: a copy of the entry plus
        - status: "fresh" | "stale" | "missing_source" | "no_hash" | "no_source"
        - current_hash: hash computed now (None when there is no readable source)
        The hash recorded in the index stays available under the entry's own
        "source_hash" key.
    """
    data = _load_index(index_path)
    results = []

    # `or []` also covers an index whose "facts" key is present but null.
    for entry in data.get("facts") or []:
        source_file = entry.get("source_file")
        stored_hash = entry.get("source_hash")

        if not source_file:
            results.append({**entry, "status": "no_source", "current_hash": None})
            continue

        current_hash = compute_file_hash(os.path.join(repo_root, source_file))

        if current_hash is None:
            status = "missing_source"
        elif not stored_hash:
            status = "no_hash"
        elif current_hash != stored_hash:
            status = "stale"
        else:
            status = "fresh"
        results.append({**entry, "status": status, "current_hash": current_hash})

    return results


def fix_hashes(index_path: str, repo_root: str = ".") -> int:
    """Add hashes to entries missing them.

    Entries that already carry a "source_hash", have no "source_file", or
    whose source file cannot be read are left untouched. The index file is
    rewritten only when at least one entry was updated.

    Args:
        index_path: Path to the index JSON file, updated in place.
        repo_root: Directory that each entry's "source_file" is joined onto.

    Returns:
        Count of entries that received a hash.
    """
    data = _load_index(index_path)
    fixed = 0

    for entry in data.get("facts") or []:
        if entry.get("source_hash"):
            continue
        source_file = entry.get("source_file")
        if not source_file:
            continue
        h = compute_file_hash(os.path.join(repo_root, source_file))
        if h:
            entry["source_hash"] = h
            fixed += 1

    # Skip the write when nothing changed so a no-op run leaves the file alone.
    if fixed:
        with open(index_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)

    return fixed


def main():
    """Command-line entry point: report staleness, or add hashes with --fix."""
    parser = argparse.ArgumentParser(description="Check knowledge store staleness")
    parser.add_argument("--index", required=True, help="Path to knowledge/index.json")
    parser.add_argument("--repo", default=".", help="Repo root for source file resolution")
    parser.add_argument("--json", action="store_true", help="Output as JSON")
    parser.add_argument("--fix", action="store_true", help="Add hashes to entries missing them")
    args = parser.parse_args()

    if args.fix:
        fixed = fix_hashes(args.index, args.repo)
        print(f"Fixed {fixed} entries with missing hashes.")
        return

    results = check_staleness(args.index, args.repo)

    if args.json:
        print(json.dumps(results, indent=2))
        return

    # Everything that is not "fresh" is reported as an issue.
    stale = [r for r in results if r["status"] != "fresh"]
    fresh = [r for r in results if r["status"] == "fresh"]

    print("Knowledge Store Staleness Check")
    print(f" Total entries: {len(results)}")
    print(f" Fresh: {len(fresh)}")
    print(f" Stale/Issues: {len(stale)}")
    print()

    if stale:
        print("Issues found:")
        for r in stale:
            status = r["status"]
            # `or "?"` covers keys that exist but hold null/empty values,
            # which is always the case for "no_source" entries.
            fact = str(r.get("fact") or "?")[:60]
            source = r.get("source_file") or "?"
            print(f" [{status}] {source}: {fact}")
    else:
        print("All entries are fresh!")


if __name__ == "__main__":
    main()