Compare commits

..

3 Commits

2 changed files with 210 additions and 171 deletions

View File

@@ -1,220 +1,130 @@
#!/usr/bin/env python3
"""
Knowledge Store Staleness Detector
Checks knowledge entries against their source files to detect staleness.
An entry is stale when its source file has been modified since extraction.
Knowledge Store Staleness Detector — Detect stale knowledge entries by comparing source file hashes.
Usage:
python3 scripts/knowledge_staleness_check.py knowledge/index.json
python3 scripts/knowledge_staleness_check.py --repo /path/to/repo --index knowledge/index.json
python3 scripts/knowledge_staleness_check.py --index knowledge/index.json --fix
Expected index.json format:
{
"version": 1,
"facts": [
{
"fact": "...",
"category": "fact|pitfall|pattern|tool-quirk",
"repo": "repo-name",
"confidence": 0.8,
"source_file": "path/to/file.py",
"source_hash": "sha256:abcdef...",
"extracted_at": "2026-04-13T20:00:00Z"
}
]
}
python3 scripts/knowledge_staleness_check.py --index knowledge/index.json
python3 scripts/knowledge_staleness_check.py --index knowledge/index.json --json
python3 scripts/knowledge_staleness_check.py --index knowledge/index.json --fix
"""
import argparse
import hashlib
import json
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from typing import Dict, List, Any, Optional
def compute_file_hash(filepath: str) -> Optional[str]:
"""Compute SHA-256 hash of a file. Returns None if file not found."""
path = Path(filepath)
if not path.exists():
"""Compute SHA-256 hash of a file. Returns None if file doesn't exist."""
try:
with open(filepath, "rb") as f:
return "sha256:" + hashlib.sha256(f.read()).hexdigest()
except (FileNotFoundError, IsADirectoryError, PermissionError):
return None
content = path.read_bytes()
return hashlib.sha256(content).hexdigest()[:16]
def check_staleness(index_path: str, repo_root: str = None) -> dict:
"""Check all entries in the knowledge index for staleness."""
index = Path(index_path)
if not index.exists():
return {"error": f"Index not found: {index_path}"}
def check_staleness(index_path: str, repo_root: str = ".") -> List[Dict[str, Any]]:
"""Check all entries in knowledge index for staleness.
Returns list of entries with staleness info:
- status: "fresh" | "stale" | "missing_source" | "no_hash"
- current_hash: computed hash (if source exists)
- stored_hash: hash from index
"""
with open(index_path) as f:
data = json.load(f)
data = json.loads(index.read_text())
facts = data.get("facts", [])
results = []
if not facts:
return {
"total": 0,
"stale": 0,
"fresh": 0,
"no_source": 0,
"missing_files": 0,
"stale_entries": [],
}
# Determine repo root
if repo_root:
root = Path(repo_root)
else:
root = index.parent.parent # knowledge/index.json -> repo root
results = {
"total": len(facts),
"stale": 0,
"fresh": 0,
"no_source": 0,
"missing_files": 0,
"stale_entries": [],
}
for i, entry in enumerate(facts):
for entry in facts:
source_file = entry.get("source_file")
stored_hash = entry.get("source_hash")
if not source_file:
results["no_source"] += 1
results.append({**entry, "status": "no_source", "current_hash": None})
continue
if not stored_hash:
# Entry has source file but no hash — consider stale
results["stale"] += 1
results["stale_entries"].append({
"index": i,
"fact": entry.get("fact", "")[:100],
"source_file": source_file,
"reason": "no_hash",
})
continue
# Compute current hash
full_path = root / source_file
current_hash = compute_file_hash(str(full_path))
full_path = os.path.join(repo_root, source_file)
current_hash = compute_file_hash(full_path)
if current_hash is None:
results["missing_files"] += 1
results["stale_entries"].append({
"index": i,
"fact": entry.get("fact", "")[:100],
"source_file": source_file,
"reason": "file_missing",
})
results.append({**entry, "status": "missing_source", "current_hash": None})
elif not stored_hash:
results.append({**entry, "status": "no_hash", "current_hash": current_hash})
elif current_hash != stored_hash:
results["stale"] += 1
results["stale_entries"].append({
"index": i,
"fact": entry.get("fact", "")[:100],
"source_file": source_file,
"stored_hash": stored_hash,
"current_hash": current_hash,
"reason": "hash_mismatch",
})
results.append({**entry, "status": "stale", "current_hash": current_hash})
else:
results["fresh"] += 1
results.append({**entry, "status": "fresh", "current_hash": current_hash})
return results
def add_hashes_to_index(index_path: str, repo_root: str = None) -> dict:
"""Add source hashes to entries that are missing them."""
index = Path(index_path)
data = json.loads(index.read_text())
facts = data.get("facts", [])
def fix_hashes(index_path: str, repo_root: str = ".") -> int:
"""Add hashes to entries missing them. Returns count of fixed entries."""
with open(index_path) as f:
data = json.load(f)
if repo_root:
root = Path(repo_root)
else:
root = index.parent.parent
updated = 0
skipped = 0
for entry in facts:
source_file = entry.get("source_file")
if not source_file or entry.get("source_hash"):
skipped += 1
fixed = 0
for entry in data.get("facts", []):
if entry.get("source_hash"):
continue
source_file = entry.get("source_file")
if not source_file:
continue
full_path = os.path.join(repo_root, source_file)
h = compute_file_hash(full_path)
if h:
entry["source_hash"] = h
fixed += 1
full_path = root / source_file
file_hash = compute_file_hash(str(full_path))
if file_hash:
entry["source_hash"] = file_hash
updated += 1
with open(index_path, "w") as f:
json.dump(data, f, indent=2)
if updated > 0:
index.write_text(json.dumps(data, indent=2) + "\n")
return {"updated": updated, "skipped": skipped, "total": len(facts)}
def report_staleness(results: dict) -> str:
"""Format staleness check results as a report."""
lines = []
lines.append("=" * 50)
lines.append("KNOWLEDGE STORE STALENESS REPORT")
lines.append("=" * 50)
lines.append(f"Total entries: {results['total']}")
lines.append(f"Fresh: {results['fresh']}")
lines.append(f"Stale: {results['stale']}")
lines.append(f"No source: {results['no_source']}")
lines.append(f"Missing files: {results['missing_files']}")
lines.append("")
if results["stale_entries"]:
lines.append("STALE ENTRIES:")
lines.append("-" * 50)
for entry in results["stale_entries"]:
lines.append(f" [{entry['reason']}] {entry['source_file']}")
lines.append(f" {entry['fact']}")
if entry.get("stored_hash") and entry.get("current_hash"):
lines.append(f" stored: {entry['stored_hash']}")
lines.append(f" current: {entry['current_hash']}")
lines.append("")
if results["total"] > 0:
staleness_pct = results["stale"] / results["total"] * 100
lines.append(f"Staleness rate: {staleness_pct:.1f}%")
else:
lines.append("No entries to check.")
return "\n".join(lines)
return fixed
def main():
parser = argparse.ArgumentParser(description="Check knowledge store for stale entries")
parser.add_argument("--index", default="knowledge/index.json", help="Path to index.json")
parser.add_argument("--repo", help="Repository root (default: auto-detect from index path)")
parser.add_argument("--fix", action="store_true", help="Add missing hashes to index")
parser.add_argument("--json", action="store_true", help="Output JSON instead of report")
parser = argparse.ArgumentParser(description="Check knowledge store staleness")
parser.add_argument("--index", required=True, help="Path to knowledge/index.json")
parser.add_argument("--repo", default=".", help="Repo root for source file resolution")
parser.add_argument("--json", action="store_true", help="Output as JSON")
parser.add_argument("--fix", action="store_true", help="Add hashes to entries missing them")
args = parser.parse_args()
if args.fix:
result = add_hashes_to_index(args.index, args.repo)
if args.json:
print(json.dumps(result, indent=2))
else:
print(f"Updated {result['updated']} entries with source hashes.")
print(f"Skipped {result['skipped']} (already had hashes or no source file).")
else:
results = check_staleness(args.index, args.repo)
if "error" in results:
print(f"Error: {results['error']}", file=sys.stderr)
sys.exit(1)
fixed = fix_hashes(args.index, args.repo)
print(f"Fixed {fixed} entries with missing hashes.")
return
if args.json:
print(json.dumps(results, indent=2))
results = check_staleness(args.index, args.repo)
if args.json:
print(json.dumps(results, indent=2))
else:
stale = [r for r in results if r["status"] != "fresh"]
fresh = [r for r in results if r["status"] == "fresh"]
print(f"Knowledge Store Staleness Check")
print(f" Total entries: {len(results)}")
print(f" Fresh: {len(fresh)}")
print(f" Stale/Issues: {len(stale)}")
print()
if stale:
print("Issues found:")
for r in stale:
status = r["status"]
fact = r.get("fact", "?")[:60]
source = r.get("source_file", "?")
print(f" [{status}] {source}: {fact}")
else:
print(report_staleness(results))
print("All entries are fresh!")
if __name__ == "__main__":

