222 lines
6.8 KiB
Python
222 lines
6.8 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Knowledge Store Staleness Detector
|
|
|
|
Checks knowledge entries against their source files to detect staleness.
|
|
An entry is stale when its source file has been modified since extraction.
|
|
|
|
Usage:
|
|
python3 scripts/knowledge_staleness_check.py knowledge/index.json
|
|
python3 scripts/knowledge_staleness_check.py --repo /path/to/repo --index knowledge/index.json
|
|
python3 scripts/knowledge_staleness_check.py --index knowledge/index.json --fix
|
|
|
|
Expected index.json format:
|
|
{
|
|
"version": 1,
|
|
"facts": [
|
|
{
|
|
"fact": "...",
|
|
"category": "fact|pitfall|pattern|tool-quirk",
|
|
"repo": "repo-name",
|
|
"confidence": 0.8,
|
|
"source_file": "path/to/file.py",
|
|
"source_hash": "0123456789abcdef",  (first 16 hex chars of the file's SHA-256, as written by --fix)
|
|
"extracted_at": "2026-04-13T20:00:00Z"
|
|
}
|
|
]
|
|
}
|
|
"""
|
|
|
|
import argparse
|
|
import hashlib
|
|
import json
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
|
|
def compute_file_hash(filepath: str) -> Optional[str]:
    """Return a short SHA-256 fingerprint of a file's contents.

    Args:
        filepath: Path of the file to hash.

    Returns:
        The first 16 hexadecimal characters of the file's SHA-256 digest,
        or ``None`` when ``filepath`` does not refer to a regular file
        (for example a missing path or a directory).
    """
    path = Path(filepath)
    # is_file() rather than exists(): a directory "exists" but cannot be
    # read as bytes, and is no more usable as a source file than a
    # missing path.
    if not path.is_file():
        return None

    digest = hashlib.sha256()
    with path.open("rb") as handle:
        # Hash in fixed-size chunks so a large file is never held in
        # memory all at once.
        for chunk in iter(lambda: handle.read(65536), b""):
            digest.update(chunk)

    # Only the first 16 hex characters are kept as the fingerprint.
    return digest.hexdigest()[:16]
|
|
|
|
|
|
def _stale_entry(position, entry, source_file, reason, **extra):
    """Build one ``stale_entries`` record for the fact at ``position``."""
    record = {
        "index": position,
        # A null or non-string "fact" must not crash the slice.
        "fact": str(entry.get("fact") or "")[:100],
        "source_file": source_file,
    }
    record.update(extra)
    # "reason" goes last so the key order of the JSON output is unchanged.
    record["reason"] = reason
    return record


def _hashes_match(stored_hash, current_hash):
    """Return True when ``stored_hash`` denotes the digest ``current_hash``."""
    if not isinstance(stored_hash, str):
        return False
    digest = stored_hash
    # Accept the "sha256:<hex>" spelling as well as a bare hex digest.
    if digest.startswith("sha256:"):
        digest = digest[len("sha256:"):]
    if digest == current_hash:
        return True
    # The current hash may be a truncated digest; a longer stored digest
    # matches when it begins with it.
    return len(digest) > len(current_hash) and digest.startswith(current_hash)


def check_staleness(index_path: str, repo_root: Optional[str] = None) -> dict:
    """Check every entry of the knowledge index against its source file.

    Args:
        index_path: Path to the JSON index; its ``"facts"`` list is checked.
        repo_root: Directory that ``source_file`` paths are relative to.
            Defaults to the grandparent of the index file
            (``knowledge/index.json`` -> repository root).

    Returns:
        ``{"error": "..."}`` when the index file does not exist. Otherwise
        a dict with these keys:

        * ``total``: number of facts in the index.
        * ``fresh``: entries whose stored hash matches the file.
        * ``stale``: entries with a source file but no stored hash, or
          whose stored hash no longer matches.
        * ``no_source``: entries without a ``source_file``.
        * ``missing_files``: entries whose source file could not be hashed.
        * ``stale_entries``: one record per stale or missing entry, with
          ``reason`` set to ``"no_hash"``, ``"file_missing"`` or
          ``"hash_mismatch"``.

    Raises:
        json.JSONDecodeError: If the index file is not valid JSON.
    """
    index = Path(index_path)
    if not index.exists():
        return {"error": f"Index not found: {index_path}"}

    # JSON is UTF-8 by specification; do not depend on the platform default.
    data = json.loads(index.read_text(encoding="utf-8"))
    facts = data.get("facts") or []

    results = {
        "total": len(facts),
        "stale": 0,
        "fresh": 0,
        "no_source": 0,
        "missing_files": 0,
        "stale_entries": [],
    }
    if not facts:
        return results

    # knowledge/index.json -> repo root, unless the caller gave one.
    root = Path(repo_root) if repo_root else index.parent.parent

    # Many facts usually share one source file; hash each file only once.
    hash_cache = {}

    for i, entry in enumerate(facts):
        source_file = entry.get("source_file")
        stored_hash = entry.get("source_hash")

        if not source_file:
            results["no_source"] += 1
            continue

        if not stored_hash:
            # Entry has a source file but no hash: consider it stale.
            results["stale"] += 1
            results["stale_entries"].append(
                _stale_entry(i, entry, source_file, "no_hash")
            )
            continue

        if source_file not in hash_cache:
            hash_cache[source_file] = compute_file_hash(str(root / source_file))
        current_hash = hash_cache[source_file]

        if current_hash is None:
            # Counted separately from "stale", but still listed in the report.
            results["missing_files"] += 1
            results["stale_entries"].append(
                _stale_entry(i, entry, source_file, "file_missing")
            )
        elif not _hashes_match(stored_hash, current_hash):
            results["stale"] += 1
            results["stale_entries"].append(
                _stale_entry(
                    i,
                    entry,
                    source_file,
                    "hash_mismatch",
                    stored_hash=stored_hash,
                    current_hash=current_hash,
                )
            )
        else:
            results["fresh"] += 1

    return results
|
|
|
|
|
|
def add_hashes_to_index(index_path: str, repo_root: Optional[str] = None) -> dict:
    """Add a ``source_hash`` to index entries that are missing one.

    Entries without a ``source_file``, or that already carry a
    ``source_hash``, are skipped. The index file is rewritten only when at
    least one entry was updated.

    Args:
        index_path: Path to the JSON index to update in place.
        repo_root: Directory that ``source_file`` paths are relative to.
            Defaults to the grandparent of the index file.

    Returns:
        A dict with ``updated``, ``skipped`` and ``total`` counts. Entries
        whose source file could not be hashed are counted in neither
        ``updated`` nor ``skipped``.

    Raises:
        FileNotFoundError: If the index file does not exist.
        json.JSONDecodeError: If the index file is not valid JSON.
    """
    index = Path(index_path)
    # JSON is UTF-8 by specification; do not depend on the platform default.
    data = json.loads(index.read_text(encoding="utf-8"))
    # `or []` also covers an explicit `"facts": null` in the index.
    facts = data.get("facts") or []

    root = Path(repo_root) if repo_root else index.parent.parent

    updated = 0
    skipped = 0
    # Many facts usually share one source file; hash each file only once.
    hash_cache = {}

    for entry in facts:
        source_file = entry.get("source_file")
        if not source_file or entry.get("source_hash"):
            skipped += 1
            continue

        if source_file not in hash_cache:
            hash_cache[source_file] = compute_file_hash(str(root / source_file))
        file_hash = hash_cache[source_file]

        if file_hash:
            entry["source_hash"] = file_hash
            updated += 1

    if updated > 0:
        index.write_text(json.dumps(data, indent=2) + "\n", encoding="utf-8")

    return {"updated": updated, "skipped": skipped, "total": len(facts)}
|
|
|
|
|
|
def report_staleness(results: dict) -> str:
    """Render staleness-check results as a plain-text report.

    Args:
        results: Dict with the keys ``total``, ``fresh``, ``stale``,
            ``no_source``, ``missing_files`` and ``stale_entries``.

    Returns:
        The report as one newline-joined string: a banner, the counters,
        one section per stale entry (if any), and a closing line with
        either the staleness rate or a note that nothing was checked.
    """
    banner = "=" * 50
    report = [
        banner,
        "KNOWLEDGE STORE STALENESS REPORT",
        banner,
        f"Total entries: {results['total']}",
        f"Fresh: {results['fresh']}",
        f"Stale: {results['stale']}",
        f"No source: {results['no_source']}",
        f"Missing files: {results['missing_files']}",
        "",
    ]

    flagged = results["stale_entries"]
    if flagged:
        report += ["STALE ENTRIES:", "-" * 50]
        for item in flagged:
            report.append(f" [{item['reason']}] {item['source_file']}")
            report.append(f" {item['fact']}")
            # Hash details are only present on hash-mismatch records.
            if item.get("stored_hash") and item.get("current_hash"):
                report.append(f" stored: {item['stored_hash']}")
                report.append(f" current: {item['current_hash']}")
            # A blank line separates consecutive entries.
            report.append("")

    total = results["total"]
    if total > 0:
        rate = results["stale"] / total * 100
        closing = f"Staleness rate: {rate:.1f}%"
    else:
        closing = "No entries to check."
    report.append(closing)

    return "\n".join(report)
|
|
|
|
|
|
def main():
    """Command-line entry point.

    Without ``--fix``, checks the index for stale entries and prints a text
    report (or JSON with ``--json``). With ``--fix``, adds missing source
    hashes to the index and prints how many entries were updated.

    The index path may be given positionally or with ``--index``; the
    positional form wins when both are present. Exits with status 1 when
    the index file does not exist or the check reports an error.
    """
    parser = argparse.ArgumentParser(description="Check knowledge store for stale entries")
    # Optional positional so that `script.py knowledge/index.json` works.
    parser.add_argument("index_path", nargs="?", default=None,
                        help="Path to index.json (positional alternative to --index)")
    parser.add_argument("--index", default="knowledge/index.json", help="Path to index.json")
    parser.add_argument("--repo", help="Repository root (default: auto-detect from index path)")
    parser.add_argument("--fix", action="store_true", help="Add missing hashes to index")
    parser.add_argument("--json", action="store_true", help="Output JSON instead of report")
    args = parser.parse_args()

    index_path = args.index_path or args.index

    # Fail cleanly in both modes; without this, --fix on a missing index
    # ends in a FileNotFoundError traceback.
    if not Path(index_path).exists():
        print(f"Error: Index not found: {index_path}", file=sys.stderr)
        sys.exit(1)

    if args.fix:
        result = add_hashes_to_index(index_path, args.repo)
        if args.json:
            print(json.dumps(result, indent=2))
        else:
            print(f"Updated {result['updated']} entries with source hashes.")
            print(f"Skipped {result['skipped']} (already had hashes or no source file).")
        return

    results = check_staleness(index_path, args.repo)
    if "error" in results:
        print(f"Error: {results['error']}", file=sys.stderr)
        sys.exit(1)

    if args.json:
        print(json.dumps(results, indent=2))
    else:
        print(report_staleness(results))
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not when it is
# imported as a module.
if __name__ == "__main__":
    main()
|