Compare commits

..

1 Commits

Author SHA1 Message Date
20a59d0cb7 feat: knowledge store staleness detector (#92) 2026-04-15 03:42:12 +00:00
4 changed files with 171 additions and 506 deletions

View File

@@ -1,130 +1,220 @@
#!/usr/bin/env python3
"""
Knowledge Store Staleness Detector — Detect stale knowledge entries by comparing source file hashes.
Knowledge Store Staleness Detector
Checks knowledge entries against their source files to detect staleness.
An entry is stale when its source file has been modified since extraction.
Usage:
python3 scripts/knowledge_staleness_check.py --index knowledge/index.json
python3 scripts/knowledge_staleness_check.py --index knowledge/index.json --json
python3 scripts/knowledge_staleness_check.py --index knowledge/index.json --fix
python3 scripts/knowledge_staleness_check.py knowledge/index.json
python3 scripts/knowledge_staleness_check.py --repo /path/to/repo --index knowledge/index.json
python3 scripts/knowledge_staleness_check.py --index knowledge/index.json --fix
Expected index.json format:
{
"version": 1,
"facts": [
{
"fact": "...",
"category": "fact|pitfall|pattern|tool-quirk",
"repo": "repo-name",
"confidence": 0.8,
"source_file": "path/to/file.py",
"source_hash": "sha256:abcdef...",
"extracted_at": "2026-04-13T20:00:00Z"
}
]
}
"""
import argparse
import hashlib
import json
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Any, Optional
from typing import Optional
def compute_file_hash(filepath: str) -> Optional[str]:
"""Compute SHA-256 hash of a file. Returns None if file doesn't exist."""
try:
with open(filepath, "rb") as f:
return "sha256:" + hashlib.sha256(f.read()).hexdigest()
except (FileNotFoundError, IsADirectoryError, PermissionError):
"""Compute SHA-256 hash of a file. Returns None if file not found."""
path = Path(filepath)
if not path.exists():
return None
content = path.read_bytes()
return hashlib.sha256(content).hexdigest()[:16]
def check_staleness(index_path: str, repo_root: str = ".") -> List[Dict[str, Any]]:
"""Check all entries in knowledge index for staleness.
Returns list of entries with staleness info:
- status: "fresh" | "stale" | "missing_source" | "no_hash"
- current_hash: computed hash (if source exists)
- stored_hash: hash from index
"""
with open(index_path) as f:
data = json.load(f)
def check_staleness(index_path: str, repo_root: str = None) -> dict:
"""Check all entries in the knowledge index for staleness."""
index = Path(index_path)
if not index.exists():
return {"error": f"Index not found: {index_path}"}
data = json.loads(index.read_text())
facts = data.get("facts", [])
results = []
for entry in facts:
if not facts:
return {
"total": 0,
"stale": 0,
"fresh": 0,
"no_source": 0,
"missing_files": 0,
"stale_entries": [],
}
# Determine repo root
if repo_root:
root = Path(repo_root)
else:
root = index.parent.parent # knowledge/index.json -> repo root
results = {
"total": len(facts),
"stale": 0,
"fresh": 0,
"no_source": 0,
"missing_files": 0,
"stale_entries": [],
}
for i, entry in enumerate(facts):
source_file = entry.get("source_file")
stored_hash = entry.get("source_hash")
if not source_file:
results.append({**entry, "status": "no_source", "current_hash": None})
results["no_source"] += 1
continue
full_path = os.path.join(repo_root, source_file)
current_hash = compute_file_hash(full_path)
if not stored_hash:
# Entry has source file but no hash — consider stale
results["stale"] += 1
results["stale_entries"].append({
"index": i,
"fact": entry.get("fact", "")[:100],
"source_file": source_file,
"reason": "no_hash",
})
continue
# Compute current hash
full_path = root / source_file
current_hash = compute_file_hash(str(full_path))
if current_hash is None:
results.append({**entry, "status": "missing_source", "current_hash": None})
elif not stored_hash:
results.append({**entry, "status": "no_hash", "current_hash": current_hash})
results["missing_files"] += 1
results["stale_entries"].append({
"index": i,
"fact": entry.get("fact", "")[:100],
"source_file": source_file,
"reason": "file_missing",
})
elif current_hash != stored_hash:
results.append({**entry, "status": "stale", "current_hash": current_hash})
results["stale"] += 1
results["stale_entries"].append({
"index": i,
"fact": entry.get("fact", "")[:100],
"source_file": source_file,
"stored_hash": stored_hash,
"current_hash": current_hash,
"reason": "hash_mismatch",
})
else:
results.append({**entry, "status": "fresh", "current_hash": current_hash})
results["fresh"] += 1
return results
def fix_hashes(index_path: str, repo_root: str = ".") -> int:
"""Add hashes to entries missing them. Returns count of fixed entries."""
with open(index_path) as f:
data = json.load(f)
def add_hashes_to_index(index_path: str, repo_root: str = None) -> dict:
"""Add source hashes to entries that are missing them."""
index = Path(index_path)
data = json.loads(index.read_text())
facts = data.get("facts", [])
fixed = 0
for entry in data.get("facts", []):
if entry.get("source_hash"):
continue
if repo_root:
root = Path(repo_root)
else:
root = index.parent.parent
updated = 0
skipped = 0
for entry in facts:
source_file = entry.get("source_file")
if not source_file:
if not source_file or entry.get("source_hash"):
skipped += 1
continue
full_path = os.path.join(repo_root, source_file)
h = compute_file_hash(full_path)
if h:
entry["source_hash"] = h
fixed += 1
with open(index_path, "w") as f:
json.dump(data, f, indent=2)
full_path = root / source_file
file_hash = compute_file_hash(str(full_path))
if file_hash:
entry["source_hash"] = file_hash
updated += 1
return fixed
if updated > 0:
index.write_text(json.dumps(data, indent=2) + "\n")
return {"updated": updated, "skipped": skipped, "total": len(facts)}
def report_staleness(results: dict) -> str:
"""Format staleness check results as a report."""
lines = []
lines.append("=" * 50)
lines.append("KNOWLEDGE STORE STALENESS REPORT")
lines.append("=" * 50)
lines.append(f"Total entries: {results['total']}")
lines.append(f"Fresh: {results['fresh']}")
lines.append(f"Stale: {results['stale']}")
lines.append(f"No source: {results['no_source']}")
lines.append(f"Missing files: {results['missing_files']}")
lines.append("")
if results["stale_entries"]:
lines.append("STALE ENTRIES:")
lines.append("-" * 50)
for entry in results["stale_entries"]:
lines.append(f" [{entry['reason']}] {entry['source_file']}")
lines.append(f" {entry['fact']}")
if entry.get("stored_hash") and entry.get("current_hash"):
lines.append(f" stored: {entry['stored_hash']}")
lines.append(f" current: {entry['current_hash']}")
lines.append("")
if results["total"] > 0:
staleness_pct = results["stale"] / results["total"] * 100
lines.append(f"Staleness rate: {staleness_pct:.1f}%")
else:
lines.append("No entries to check.")
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(description="Check knowledge store staleness")
parser.add_argument("--index", required=True, help="Path to knowledge/index.json")
parser.add_argument("--repo", default=".", help="Repo root for source file resolution")
parser.add_argument("--json", action="store_true", help="Output as JSON")
parser.add_argument("--fix", action="store_true", help="Add hashes to entries missing them")
parser = argparse.ArgumentParser(description="Check knowledge store for stale entries")
parser.add_argument("--index", default="knowledge/index.json", help="Path to index.json")
parser.add_argument("--repo", help="Repository root (default: auto-detect from index path)")
parser.add_argument("--fix", action="store_true", help="Add missing hashes to index")
parser.add_argument("--json", action="store_true", help="Output JSON instead of report")
args = parser.parse_args()
if args.fix:
fixed = fix_hashes(args.index, args.repo)
print(f"Fixed {fixed} entries with missing hashes.")
return
results = check_staleness(args.index, args.repo)
if args.json:
print(json.dumps(results, indent=2))
else:
stale = [r for r in results if r["status"] != "fresh"]
fresh = [r for r in results if r["status"] == "fresh"]
print(f"Knowledge Store Staleness Check")
print(f" Total entries: {len(results)}")
print(f" Fresh: {len(fresh)}")
print(f" Stale/Issues: {len(stale)}")
print()
if stale:
print("Issues found:")
for r in stale:
status = r["status"]
fact = r.get("fact", "?")[:60]
source = r.get("source_file", "?")
print(f" [{status}] {source}: {fact}")
result = add_hashes_to_index(args.index, args.repo)
if args.json:
print(json.dumps(result, indent=2))
else:
print("All entries are fresh!")
print(f"Updated {result['updated']} entries with source hashes.")
print(f"Skipped {result['skipped']} (already had hashes or no source file).")
else:
results = check_staleness(args.index, args.repo)
if "error" in results:
print(f"Error: {results['error']}", file=sys.stderr)
sys.exit(1)
if args.json:
print(json.dumps(results, indent=2))
else:
print(report_staleness(results))
if __name__ == "__main__":

View File

@@ -1,54 +0,0 @@
#!/usr/bin/env python3
"""
Finds refactoring opportunities in codebases
Engine ID: 10.4
Usage:
python3 scripts/refactoring_opportunity_finder.py --output proposals/refactoring_opportunity_finder.json
python3 scripts/refactoring_opportunity_finder.py --output proposals/refactoring_opportunity_finder.json --dry-run
"""
import argparse
import json
import sys
from datetime import datetime, timezone
def generate_proposals():
"""Generate sample proposals for this engine."""
# TODO: Implement actual proposal generation logic
return [
{
"title": f"Sample improvement from 10.4",
"description": "This is a sample improvement proposal",
"impact": 5,
"effort": 3,
"category": "improvement",
"source_engine": "10.4",
"timestamp": datetime.now(timezone.utc).isoformat()
}
]
def main():
parser = argparse.ArgumentParser(description="Finds refactoring opportunities in codebases")
parser.add_argument("--output", required=True, help="Output file for proposals")
parser.add_argument("--dry-run", action="store_true", help="Don't write output file")
args = parser.parse_args()
proposals = generate_proposals()
if not args.dry_run:
with open(args.output, "w") as f:
json.dump({"proposals": proposals}, f, indent=2)
print(f"Generated {len(proposals)} proposals -> {args.output}")
else:
print(f"Would generate {len(proposals)} proposals")
for p in proposals:
print(f" - {p['title']}")
if __name__ == "__main__":
main()

View File

@@ -1,129 +0,0 @@
#!/usr/bin/env python3
"""Tests for scripts/knowledge_staleness_check.py — 8 tests."""
import json
import os
import sys
import tempfile
sys.path.insert(0, os.path.dirname(__file__) or ".")
import importlib.util
spec = importlib.util.spec_from_file_location("ks", os.path.join(os.path.dirname(__file__) or ".", "knowledge_staleness_check.py"))
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
check_staleness = mod.check_staleness
fix_hashes = mod.fix_hashes
compute_file_hash = mod.compute_file_hash
def test_fresh_entry():
with tempfile.TemporaryDirectory() as tmpdir:
src = os.path.join(tmpdir, "source.py")
with open(src, "w") as f:
f.write("print('hello')")
h = compute_file_hash(src)
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "hello", "source_file": "source.py", "source_hash": h}]}, f)
results = check_staleness(idx, tmpdir)
assert results[0]["status"] == "fresh"
print("PASS: test_fresh_entry")
def test_stale_entry():
with tempfile.TemporaryDirectory() as tmpdir:
src = os.path.join(tmpdir, "source.py")
with open(src, "w") as f:
f.write("original content")
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "old", "source_file": "source.py", "source_hash": "sha256:wrong"}]}, f)
# Now change the source
with open(src, "w") as f:
f.write("modified content")
results = check_staleness(idx, tmpdir)
assert results[0]["status"] == "stale"
print("PASS: test_stale_entry")
def test_missing_source():
with tempfile.TemporaryDirectory() as tmpdir:
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "gone", "source_file": "nonexistent.py", "source_hash": "sha256:abc"}]}, f)
results = check_staleness(idx, tmpdir)
assert results[0]["status"] == "missing_source"
print("PASS: test_missing_source")
def test_no_hash():
with tempfile.TemporaryDirectory() as tmpdir:
src = os.path.join(tmpdir, "source.py")
with open(src, "w") as f:
f.write("content")
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "no hash", "source_file": "source.py"}]}, f)
results = check_staleness(idx, tmpdir)
assert results[0]["status"] == "no_hash"
assert results[0]["current_hash"].startswith("sha256:")
print("PASS: test_no_hash")
def test_no_source_field():
with tempfile.TemporaryDirectory() as tmpdir:
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "orphan"}]}, f)
results = check_staleness(idx, tmpdir)
assert results[0]["status"] == "no_source"
print("PASS: test_no_source_field")
def test_fix_hashes():
with tempfile.TemporaryDirectory() as tmpdir:
src = os.path.join(tmpdir, "source.py")
with open(src, "w") as f:
f.write("content for hashing")
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "needs hash", "source_file": "source.py"}]}, f)
fixed = fix_hashes(idx, tmpdir)
assert fixed == 1
# Verify hash was added
with open(idx) as f:
data = json.load(f)
assert data["facts"][0]["source_hash"].startswith("sha256:")
print("PASS: test_fix_hashes")
def test_empty_index():
with tempfile.TemporaryDirectory() as tmpdir:
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": []}, f)
results = check_staleness(idx, tmpdir)
assert results == []
print("PASS: test_empty_index")
def test_compute_hash_nonexistent():
h = compute_file_hash("/nonexistent/path/file.py")
assert h is None
print("PASS: test_compute_hash_nonexistent")
def run_all():
test_fresh_entry()
test_stale_entry()
test_missing_source()
test_no_hash()
test_no_source_field()
test_fix_hashes()
test_empty_index()
test_compute_hash_nonexistent()
print("\nAll 8 tests passed!")
if __name__ == "__main__":
run_all()