View File

@@ -0,0 +1,129 @@
#!/usr/bin/env python3
"""Tests for scripts/knowledge_staleness_check.py — 8 tests."""
import json
import os
import sys
import tempfile
sys.path.insert(0, os.path.dirname(__file__) or ".")
import importlib.util
spec = importlib.util.spec_from_file_location("ks", os.path.join(os.path.dirname(__file__) or ".", "knowledge_staleness_check.py"))
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
check_staleness = mod.check_staleness
fix_hashes = mod.fix_hashes
compute_file_hash = mod.compute_file_hash
def test_fresh_entry():
with tempfile.TemporaryDirectory() as tmpdir:
src = os.path.join(tmpdir, "source.py")
with open(src, "w") as f:
f.write("print('hello')")
h = compute_file_hash(src)
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "hello", "source_file": "source.py", "source_hash": h}]}, f)
results = check_staleness(idx, tmpdir)
assert results[0]["status"] == "fresh"
print("PASS: test_fresh_entry")
def test_stale_entry():
with tempfile.TemporaryDirectory() as tmpdir:
src = os.path.join(tmpdir, "source.py")
with open(src, "w") as f:
f.write("original content")
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "old", "source_file": "source.py", "source_hash": "sha256:wrong"}]}, f)
# Now change the source
with open(src, "w") as f:
f.write("modified content")
results = check_staleness(idx, tmpdir)
assert results[0]["status"] == "stale"
print("PASS: test_stale_entry")
def test_missing_source():
with tempfile.TemporaryDirectory() as tmpdir:
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "gone", "source_file": "nonexistent.py", "source_hash": "sha256:abc"}]}, f)
results = check_staleness(idx, tmpdir)
assert results[0]["status"] == "missing_source"
print("PASS: test_missing_source")
def test_no_hash():
with tempfile.TemporaryDirectory() as tmpdir:
src = os.path.join(tmpdir, "source.py")
with open(src, "w") as f:
f.write("content")
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "no hash", "source_file": "source.py"}]}, f)
results = check_staleness(idx, tmpdir)
assert results[0]["status"] == "no_hash"
assert results[0]["current_hash"].startswith("sha256:")
print("PASS: test_no_hash")
def test_no_source_field():
with tempfile.TemporaryDirectory() as tmpdir:
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "orphan"}]}, f)
results = check_staleness(idx, tmpdir)
assert results[0]["status"] == "no_source"
print("PASS: test_no_source_field")
def test_fix_hashes():
with tempfile.TemporaryDirectory() as tmpdir:
src = os.path.join(tmpdir, "source.py")
with open(src, "w") as f:
f.write("content for hashing")
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "needs hash", "source_file": "source.py"}]}, f)
fixed = fix_hashes(idx, tmpdir)
assert fixed == 1
# Verify hash was added
with open(idx) as f:
data = json.load(f)
assert data["facts"][0]["source_hash"].startswith("sha256:")
print("PASS: test_fix_hashes")
def test_empty_index():
with tempfile.TemporaryDirectory() as tmpdir:
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": []}, f)
results = check_staleness(idx, tmpdir)
assert results == []
print("PASS: test_empty_index")
def test_compute_hash_nonexistent():
h = compute_file_hash("/nonexistent/path/file.py")
assert h is None
print("PASS: test_compute_hash_nonexistent")
def run_all():
test_fresh_entry()
test_stale_entry()
test_missing_source()
test_no_hash()
test_no_source_field()
test_fix_hashes()
test_empty_index()
test_compute_hash_nonexistent()
print("\nAll 8 tests passed!")
if __name__ == "__main__":
run_all()