feat: knowledge store staleness detector (#92)

This commit is contained in:
2026-04-15 03:42:12 +00:00
parent 8d716ff03f
commit 20a59d0cb7

View File

@@ -0,0 +1,221 @@
#!/usr/bin/env python3
"""
Knowledge Store Staleness Detector
Checks knowledge entries against their source files to detect staleness.
An entry is stale when its source file has been modified since extraction.
Usage:
python3 scripts/knowledge_staleness_check.py knowledge/index.json
python3 scripts/knowledge_staleness_check.py --repo /path/to/repo --index knowledge/index.json
python3 scripts/knowledge_staleness_check.py --index knowledge/index.json --fix
Expected index.json format:
{
"version": 1,
"facts": [
{
"fact": "...",
"category": "fact|pitfall|pattern|tool-quirk",
"repo": "repo-name",
"confidence": 0.8,
"source_file": "path/to/file.py",
"source_hash": "sha256:abcdef...",
"extracted_at": "2026-04-13T20:00:00Z"
}
]
}
"""
import argparse
import hashlib
import json
import sys
from pathlib import Path
from typing import Optional
def compute_file_hash(filepath: str) -> Optional[str]:
"""Compute SHA-256 hash of a file. Returns None if file not found."""
path = Path(filepath)
if not path.exists():
return None
content = path.read_bytes()
return hashlib.sha256(content).hexdigest()[:16]
def check_staleness(index_path: str, repo_root: str = None) -> dict:
"""Check all entries in the knowledge index for staleness."""
index = Path(index_path)
if not index.exists():
return {"error": f"Index not found: {index_path}"}
data = json.loads(index.read_text())
facts = data.get("facts", [])
if not facts:
return {
"total": 0,
"stale": 0,
"fresh": 0,
"no_source": 0,
"missing_files": 0,
"stale_entries": [],
}
# Determine repo root
if repo_root:
root = Path(repo_root)
else:
root = index.parent.parent # knowledge/index.json -> repo root
results = {
"total": len(facts),
"stale": 0,
"fresh": 0,
"no_source": 0,
"missing_files": 0,
"stale_entries": [],
}
for i, entry in enumerate(facts):
source_file = entry.get("source_file")
stored_hash = entry.get("source_hash")
if not source_file:
results["no_source"] += 1
continue
if not stored_hash:
# Entry has source file but no hash — consider stale
results["stale"] += 1
results["stale_entries"].append({
"index": i,
"fact": entry.get("fact", "")[:100],
"source_file": source_file,
"reason": "no_hash",
})
continue
# Compute current hash
full_path = root / source_file
current_hash = compute_file_hash(str(full_path))
if current_hash is None:
results["missing_files"] += 1
results["stale_entries"].append({
"index": i,
"fact": entry.get("fact", "")[:100],
"source_file": source_file,
"reason": "file_missing",
})
elif current_hash != stored_hash:
results["stale"] += 1
results["stale_entries"].append({
"index": i,
"fact": entry.get("fact", "")[:100],
"source_file": source_file,
"stored_hash": stored_hash,
"current_hash": current_hash,
"reason": "hash_mismatch",
})
else:
results["fresh"] += 1
return results
def add_hashes_to_index(index_path: str, repo_root: str = None) -> dict:
"""Add source hashes to entries that are missing them."""
index = Path(index_path)
data = json.loads(index.read_text())
facts = data.get("facts", [])
if repo_root:
root = Path(repo_root)
else:
root = index.parent.parent
updated = 0
skipped = 0
for entry in facts:
source_file = entry.get("source_file")
if not source_file or entry.get("source_hash"):
skipped += 1
continue
full_path = root / source_file
file_hash = compute_file_hash(str(full_path))
if file_hash:
entry["source_hash"] = file_hash
updated += 1
if updated > 0:
index.write_text(json.dumps(data, indent=2) + "\n")
return {"updated": updated, "skipped": skipped, "total": len(facts)}
def report_staleness(results: dict) -> str:
"""Format staleness check results as a report."""
lines = []
lines.append("=" * 50)
lines.append("KNOWLEDGE STORE STALENESS REPORT")
lines.append("=" * 50)
lines.append(f"Total entries: {results['total']}")
lines.append(f"Fresh: {results['fresh']}")
lines.append(f"Stale: {results['stale']}")
lines.append(f"No source: {results['no_source']}")
lines.append(f"Missing files: {results['missing_files']}")
lines.append("")
if results["stale_entries"]:
lines.append("STALE ENTRIES:")
lines.append("-" * 50)
for entry in results["stale_entries"]:
lines.append(f" [{entry['reason']}] {entry['source_file']}")
lines.append(f" {entry['fact']}")
if entry.get("stored_hash") and entry.get("current_hash"):
lines.append(f" stored: {entry['stored_hash']}")
lines.append(f" current: {entry['current_hash']}")
lines.append("")
if results["total"] > 0:
staleness_pct = results["stale"] / results["total"] * 100
lines.append(f"Staleness rate: {staleness_pct:.1f}%")
else:
lines.append("No entries to check.")
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(description="Check knowledge store for stale entries")
parser.add_argument("--index", default="knowledge/index.json", help="Path to index.json")
parser.add_argument("--repo", help="Repository root (default: auto-detect from index path)")
parser.add_argument("--fix", action="store_true", help="Add missing hashes to index")
parser.add_argument("--json", action="store_true", help="Output JSON instead of report")
args = parser.parse_args()
if args.fix:
result = add_hashes_to_index(args.index, args.repo)
if args.json:
print(json.dumps(result, indent=2))
else:
print(f"Updated {result['updated']} entries with source hashes.")
print(f"Skipped {result['skipped']} (already had hashes or no source file).")
else:
results = check_staleness(args.index, args.repo)
if "error" in results:
print(f"Error: {results['error']}", file=sys.stderr)
sys.exit(1)
if args.json:
print(json.dumps(results, indent=2))
else:
print(report_staleness(results))
if __name__ == "__main__":
main()