View File

@@ -1,242 +0,0 @@
#!/usr/bin/env python3
"""Tests for scripts/refactoring_opportunity_finder.py — 10 tests."""
import json
import os
import sys
import tempfile
sys.path.insert(0, os.path.dirname(__file__) or ".")
import importlib.util
spec = importlib.util.spec_from_file_location(
"rof", os.path.join(os.path.dirname(__file__) or ".", "refactoring_opportunity_finder.py"))
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
compute_file_complexity = mod.compute_file_complexity
calculate_refactoring_score = mod.calculate_refactoring_score
FileMetrics = mod.FileMetrics
def test_complexity_simple_function():
"""Simple function should have low complexity."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write("""
def simple():
return 42
""")
f.flush()
avg, max_c, funcs, classes, lines = compute_file_complexity(f.name)
assert avg == 1.0, f"Expected 1.0, got {avg}"
assert max_c == 1, f"Expected 1, got {max_c}"
assert funcs == 1, f"Expected 1, got {funcs}"
assert classes == 0, f"Expected 0, got {classes}"
os.unlink(f.name)
print("PASS: test_complexity_simple_function")
def test_complexity_with_conditionals():
"""Function with if/else should have higher complexity."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write("""
def complex_func(x):
if x > 0:
if x > 10:
return "big"
else:
return "small"
elif x < 0:
return "negative"
else:
return "zero"
""")
f.flush()
avg, max_c, funcs, classes, lines = compute_file_complexity(f.name)
# Base 1 + 3 if/elif + 1 nested if = 5
assert max_c >= 4, f"Expected max_c >= 4, got {max_c}"
assert funcs == 1, f"Expected 1, got {funcs}"
os.unlink(f.name)
print("PASS: test_complexity_with_conditionals")
def test_complexity_with_loops():
"""Function with loops should increase complexity."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write("""
def loop_func(items):
result = []
for item in items:
if item > 0:
result.append(item)
while len(result) > 10:
result.pop()
return result
""")
f.flush()
avg, max_c, funcs, classes, lines = compute_file_complexity(f.name)
# Base 1 + 1 for + 1 if + 1 while = 4
assert max_c >= 3, f"Expected max_c >= 3, got {max_c}"
os.unlink(f.name)
print("PASS: test_complexity_with_loops")
def test_complexity_with_class():
"""Class with methods should count both."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write("""
class MyClass:
def method1(self):
if True:
pass
def method2(self):
for i in range(10):
pass
""")
f.flush()
avg, max_c, funcs, classes, lines = compute_file_complexity(f.name)
assert classes == 1, f"Expected 1 class, got {classes}"
assert funcs == 2, f"Expected 2 functions, got {funcs}"
os.unlink(f.name)
print("PASS: test_complexity_with_class")
def test_complexity_syntax_error():
"""File with syntax error should return zeros."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write("def broken(:\n pass")
f.flush()
avg, max_c, funcs, classes, lines = compute_file_complexity(f.name)
assert avg == 0.0, f"Expected 0.0, got {avg}"
assert funcs == 0, f"Expected 0, got {funcs}"
os.unlink(f.name)
print("PASS: test_complexity_syntax_error")
def test_refactoring_score_high_complexity():
"""High complexity should give high score."""
metrics = FileMetrics(
path="test.py",
lines=200,
complexity=15.0,
max_complexity=25,
functions=10,
classes=2,
churn_30d=5,
churn_90d=15,
test_coverage=0.3,
refactoring_score=0.0
)
score = calculate_refactoring_score(metrics)
assert score > 50, f"Expected score > 50, got {score}"
print("PASS: test_refactoring_score_high_complexity")
def test_refactoring_score_low_complexity():
"""Low complexity should give lower score."""
metrics = FileMetrics(
path="test.py",
lines=50,
complexity=2.0,
max_complexity=3,
functions=3,
classes=0,
churn_30d=0,
churn_90d=1,
test_coverage=0.9,
refactoring_score=0.0
)
score = calculate_refactoring_score(metrics)
assert score < 30, f"Expected score < 30, got {score}"
print("PASS: test_refactoring_score_low_complexity")
def test_refactoring_score_high_churn():
"""High churn should increase score."""
metrics = FileMetrics(
path="test.py",
lines=100,
complexity=5.0,
max_complexity=8,
functions=5,
classes=0,
churn_30d=10,
churn_90d=20,
test_coverage=0.5,
refactoring_score=0.0
)
score = calculate_refactoring_score(metrics)
# Churn should contribute significantly
assert score > 40, f"Expected score > 40 for high churn, got {score}"
print("PASS: test_refactoring_score_high_churn")
def test_refactoring_score_no_coverage():
"""No coverage data should assume medium risk."""
metrics = FileMetrics(
path="test.py",
lines=100,
complexity=5.0,
max_complexity=8,
functions=5,
classes=0,
churn_30d=1,
churn_90d=2,
test_coverage=None,
refactoring_score=0.0
)
score = calculate_refactoring_score(metrics)
# Should have some score from the 5-point coverage component
assert score > 0, f"Expected positive score, got {score}"
print("PASS: test_refactoring_score_no_coverage")
def test_refactoring_score_large_file():
"""Large files should score higher."""
metrics_small = FileMetrics(
path="small.py",
lines=50,
complexity=5.0,
max_complexity=8,
functions=3,
classes=0,
churn_30d=1,
churn_90d=2,
test_coverage=0.8,
refactoring_score=0.0
)
metrics_large = FileMetrics(
path="large.py",
lines=1000,
complexity=5.0,
max_complexity=8,
functions=3,
classes=0,
churn_30d=1,
churn_90d=2,
test_coverage=0.8,
refactoring_score=0.0
)
score_small = calculate_refactoring_score(metrics_small)
score_large = calculate_refactoring_score(metrics_large)
assert score_large > score_small, \
f"Large file ({score_large}) should score higher than small ({score_small})"
print("PASS: test_refactoring_score_large_file")
def run_all():
test_complexity_simple_function()
test_complexity_with_conditionals()
test_complexity_with_loops()
test_complexity_with_class()
test_complexity_syntax_error()
test_refactoring_score_high_complexity()
test_refactoring_score_low_complexity()
test_refactoring_score_high_churn()
test_refactoring_score_no_coverage()
test_refactoring_score_large_file()
print("\nAll 10 tests passed!")
if __name__ == "__main__":
run_all()