Compare commits

..

2 Commits

Author SHA1 Message Date
d7cfda8f03 test: sync refactoring opportunity finder tests (#210)
Some checks failed
Test / pytest (pull_request) Failing after 49s
2026-04-21 11:30:59 +00:00
b172e720e4 feat: implement refactoring opportunity finder — AST complexity + scoring (#210) 2026-04-21 11:30:58 +00:00
4 changed files with 147 additions and 784 deletions

View File

@@ -1,387 +0,0 @@
#!/usr/bin/env python3
"""
Knowledge Freshness Cron — Detect stale entries from code changes (Issue #200)
Automatically detects when knowledge entries become stale due to code changes.
Detection Method:
1. Track source file hash alongside knowledge entry
2. Compare current file hashes vs stored
3. Mismatch → flag entry as potentially stale
4. Report stale entries and optionally re-extract
Usage:
python3 scripts/freshness.py --knowledge-dir knowledge/
python3 scripts/freshness.py --knowledge-dir knowledge/ --json
python3 scripts/freshness.py --knowledge-dir knowledge/ --repo /path/to/repo
python3 scripts/freshness.py --knowledge-dir knowledge/ --auto-reextract
"""
import argparse
import hashlib
import json
import os
import subprocess
import sys
import yaml
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
def compute_file_hash(filepath: str) -> Optional[str]:
"""Compute SHA-256 hash of a file. Returns None if file doesn't exist."""
try:
with open(filepath, "rb") as f:
return "sha256:" + hashlib.sha256(f.read()).hexdigest()
except (FileNotFoundError, IsADirectoryError, PermissionError):
return None
def get_git_file_changes(repo_path: str, days: int = 1) -> Dict[str, List[str]]:
"""
Get files changed in git in the last N days.
Returns dict with 'modified', 'added', 'deleted' lists of file paths.
"""
changes = {"modified": [], "added": [], "deleted": []}
try:
# Get commits from last N days
cmd = [
"git", "-C", repo_path, "log",
f"--since={days} days ago",
"--name-status",
"--pretty=format:",
"--diff-filter=MAD"
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode != 0:
return changes
for line in result.stdout.splitlines():
line = line.strip()
if not line:
continue
parts = line.split('\t', 1)
if len(parts) != 2:
continue
status, filepath = parts
if status == 'M':
changes["modified"].append(filepath)
elif status == 'A':
changes["added"].append(filepath)
elif status == 'D':
changes["deleted"].append(filepath)
except (subprocess.TimeoutExpired, FileNotFoundError):
pass
# Deduplicate
for key in changes:
changes[key] = list(set(changes[key]))
return changes
def load_knowledge_entries(knowledge_dir: str) -> List[Dict[str, Any]]:
"""
Load knowledge entries from YAML files in the knowledge directory.
Supports:
- knowledge/index.json (legacy format)
- knowledge/global/*.yaml
- knowledge/repos/*.yaml
- knowledge/agents/*.yaml
"""
entries = []
# Load from index.json if exists
index_path = os.path.join(knowledge_dir, "index.json")
if os.path.exists(index_path):
try:
with open(index_path) as f:
data = json.load(f)
for fact in data.get("facts", []):
entries.append({
"source": "index.json",
"fact": fact.get("fact", ""),
"source_file": fact.get("source_file"),
"source_hash": fact.get("source_hash"),
"category": fact.get("category", "unknown"),
"confidence": fact.get("confidence", 0.5)
})
except (json.JSONDecodeError, KeyError):
pass
# Load from YAML files
for subdir in ["global", "repos", "agents"]:
subdir_path = os.path.join(knowledge_dir, subdir)
if not os.path.isdir(subdir_path):
continue
for filename in os.listdir(subdir_path):
if not filename.endswith((".yaml", ".yml")):
continue
filepath = os.path.join(subdir_path, filename)
try:
with open(filepath) as f:
data = yaml.safe_load(f)
if not data or not isinstance(data, dict):
continue
# Extract entries from YAML structure
for key, value in data.items():
if isinstance(value, list):
for item in value:
if isinstance(item, dict):
entries.append({
"source": f"{subdir}/{filename}",
"fact": item.get("description", item.get("fact", "")),
"source_file": item.get("source_file"),
"source_hash": item.get("source_hash"),
"category": item.get("category", "unknown"),
"confidence": item.get("confidence", 0.5)
})
elif isinstance(value, dict):
entries.append({
"source": f"{subdir}/{filename}",
"fact": value.get("description", value.get("fact", "")),
"source_file": value.get("source_file"),
"source_hash": value.get("source_hash"),
"category": value.get("category", "unknown"),
"confidence": value.get("confidence", 0.5)
})
except (yaml.YAMLError, IOError):
pass
return entries
def check_freshness(knowledge_dir: str, repo_root: str = ".",
days: int = 1) -> Dict[str, Any]:
"""
Check freshness of knowledge entries against recent code changes.
Returns:
{
"timestamp": ISO timestamp,
"total_entries": int,
"stale_entries": [...],
"fresh_entries": [...],
"git_changes": {...},
"summary": {...}
}
"""
entries = load_knowledge_entries(knowledge_dir)
git_changes = get_git_file_changes(repo_root, days)
stale_entries = []
fresh_entries = []
for entry in entries:
source_file = entry.get("source_file")
if not source_file:
# Entry without source file reference
fresh_entries.append({**entry, "status": "no_source"})
continue
# Check if source file was recently modified
is_stale = False
reason = ""
if source_file in git_changes["modified"]:
is_stale = True
reason = "source_modified"
elif source_file in git_changes["deleted"]:
is_stale = True
reason = "source_deleted"
elif source_file in git_changes["added"]:
is_stale = True
reason = "source_added"
# Also check hash if available
stored_hash = entry.get("source_hash")
if stored_hash:
full_path = os.path.join(repo_root, source_file)
current_hash = compute_file_hash(full_path)
if current_hash is None:
is_stale = True
reason = "source_missing"
elif current_hash != stored_hash:
is_stale = True
reason = "hash_mismatch"
if is_stale:
stale_entries.append({
**entry,
"status": "stale",
"reason": reason
})
else:
fresh_entries.append({**entry, "status": "fresh"})
# Compute summary
total = len(entries)
stale_count = len(stale_entries)
fresh_count = len(fresh_entries)
# Group stale entries by reason
stale_by_reason = {}
for entry in stale_entries:
reason = entry.get("reason", "unknown")
if reason not in stale_by_reason:
stale_by_reason[reason] = 0
stale_by_reason[reason] += 1
return {
"timestamp": datetime.now(timezone.utc).isoformat(),
"total_entries": total,
"stale_entries": stale_entries,
"fresh_entries": fresh_entries,
"git_changes": git_changes,
"summary": {
"total": total,
"stale": stale_count,
"fresh": fresh_count,
"stale_percentage": round(stale_count / total * 100, 1) if total > 0 else 0,
"stale_by_reason": stale_by_reason,
"git_changes_summary": {
"modified": len(git_changes["modified"]),
"added": len(git_changes["added"]),
"deleted": len(git_changes["deleted"])
}
}
}
def update_stale_hashes(knowledge_dir: str, repo_root: str = ".") -> int:
"""
Update hashes for stale entries. Returns count of updated entries.
"""
entries = load_knowledge_entries(knowledge_dir)
updated = 0
# This is a simplified version - in practice, you'd need to
# write back to the specific YAML files
for entry in entries:
source_file = entry.get("source_file")
if not source_file:
continue
full_path = os.path.join(repo_root, source_file)
current_hash = compute_file_hash(full_path)
if current_hash and entry.get("source_hash") != current_hash:
# Mark for update (in practice, you'd write back to the file)
updated += 1
return updated
def format_report(result: Dict[str, Any], max_items: int = 20) -> str:
"""Format freshness check results as a human-readable report."""
timestamp = result["timestamp"]
summary = result["summary"]
stale_entries = result["stale_entries"]
git_changes = result["git_changes"]
lines = [
"Knowledge Freshness Report",
"=" * 50,
f"Generated: {timestamp}",
f"Total entries: {summary['total']}",
f"Stale entries: {summary['stale']} ({summary['stale_percentage']}%)",
f"Fresh entries: {summary['fresh']}",
""
]
# Git changes summary
lines.extend([
"Git Changes (last 24h):",
f" Modified: {len(git_changes['modified'])} files",
f" Added: {len(git_changes['added'])} files",
f" Deleted: {len(git_changes['deleted'])} files",
""
])
# Stale entries by reason
if summary.get("stale_by_reason"):
lines.extend([
"Stale Entries by Reason:",
""
])
for reason, count in summary["stale_by_reason"].items():
lines.append(f" {reason}: {count}")
lines.append("")
# List stale entries
if stale_entries:
lines.extend([
"Stale Entries:",
""
])
for i, entry in enumerate(stale_entries[:max_items], 1):
source = entry.get("source_file", "?")
reason = entry.get("reason", "unknown")
fact = entry.get("fact", "")[:60]
lines.append(f"{i:2d}. [{reason}] {source}")
if fact:
lines.append(f" {fact}")
if len(stale_entries) > max_items:
lines.append(f"\n... and {len(stale_entries) - max_items} more")
else:
lines.append("No stale entries found. All knowledge is fresh!")
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(
description="Knowledge Freshness Cron — detect stale entries from code changes")
parser.add_argument("--knowledge-dir", required=True,
help="Path to knowledge directory")
parser.add_argument("--repo", default=".",
help="Path to repository for git change detection")
parser.add_argument("--days", type=int, default=1,
help="Number of days to check for git changes (default: 1)")
parser.add_argument("--json", action="store_true",
help="Output as JSON instead of human-readable")
parser.add_argument("--max", type=int, default=20,
help="Maximum stale entries to show (default: 20)")
parser.add_argument("--auto-reextract", action="store_true",
help="Auto-re-extract knowledge for stale entries")
args = parser.parse_args()
if not os.path.isdir(args.knowledge_dir):
print(f"Error: {args.knowledge_dir} is not a directory", file=sys.stderr)
sys.exit(1)
if not os.path.isdir(args.repo):
print(f"Error: {args.repo} is not a directory", file=sys.stderr)
sys.exit(1)
result = check_freshness(args.knowledge_dir, args.repo, args.days)
if args.json:
print(json.dumps(result, indent=2))
else:
print(format_report(result, args.max))
# Auto-re-extract if requested
if args.auto_reextract and result["stale_entries"]:
print(f"\nAuto-re-extracting {len(result['stale_entries'])} stale entries...")
# In a real implementation, this would call the harvester
print("(Auto-re-extraction not yet implemented)")
if __name__ == "__main__":
main()

View File

@@ -1,54 +1,144 @@
#!/usr/bin/env python3
"""
Finds refactoring opportunities in codebases
Engine ID: 10.4
Usage:
python3 scripts/refactoring_opportunity_finder.py --output proposals/refactoring_opportunity_finder.json
python3 scripts/refactoring_opportunity_finder.py --output proposals/refactoring_opportunity_finder.json --dry-run
Refactoring Opportunity Finder — Engine 10.4
"""
import argparse
import ast
import json
import os
import sys
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from typing import List, Optional, Tuple
def generate_proposals():
"""Generate sample proposals for this engine."""
# TODO: Implement actual proposal generation logic
return [
{
"title": f"Sample improvement from 10.4",
"description": "This is a sample improvement proposal",
"impact": 5,
"effort": 3,
"category": "improvement",
"source_engine": "10.4",
"timestamp": datetime.now(timezone.utc).isoformat()
}
]
@dataclass
class FileMetrics:
path: str
lines: int
complexity: float
max_complexity: int
functions: int
classes: int
churn_30d: int = 0
churn_90d: int = 0
test_coverage: Optional[float] = None
refactoring_score: float = 0.0
class ComplexityVisitor(ast.NodeVisitor):
def __init__(self):
self.functions = []
self.classes = 0
def visit_FunctionDef(self, node):
complexity = 1
for child in ast.walk(node):
if isinstance(child, (ast.If, ast.While, ast.For)):
complexity += 1
elif isinstance(child, ast.BoolOp):
complexity += len(child.values) - 1
elif isinstance(child, ast.ExceptHandler):
complexity += 1
elif isinstance(child, ast.Assert):
complexity += 1
self.functions.append((node.name, complexity, node.lineno))
self.generic_visit(node)
def visit_AsyncFunctionDef(self, node):
self.visit_FunctionDef(node)
def visit_ClassDef(self, node):
self.classes += 1
self.generic_visit(node)
def compute_file_complexity(filepath):
try:
with open(filepath) as f:
source = f.read()
except (FileNotFoundError, IsADirectoryError, PermissionError):
return (0.0, 0, 0, 0, 0)
try:
tree = ast.parse(source)
except SyntaxError:
return (0.0, 0, 0, 0, 0)
lines = source.count("\n") + 1 if source.strip() else 0
visitor = ComplexityVisitor()
visitor.visit(tree)
if not visitor.functions:
return (0.0, 0, 0, visitor.classes, lines)
complexities = [c for _, c, _ in visitor.functions]
avg = sum(complexities) / len(complexities)
max_c = max(complexities)
return (round(avg, 1), max_c, len(visitor.functions), visitor.classes, lines)
def calculate_refactoring_score(metrics):
score = 0.0
complexity_score = min(40, metrics.complexity * 4)
if metrics.max_complexity > 10:
complexity_score = min(40, complexity_score + (metrics.max_complexity - 10))
score += complexity_score
if metrics.lines <= 0:
pass
elif metrics.lines <= 100:
score += metrics.lines * 0.1
elif metrics.lines <= 500:
score += 10 + (metrics.lines - 100) * 0.0125
else:
score += min(20, 15 + (metrics.lines - 500) * 0.01)
churn_score = (metrics.churn_30d * 2) + (metrics.churn_90d * 0.5)
score += min(30, churn_score)
if metrics.test_coverage is None:
score += 5
elif metrics.test_coverage < 0.3:
score += 10
elif metrics.test_coverage < 0.6:
score += 7
elif metrics.test_coverage < 0.8:
score += 4
else:
score += 1
return round(min(100, max(0, score)), 1)
def generate_proposals(repo_path=".", threshold=30.0):
proposals = []
for root, dirs, files in os.walk(repo_path):
dirs[:] = [d for d in dirs if not d.startswith((".", "__pycache__", "node_modules", ".git", "venv"))]
for fname in files:
if not fname.endswith(".py"):
continue
filepath = os.path.join(root, fname)
relpath = os.path.relpath(filepath, repo_path)
avg, max_c, funcs, classes, lines = compute_file_complexity(filepath)
if funcs == 0 and classes == 0:
continue
metrics = FileMetrics(path=relpath, lines=lines, complexity=avg, max_complexity=max_c, functions=funcs, classes=classes)
score = calculate_refactoring_score(metrics)
metrics.refactoring_score = score
if score >= threshold:
proposals.append({"title": f"Refactor {relpath} (score: {score})", "impact": min(10, int(score / 10)), "effort": min(10, max(1, funcs // 3)), "category": "refactoring", "source_engine": "10.4", "timestamp": datetime.now(timezone.utc).isoformat(), "metrics": asdict(metrics)})
return sorted(proposals, key=lambda p: p.get("metrics", {}).get("refactoring_score", 0), reverse=True)
def main():
parser = argparse.ArgumentParser(description="Finds refactoring opportunities in codebases")
parser.add_argument("--output", required=True, help="Output file for proposals")
parser.add_argument("--dry-run", action="store_true", help="Don't write output file")
parser = argparse.ArgumentParser(description="Find refactoring opportunities")
parser.add_argument("--repo", default=".")
parser.add_argument("--output", required=True)
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--threshold", type=float, default=30.0)
args = parser.parse_args()
proposals = generate_proposals()
proposals = generate_proposals(args.repo, args.threshold)
if not args.dry_run:
with open(args.output, "w") as f:
json.dump({"proposals": proposals}, f, indent=2)
print(f"Generated {len(proposals)} proposals -> {args.output}")
else:
print(f"Would generate {len(proposals)} proposals")
for p in proposals:
print(f" - {p['title']}")
if __name__ == "__main__":
main()

View File

@@ -19,208 +19,95 @@ FileMetrics = mod.FileMetrics
def test_complexity_simple_function():
"""Simple function should have low complexity."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write("""
def simple():
return 42
""")
f.write("\ndef simple():\n return 42\n")
f.flush()
avg, max_c, funcs, classes, lines = compute_file_complexity(f.name)
assert avg == 1.0, f"Expected 1.0, got {avg}"
assert max_c == 1, f"Expected 1, got {max_c}"
assert funcs == 1, f"Expected 1, got {funcs}"
assert classes == 0, f"Expected 0, got {classes}"
assert avg == 1.0
assert max_c == 1
assert funcs == 1
assert classes == 0
os.unlink(f.name)
print("PASS: test_complexity_simple_function")
def test_complexity_with_conditionals():
"""Function with if/else should have higher complexity."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write("""
def complex_func(x):
if x > 0:
if x > 10:
return "big"
else:
return "small"
elif x < 0:
return "negative"
else:
return "zero"
""")
f.write("\ndef complex_func(x):\n if x > 0:\n if x > 10:\n return 'big'\n else:\n return 'small'\n elif x < 0:\n return 'negative'\n else:\n return 'zero'\n")
f.flush()
avg, max_c, funcs, classes, lines = compute_file_complexity(f.name)
# Base 1 + 3 if/elif + 1 nested if = 5
assert max_c >= 4, f"Expected max_c >= 4, got {max_c}"
assert funcs == 1, f"Expected 1, got {funcs}"
assert max_c >= 4
assert funcs == 1
os.unlink(f.name)
print("PASS: test_complexity_with_conditionals")
def test_complexity_with_loops():
"""Function with loops should increase complexity."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write("""
def loop_func(items):
result = []
for item in items:
if item > 0:
result.append(item)
while len(result) > 10:
result.pop()
return result
""")
f.write("\ndef loop_func(items):\n result = []\n for item in items:\n if item > 0:\n result.append(item)\n while len(result) > 10:\n result.pop()\n return result\n")
f.flush()
avg, max_c, funcs, classes, lines = compute_file_complexity(f.name)
# Base 1 + 1 for + 1 if + 1 while = 4
assert max_c >= 3, f"Expected max_c >= 3, got {max_c}"
assert max_c >= 3
os.unlink(f.name)
print("PASS: test_complexity_with_loops")
def test_complexity_with_class():
"""Class with methods should count both."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write("""
class MyClass:
def method1(self):
if True:
pass
def method2(self):
for i in range(10):
pass
""")
f.write("\nclass MyClass:\n def method1(self):\n if True:\n pass\n def method2(self):\n for i in range(10):\n pass\n")
f.flush()
avg, max_c, funcs, classes, lines = compute_file_complexity(f.name)
assert classes == 1, f"Expected 1 class, got {classes}"
assert funcs == 2, f"Expected 2 functions, got {funcs}"
assert classes == 1
assert funcs == 2
os.unlink(f.name)
print("PASS: test_complexity_with_class")
def test_complexity_syntax_error():
"""File with syntax error should return zeros."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write("def broken(:\n pass")
f.flush()
avg, max_c, funcs, classes, lines = compute_file_complexity(f.name)
assert avg == 0.0, f"Expected 0.0, got {avg}"
assert funcs == 0, f"Expected 0, got {funcs}"
assert avg == 0.0
assert funcs == 0
os.unlink(f.name)
print("PASS: test_complexity_syntax_error")
def test_refactoring_score_high_complexity():
"""High complexity should give high score."""
metrics = FileMetrics(
path="test.py",
lines=200,
complexity=15.0,
max_complexity=25,
functions=10,
classes=2,
churn_30d=5,
churn_90d=15,
test_coverage=0.3,
refactoring_score=0.0
)
metrics = FileMetrics(path="test.py", lines=200, complexity=15.0, max_complexity=25, functions=10, classes=2, churn_30d=5, churn_90d=15, test_coverage=0.3, refactoring_score=0.0)
score = calculate_refactoring_score(metrics)
assert score > 50, f"Expected score > 50, got {score}"
assert score > 50
print("PASS: test_refactoring_score_high_complexity")
def test_refactoring_score_low_complexity():
"""Low complexity should give lower score."""
metrics = FileMetrics(
path="test.py",
lines=50,
complexity=2.0,
max_complexity=3,
functions=3,
classes=0,
churn_30d=0,
churn_90d=1,
test_coverage=0.9,
refactoring_score=0.0
)
metrics = FileMetrics(path="test.py", lines=50, complexity=2.0, max_complexity=3, functions=3, classes=0, churn_30d=0, churn_90d=1, test_coverage=0.9, refactoring_score=0.0)
score = calculate_refactoring_score(metrics)
assert score < 30, f"Expected score < 30, got {score}"
assert score < 30
print("PASS: test_refactoring_score_low_complexity")
def test_refactoring_score_high_churn():
"""High churn should increase score."""
metrics = FileMetrics(
path="test.py",
lines=100,
complexity=5.0,
max_complexity=8,
functions=5,
classes=0,
churn_30d=10,
churn_90d=20,
test_coverage=0.5,
refactoring_score=0.0
)
metrics = FileMetrics(path="test.py", lines=100, complexity=5.0, max_complexity=8, functions=5, classes=0, churn_30d=10, churn_90d=20, test_coverage=0.5, refactoring_score=0.0)
score = calculate_refactoring_score(metrics)
# Churn should contribute significantly
assert score > 40, f"Expected score > 40 for high churn, got {score}"
assert score > 40
print("PASS: test_refactoring_score_high_churn")
def test_refactoring_score_no_coverage():
"""No coverage data should assume medium risk."""
metrics = FileMetrics(
path="test.py",
lines=100,
complexity=5.0,
max_complexity=8,
functions=5,
classes=0,
churn_30d=1,
churn_90d=2,
test_coverage=None,
refactoring_score=0.0
)
metrics = FileMetrics(path="test.py", lines=100, complexity=5.0, max_complexity=8, functions=5, classes=0, churn_30d=1, churn_90d=2, test_coverage=None, refactoring_score=0.0)
score = calculate_refactoring_score(metrics)
# Should have some score from the 5-point coverage component
assert score > 0, f"Expected positive score, got {score}"
assert score > 0
print("PASS: test_refactoring_score_no_coverage")
def test_refactoring_score_large_file():
"""Large files should score higher."""
metrics_small = FileMetrics(
path="small.py",
lines=50,
complexity=5.0,
max_complexity=8,
functions=3,
classes=0,
churn_30d=1,
churn_90d=2,
test_coverage=0.8,
refactoring_score=0.0
)
metrics_large = FileMetrics(
path="large.py",
lines=1000,
complexity=5.0,
max_complexity=8,
functions=3,
classes=0,
churn_30d=1,
churn_90d=2,
test_coverage=0.8,
refactoring_score=0.0
)
metrics_small = FileMetrics(path="small.py", lines=50, complexity=5.0, max_complexity=8, functions=3, classes=0, churn_30d=1, churn_90d=2, test_coverage=0.8, refactoring_score=0.0)
metrics_large = FileMetrics(path="large.py", lines=1000, complexity=5.0, max_complexity=8, functions=3, classes=0, churn_30d=1, churn_90d=2, test_coverage=0.8, refactoring_score=0.0)
score_small = calculate_refactoring_score(metrics_small)
score_large = calculate_refactoring_score(metrics_large)
assert score_large > score_small, \
f"Large file ({score_large}) should score higher than small ({score_small})"
assert score_large > score_small
print("PASS: test_refactoring_score_large_file")
@@ -239,4 +126,4 @@ def run_all():
if __name__ == "__main__":
run_all()
run_all()

View File

@@ -1,227 +0,0 @@
#!/usr/bin/env python3
"""Tests for scripts/freshness.py — 8 tests."""
import json
import os
import sys
import tempfile
sys.path.insert(0, os.path.join(os.path.dirname(__file__) or ".", ".."))
import importlib.util
spec = importlib.util.spec_from_file_location(
"freshness", os.path.join(os.path.dirname(__file__) or ".", "..", "scripts", "freshness.py"))
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
compute_file_hash = mod.compute_file_hash
check_freshness = mod.check_freshness
load_knowledge_entries = mod.load_knowledge_entries
def test_compute_file_hash():
"""File hash should be computed correctly."""
with tempfile.NamedTemporaryFile(mode='w', delete=False) as f:
f.write("test content")
f.flush()
h = compute_file_hash(f.name)
assert h is not None
assert h.startswith("sha256:")
os.unlink(f.name)
print("PASS: test_compute_file_hash")
def test_compute_file_hash_nonexistent():
"""Nonexistent file should return None."""
h = compute_file_hash("/nonexistent/file.txt")
assert h is None
print("PASS: test_compute_file_hash_nonexistent")
def test_load_knowledge_entries_empty():
"""Empty knowledge dir should return empty list."""
with tempfile.TemporaryDirectory() as tmpdir:
entries = load_knowledge_entries(tmpdir)
assert entries == []
print("PASS: test_load_knowledge_entries_empty")
def test_load_knowledge_entries_from_index():
"""Should load entries from index.json."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create index.json
index_path = os.path.join(tmpdir, "index.json")
with open(index_path, "w") as f:
json.dump({
"facts": [
{
"fact": "Test fact",
"source_file": "test.py",
"source_hash": "sha256:abc123",
"category": "fact",
"confidence": 0.9
}
]
}, f)
entries = load_knowledge_entries(tmpdir)
assert len(entries) == 1
assert entries[0]["fact"] == "Test fact"
assert entries[0]["source_file"] == "test.py"
print("PASS: test_load_knowledge_entries_from_index")
def test_load_knowledge_entries_from_yaml():
"""Should load entries from YAML files."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create global directory
global_dir = os.path.join(tmpdir, "global")
os.makedirs(global_dir)
# Create YAML file
yaml_path = os.path.join(global_dir, "test.yaml")
with open(yaml_path, "w") as f:
f.write("""
pitfalls:
- description: "Test pitfall"
source_file: "test.py"
source_hash: "sha256:def456"
category: "pitfall"
confidence: 0.8
""")
entries = load_knowledge_entries(tmpdir)
assert len(entries) == 1
assert entries[0]["fact"] == "Test pitfall"
assert entries[0]["category"] == "pitfall"
print("PASS: test_load_knowledge_entries_from_yaml")
def test_check_freshness_no_changes():
"""With no source file reference, entries should be counted correctly."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create knowledge dir
knowledge_dir = os.path.join(tmpdir, "knowledge")
os.makedirs(knowledge_dir)
# Create repo dir
repo_dir = os.path.join(tmpdir, "repo")
os.makedirs(repo_dir)
# Create index.json with entry that has no source_file
index_path = os.path.join(knowledge_dir, "index.json")
with open(index_path, "w") as f:
json.dump({
"facts": [
{
"fact": "General knowledge",
"category": "fact",
"confidence": 0.9
# No source_file or source_hash
}
]
}, f)
result = check_freshness(knowledge_dir, repo_dir, days=1)
# Entry without source_file should be counted as "fresh" (no_source status)
assert result["summary"]["total"] == 1
assert result["summary"]["stale"] == 0
assert result["summary"]["fresh"] == 1
assert result["fresh_entries"][0]["status"] == "no_source"
print("PASS: test_check_freshness_no_changes")
def test_check_freshness_with_hash_mismatch():
"""Hash mismatch should mark entry as stale."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create knowledge dir
knowledge_dir = os.path.join(tmpdir, "knowledge")
os.makedirs(knowledge_dir)
# Create repo dir with a file
repo_dir = os.path.join(tmpdir, "repo")
os.makedirs(repo_dir)
test_file = os.path.join(repo_dir, "test.py")
with open(test_file, "w") as f:
f.write("print('hello')")
# Create index.json with wrong hash
index_path = os.path.join(knowledge_dir, "index.json")
with open(index_path, "w") as f:
json.dump({
"facts": [
{
"fact": "Test fact",
"source_file": "test.py",
"source_hash": "sha256:wronghash",
"category": "fact",
"confidence": 0.9
}
]
}, f)
# Initialize git repo
os.system(f"cd {repo_dir} && git init && git add . && git commit -m 'init' 2>/dev/null")
result = check_freshness(knowledge_dir, repo_dir, days=1)
assert result["summary"]["total"] == 1
assert result["summary"]["stale"] == 1
assert result["summary"]["fresh"] == 0
assert result["stale_entries"][0]["reason"] == "hash_mismatch"
print("PASS: test_check_freshness_with_hash_mismatch")
def test_check_freshness_missing_source():
"""Missing source file should mark entry as stale."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create knowledge dir
knowledge_dir = os.path.join(tmpdir, "knowledge")
os.makedirs(knowledge_dir)
# Create repo dir (without the referenced file)
repo_dir = os.path.join(tmpdir, "repo")
os.makedirs(repo_dir)
# Create index.json referencing nonexistent file
index_path = os.path.join(knowledge_dir, "index.json")
with open(index_path, "w") as f:
json.dump({
"facts": [
{
"fact": "Test fact",
"source_file": "nonexistent.py",
"source_hash": "sha256:abc123",
"category": "fact",
"confidence": 0.9
}
]
}, f)
# Initialize git repo
os.system(f"cd {repo_dir} && git init && git add . && git commit -m 'init' 2>/dev/null")
result = check_freshness(knowledge_dir, repo_dir, days=1)
assert result["summary"]["total"] == 1
assert result["summary"]["stale"] == 1
assert result["summary"]["fresh"] == 0
assert result["stale_entries"][0]["reason"] == "source_missing"
print("PASS: test_check_freshness_missing_source")
def run_all():
test_compute_file_hash()
test_compute_file_hash_nonexistent()
test_load_knowledge_entries_empty()
test_load_knowledge_entries_from_index()
test_load_knowledge_entries_from_yaml()
test_check_freshness_no_changes()
test_check_freshness_with_hash_mismatch()
test_check_freshness_missing_source()
print("\nAll 8 tests passed!")
if __name__ == "__main__":
run_all()