Compare commits
2 Commits
feat/200-k
...
fix/210-re
| Author | SHA1 | Date | |
|---|---|---|---|
| d7cfda8f03 | |||
| b172e720e4 |
@@ -1,387 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Knowledge Freshness Cron — Detect stale entries from code changes (Issue #200)
|
||||
|
||||
Automatically detects when knowledge entries become stale due to code changes.
|
||||
|
||||
Detection Method:
|
||||
1. Track source file hash alongside knowledge entry
|
||||
2. Compare current file hashes vs stored
|
||||
3. Mismatch → flag entry as potentially stale
|
||||
4. Report stale entries and optionally re-extract
|
||||
|
||||
Usage:
|
||||
python3 scripts/freshness.py --knowledge-dir knowledge/
|
||||
python3 scripts/freshness.py --knowledge-dir knowledge/ --json
|
||||
python3 scripts/freshness.py --knowledge-dir knowledge/ --repo /path/to/repo
|
||||
python3 scripts/freshness.py --knowledge-dir knowledge/ --auto-reextract
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import yaml
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Any, Optional, Tuple
|
||||
|
||||
|
||||
def compute_file_hash(filepath: str) -> Optional[str]:
|
||||
"""Compute SHA-256 hash of a file. Returns None if file doesn't exist."""
|
||||
try:
|
||||
with open(filepath, "rb") as f:
|
||||
return "sha256:" + hashlib.sha256(f.read()).hexdigest()
|
||||
except (FileNotFoundError, IsADirectoryError, PermissionError):
|
||||
return None
|
||||
|
||||
|
||||
def get_git_file_changes(repo_path: str, days: int = 1) -> Dict[str, List[str]]:
|
||||
"""
|
||||
Get files changed in git in the last N days.
|
||||
|
||||
Returns dict with 'modified', 'added', 'deleted' lists of file paths.
|
||||
"""
|
||||
changes = {"modified": [], "added": [], "deleted": []}
|
||||
|
||||
try:
|
||||
# Get commits from last N days
|
||||
cmd = [
|
||||
"git", "-C", repo_path, "log",
|
||||
f"--since={days} days ago",
|
||||
"--name-status",
|
||||
"--pretty=format:",
|
||||
"--diff-filter=MAD"
|
||||
]
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
|
||||
|
||||
if result.returncode != 0:
|
||||
return changes
|
||||
|
||||
for line in result.stdout.splitlines():
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
|
||||
parts = line.split('\t', 1)
|
||||
if len(parts) != 2:
|
||||
continue
|
||||
|
||||
status, filepath = parts
|
||||
if status == 'M':
|
||||
changes["modified"].append(filepath)
|
||||
elif status == 'A':
|
||||
changes["added"].append(filepath)
|
||||
elif status == 'D':
|
||||
changes["deleted"].append(filepath)
|
||||
|
||||
except (subprocess.TimeoutExpired, FileNotFoundError):
|
||||
pass
|
||||
|
||||
# Deduplicate
|
||||
for key in changes:
|
||||
changes[key] = list(set(changes[key]))
|
||||
|
||||
return changes
|
||||
|
||||
|
||||
def load_knowledge_entries(knowledge_dir: str) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Load knowledge entries from YAML files in the knowledge directory.
|
||||
|
||||
Supports:
|
||||
- knowledge/index.json (legacy format)
|
||||
- knowledge/global/*.yaml
|
||||
- knowledge/repos/*.yaml
|
||||
- knowledge/agents/*.yaml
|
||||
"""
|
||||
entries = []
|
||||
|
||||
# Load from index.json if exists
|
||||
index_path = os.path.join(knowledge_dir, "index.json")
|
||||
if os.path.exists(index_path):
|
||||
try:
|
||||
with open(index_path) as f:
|
||||
data = json.load(f)
|
||||
for fact in data.get("facts", []):
|
||||
entries.append({
|
||||
"source": "index.json",
|
||||
"fact": fact.get("fact", ""),
|
||||
"source_file": fact.get("source_file"),
|
||||
"source_hash": fact.get("source_hash"),
|
||||
"category": fact.get("category", "unknown"),
|
||||
"confidence": fact.get("confidence", 0.5)
|
||||
})
|
||||
except (json.JSONDecodeError, KeyError):
|
||||
pass
|
||||
|
||||
# Load from YAML files
|
||||
for subdir in ["global", "repos", "agents"]:
|
||||
subdir_path = os.path.join(knowledge_dir, subdir)
|
||||
if not os.path.isdir(subdir_path):
|
||||
continue
|
||||
|
||||
for filename in os.listdir(subdir_path):
|
||||
if not filename.endswith((".yaml", ".yml")):
|
||||
continue
|
||||
|
||||
filepath = os.path.join(subdir_path, filename)
|
||||
try:
|
||||
with open(filepath) as f:
|
||||
data = yaml.safe_load(f)
|
||||
|
||||
if not data or not isinstance(data, dict):
|
||||
continue
|
||||
|
||||
# Extract entries from YAML structure
|
||||
for key, value in data.items():
|
||||
if isinstance(value, list):
|
||||
for item in value:
|
||||
if isinstance(item, dict):
|
||||
entries.append({
|
||||
"source": f"{subdir}/{filename}",
|
||||
"fact": item.get("description", item.get("fact", "")),
|
||||
"source_file": item.get("source_file"),
|
||||
"source_hash": item.get("source_hash"),
|
||||
"category": item.get("category", "unknown"),
|
||||
"confidence": item.get("confidence", 0.5)
|
||||
})
|
||||
elif isinstance(value, dict):
|
||||
entries.append({
|
||||
"source": f"{subdir}/{filename}",
|
||||
"fact": value.get("description", value.get("fact", "")),
|
||||
"source_file": value.get("source_file"),
|
||||
"source_hash": value.get("source_hash"),
|
||||
"category": value.get("category", "unknown"),
|
||||
"confidence": value.get("confidence", 0.5)
|
||||
})
|
||||
except (yaml.YAMLError, IOError):
|
||||
pass
|
||||
|
||||
return entries
|
||||
|
||||
|
||||
def check_freshness(knowledge_dir: str, repo_root: str = ".",
|
||||
days: int = 1) -> Dict[str, Any]:
|
||||
"""
|
||||
Check freshness of knowledge entries against recent code changes.
|
||||
|
||||
Returns:
|
||||
{
|
||||
"timestamp": ISO timestamp,
|
||||
"total_entries": int,
|
||||
"stale_entries": [...],
|
||||
"fresh_entries": [...],
|
||||
"git_changes": {...},
|
||||
"summary": {...}
|
||||
}
|
||||
"""
|
||||
entries = load_knowledge_entries(knowledge_dir)
|
||||
git_changes = get_git_file_changes(repo_root, days)
|
||||
|
||||
stale_entries = []
|
||||
fresh_entries = []
|
||||
|
||||
for entry in entries:
|
||||
source_file = entry.get("source_file")
|
||||
if not source_file:
|
||||
# Entry without source file reference
|
||||
fresh_entries.append({**entry, "status": "no_source"})
|
||||
continue
|
||||
|
||||
# Check if source file was recently modified
|
||||
is_stale = False
|
||||
reason = ""
|
||||
|
||||
if source_file in git_changes["modified"]:
|
||||
is_stale = True
|
||||
reason = "source_modified"
|
||||
elif source_file in git_changes["deleted"]:
|
||||
is_stale = True
|
||||
reason = "source_deleted"
|
||||
elif source_file in git_changes["added"]:
|
||||
is_stale = True
|
||||
reason = "source_added"
|
||||
|
||||
# Also check hash if available
|
||||
stored_hash = entry.get("source_hash")
|
||||
if stored_hash:
|
||||
full_path = os.path.join(repo_root, source_file)
|
||||
current_hash = compute_file_hash(full_path)
|
||||
|
||||
if current_hash is None:
|
||||
is_stale = True
|
||||
reason = "source_missing"
|
||||
elif current_hash != stored_hash:
|
||||
is_stale = True
|
||||
reason = "hash_mismatch"
|
||||
|
||||
if is_stale:
|
||||
stale_entries.append({
|
||||
**entry,
|
||||
"status": "stale",
|
||||
"reason": reason
|
||||
})
|
||||
else:
|
||||
fresh_entries.append({**entry, "status": "fresh"})
|
||||
|
||||
# Compute summary
|
||||
total = len(entries)
|
||||
stale_count = len(stale_entries)
|
||||
fresh_count = len(fresh_entries)
|
||||
|
||||
# Group stale entries by reason
|
||||
stale_by_reason = {}
|
||||
for entry in stale_entries:
|
||||
reason = entry.get("reason", "unknown")
|
||||
if reason not in stale_by_reason:
|
||||
stale_by_reason[reason] = 0
|
||||
stale_by_reason[reason] += 1
|
||||
|
||||
return {
|
||||
"timestamp": datetime.now(timezone.utc).isoformat(),
|
||||
"total_entries": total,
|
||||
"stale_entries": stale_entries,
|
||||
"fresh_entries": fresh_entries,
|
||||
"git_changes": git_changes,
|
||||
"summary": {
|
||||
"total": total,
|
||||
"stale": stale_count,
|
||||
"fresh": fresh_count,
|
||||
"stale_percentage": round(stale_count / total * 100, 1) if total > 0 else 0,
|
||||
"stale_by_reason": stale_by_reason,
|
||||
"git_changes_summary": {
|
||||
"modified": len(git_changes["modified"]),
|
||||
"added": len(git_changes["added"]),
|
||||
"deleted": len(git_changes["deleted"])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def update_stale_hashes(knowledge_dir: str, repo_root: str = ".") -> int:
|
||||
"""
|
||||
Update hashes for stale entries. Returns count of updated entries.
|
||||
"""
|
||||
entries = load_knowledge_entries(knowledge_dir)
|
||||
updated = 0
|
||||
|
||||
# This is a simplified version - in practice, you'd need to
|
||||
# write back to the specific YAML files
|
||||
for entry in entries:
|
||||
source_file = entry.get("source_file")
|
||||
if not source_file:
|
||||
continue
|
||||
|
||||
full_path = os.path.join(repo_root, source_file)
|
||||
current_hash = compute_file_hash(full_path)
|
||||
|
||||
if current_hash and entry.get("source_hash") != current_hash:
|
||||
# Mark for update (in practice, you'd write back to the file)
|
||||
updated += 1
|
||||
|
||||
return updated
|
||||
|
||||
|
||||
def format_report(result: Dict[str, Any], max_items: int = 20) -> str:
|
||||
"""Format freshness check results as a human-readable report."""
|
||||
timestamp = result["timestamp"]
|
||||
summary = result["summary"]
|
||||
stale_entries = result["stale_entries"]
|
||||
git_changes = result["git_changes"]
|
||||
|
||||
lines = [
|
||||
"Knowledge Freshness Report",
|
||||
"=" * 50,
|
||||
f"Generated: {timestamp}",
|
||||
f"Total entries: {summary['total']}",
|
||||
f"Stale entries: {summary['stale']} ({summary['stale_percentage']}%)",
|
||||
f"Fresh entries: {summary['fresh']}",
|
||||
""
|
||||
]
|
||||
|
||||
# Git changes summary
|
||||
lines.extend([
|
||||
"Git Changes (last 24h):",
|
||||
f" Modified: {len(git_changes['modified'])} files",
|
||||
f" Added: {len(git_changes['added'])} files",
|
||||
f" Deleted: {len(git_changes['deleted'])} files",
|
||||
""
|
||||
])
|
||||
|
||||
# Stale entries by reason
|
||||
if summary.get("stale_by_reason"):
|
||||
lines.extend([
|
||||
"Stale Entries by Reason:",
|
||||
""
|
||||
])
|
||||
for reason, count in summary["stale_by_reason"].items():
|
||||
lines.append(f" {reason}: {count}")
|
||||
lines.append("")
|
||||
|
||||
# List stale entries
|
||||
if stale_entries:
|
||||
lines.extend([
|
||||
"Stale Entries:",
|
||||
""
|
||||
])
|
||||
for i, entry in enumerate(stale_entries[:max_items], 1):
|
||||
source = entry.get("source_file", "?")
|
||||
reason = entry.get("reason", "unknown")
|
||||
fact = entry.get("fact", "")[:60]
|
||||
lines.append(f"{i:2d}. [{reason}] {source}")
|
||||
if fact:
|
||||
lines.append(f" {fact}")
|
||||
|
||||
if len(stale_entries) > max_items:
|
||||
lines.append(f"\n... and {len(stale_entries) - max_items} more")
|
||||
else:
|
||||
lines.append("No stale entries found. All knowledge is fresh!")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Knowledge Freshness Cron — detect stale entries from code changes")
|
||||
parser.add_argument("--knowledge-dir", required=True,
|
||||
help="Path to knowledge directory")
|
||||
parser.add_argument("--repo", default=".",
|
||||
help="Path to repository for git change detection")
|
||||
parser.add_argument("--days", type=int, default=1,
|
||||
help="Number of days to check for git changes (default: 1)")
|
||||
parser.add_argument("--json", action="store_true",
|
||||
help="Output as JSON instead of human-readable")
|
||||
parser.add_argument("--max", type=int, default=20,
|
||||
help="Maximum stale entries to show (default: 20)")
|
||||
parser.add_argument("--auto-reextract", action="store_true",
|
||||
help="Auto-re-extract knowledge for stale entries")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if not os.path.isdir(args.knowledge_dir):
|
||||
print(f"Error: {args.knowledge_dir} is not a directory", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
if not os.path.isdir(args.repo):
|
||||
print(f"Error: {args.repo} is not a directory", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
result = check_freshness(args.knowledge_dir, args.repo, args.days)
|
||||
|
||||
if args.json:
|
||||
print(json.dumps(result, indent=2))
|
||||
else:
|
||||
print(format_report(result, args.max))
|
||||
|
||||
# Auto-re-extract if requested
|
||||
if args.auto_reextract and result["stale_entries"]:
|
||||
print(f"\nAuto-re-extracting {len(result['stale_entries'])} stale entries...")
|
||||
# In a real implementation, this would call the harvester
|
||||
print("(Auto-re-extraction not yet implemented)")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,54 +1,144 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Finds refactoring opportunities in codebases
|
||||
|
||||
Engine ID: 10.4
|
||||
|
||||
Usage:
|
||||
python3 scripts/refactoring_opportunity_finder.py --output proposals/refactoring_opportunity_finder.json
|
||||
python3 scripts/refactoring_opportunity_finder.py --output proposals/refactoring_opportunity_finder.json --dry-run
|
||||
Refactoring Opportunity Finder — Engine 10.4
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import ast
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from dataclasses import dataclass, asdict
|
||||
from datetime import datetime, timezone
|
||||
from typing import List, Optional, Tuple
|
||||
|
||||
|
||||
def generate_proposals():
|
||||
"""Generate sample proposals for this engine."""
|
||||
# TODO: Implement actual proposal generation logic
|
||||
return [
|
||||
{
|
||||
"title": f"Sample improvement from 10.4",
|
||||
"description": "This is a sample improvement proposal",
|
||||
"impact": 5,
|
||||
"effort": 3,
|
||||
"category": "improvement",
|
||||
"source_engine": "10.4",
|
||||
"timestamp": datetime.now(timezone.utc).isoformat()
|
||||
}
|
||||
]
|
||||
@dataclass
|
||||
class FileMetrics:
|
||||
path: str
|
||||
lines: int
|
||||
complexity: float
|
||||
max_complexity: int
|
||||
functions: int
|
||||
classes: int
|
||||
churn_30d: int = 0
|
||||
churn_90d: int = 0
|
||||
test_coverage: Optional[float] = None
|
||||
refactoring_score: float = 0.0
|
||||
|
||||
|
||||
class ComplexityVisitor(ast.NodeVisitor):
|
||||
def __init__(self):
|
||||
self.functions = []
|
||||
self.classes = 0
|
||||
|
||||
def visit_FunctionDef(self, node):
|
||||
complexity = 1
|
||||
for child in ast.walk(node):
|
||||
if isinstance(child, (ast.If, ast.While, ast.For)):
|
||||
complexity += 1
|
||||
elif isinstance(child, ast.BoolOp):
|
||||
complexity += len(child.values) - 1
|
||||
elif isinstance(child, ast.ExceptHandler):
|
||||
complexity += 1
|
||||
elif isinstance(child, ast.Assert):
|
||||
complexity += 1
|
||||
self.functions.append((node.name, complexity, node.lineno))
|
||||
self.generic_visit(node)
|
||||
|
||||
def visit_AsyncFunctionDef(self, node):
|
||||
self.visit_FunctionDef(node)
|
||||
|
||||
def visit_ClassDef(self, node):
|
||||
self.classes += 1
|
||||
self.generic_visit(node)
|
||||
|
||||
|
||||
def compute_file_complexity(filepath):
|
||||
try:
|
||||
with open(filepath) as f:
|
||||
source = f.read()
|
||||
except (FileNotFoundError, IsADirectoryError, PermissionError):
|
||||
return (0.0, 0, 0, 0, 0)
|
||||
try:
|
||||
tree = ast.parse(source)
|
||||
except SyntaxError:
|
||||
return (0.0, 0, 0, 0, 0)
|
||||
lines = source.count("\n") + 1 if source.strip() else 0
|
||||
visitor = ComplexityVisitor()
|
||||
visitor.visit(tree)
|
||||
if not visitor.functions:
|
||||
return (0.0, 0, 0, visitor.classes, lines)
|
||||
complexities = [c for _, c, _ in visitor.functions]
|
||||
avg = sum(complexities) / len(complexities)
|
||||
max_c = max(complexities)
|
||||
return (round(avg, 1), max_c, len(visitor.functions), visitor.classes, lines)
|
||||
|
||||
|
||||
def calculate_refactoring_score(metrics):
|
||||
score = 0.0
|
||||
complexity_score = min(40, metrics.complexity * 4)
|
||||
if metrics.max_complexity > 10:
|
||||
complexity_score = min(40, complexity_score + (metrics.max_complexity - 10))
|
||||
score += complexity_score
|
||||
if metrics.lines <= 0:
|
||||
pass
|
||||
elif metrics.lines <= 100:
|
||||
score += metrics.lines * 0.1
|
||||
elif metrics.lines <= 500:
|
||||
score += 10 + (metrics.lines - 100) * 0.0125
|
||||
else:
|
||||
score += min(20, 15 + (metrics.lines - 500) * 0.01)
|
||||
churn_score = (metrics.churn_30d * 2) + (metrics.churn_90d * 0.5)
|
||||
score += min(30, churn_score)
|
||||
if metrics.test_coverage is None:
|
||||
score += 5
|
||||
elif metrics.test_coverage < 0.3:
|
||||
score += 10
|
||||
elif metrics.test_coverage < 0.6:
|
||||
score += 7
|
||||
elif metrics.test_coverage < 0.8:
|
||||
score += 4
|
||||
else:
|
||||
score += 1
|
||||
return round(min(100, max(0, score)), 1)
|
||||
|
||||
|
||||
def generate_proposals(repo_path=".", threshold=30.0):
|
||||
proposals = []
|
||||
for root, dirs, files in os.walk(repo_path):
|
||||
dirs[:] = [d for d in dirs if not d.startswith((".", "__pycache__", "node_modules", ".git", "venv"))]
|
||||
for fname in files:
|
||||
if not fname.endswith(".py"):
|
||||
continue
|
||||
filepath = os.path.join(root, fname)
|
||||
relpath = os.path.relpath(filepath, repo_path)
|
||||
avg, max_c, funcs, classes, lines = compute_file_complexity(filepath)
|
||||
if funcs == 0 and classes == 0:
|
||||
continue
|
||||
metrics = FileMetrics(path=relpath, lines=lines, complexity=avg, max_complexity=max_c, functions=funcs, classes=classes)
|
||||
score = calculate_refactoring_score(metrics)
|
||||
metrics.refactoring_score = score
|
||||
if score >= threshold:
|
||||
proposals.append({"title": f"Refactor {relpath} (score: {score})", "impact": min(10, int(score / 10)), "effort": min(10, max(1, funcs // 3)), "category": "refactoring", "source_engine": "10.4", "timestamp": datetime.now(timezone.utc).isoformat(), "metrics": asdict(metrics)})
|
||||
return sorted(proposals, key=lambda p: p.get("metrics", {}).get("refactoring_score", 0), reverse=True)
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Finds refactoring opportunities in codebases")
|
||||
parser.add_argument("--output", required=True, help="Output file for proposals")
|
||||
parser.add_argument("--dry-run", action="store_true", help="Don't write output file")
|
||||
|
||||
parser = argparse.ArgumentParser(description="Find refactoring opportunities")
|
||||
parser.add_argument("--repo", default=".")
|
||||
parser.add_argument("--output", required=True)
|
||||
parser.add_argument("--dry-run", action="store_true")
|
||||
parser.add_argument("--threshold", type=float, default=30.0)
|
||||
args = parser.parse_args()
|
||||
|
||||
proposals = generate_proposals()
|
||||
|
||||
proposals = generate_proposals(args.repo, args.threshold)
|
||||
if not args.dry_run:
|
||||
with open(args.output, "w") as f:
|
||||
json.dump({"proposals": proposals}, f, indent=2)
|
||||
print(f"Generated {len(proposals)} proposals -> {args.output}")
|
||||
else:
|
||||
print(f"Would generate {len(proposals)} proposals")
|
||||
for p in proposals:
|
||||
print(f" - {p['title']}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
@@ -19,208 +19,95 @@ FileMetrics = mod.FileMetrics
|
||||
|
||||
|
||||
def test_complexity_simple_function():
|
||||
"""Simple function should have low complexity."""
|
||||
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
|
||||
f.write("""
|
||||
def simple():
|
||||
return 42
|
||||
""")
|
||||
f.write("\ndef simple():\n return 42\n")
|
||||
f.flush()
|
||||
avg, max_c, funcs, classes, lines = compute_file_complexity(f.name)
|
||||
assert avg == 1.0, f"Expected 1.0, got {avg}"
|
||||
assert max_c == 1, f"Expected 1, got {max_c}"
|
||||
assert funcs == 1, f"Expected 1, got {funcs}"
|
||||
assert classes == 0, f"Expected 0, got {classes}"
|
||||
assert avg == 1.0
|
||||
assert max_c == 1
|
||||
assert funcs == 1
|
||||
assert classes == 0
|
||||
os.unlink(f.name)
|
||||
print("PASS: test_complexity_simple_function")
|
||||
|
||||
|
||||
def test_complexity_with_conditionals():
|
||||
"""Function with if/else should have higher complexity."""
|
||||
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
|
||||
f.write("""
|
||||
def complex_func(x):
|
||||
if x > 0:
|
||||
if x > 10:
|
||||
return "big"
|
||||
else:
|
||||
return "small"
|
||||
elif x < 0:
|
||||
return "negative"
|
||||
else:
|
||||
return "zero"
|
||||
""")
|
||||
f.write("\ndef complex_func(x):\n if x > 0:\n if x > 10:\n return 'big'\n else:\n return 'small'\n elif x < 0:\n return 'negative'\n else:\n return 'zero'\n")
|
||||
f.flush()
|
||||
avg, max_c, funcs, classes, lines = compute_file_complexity(f.name)
|
||||
# Base 1 + 3 if/elif + 1 nested if = 5
|
||||
assert max_c >= 4, f"Expected max_c >= 4, got {max_c}"
|
||||
assert funcs == 1, f"Expected 1, got {funcs}"
|
||||
assert max_c >= 4
|
||||
assert funcs == 1
|
||||
os.unlink(f.name)
|
||||
print("PASS: test_complexity_with_conditionals")
|
||||
|
||||
|
||||
def test_complexity_with_loops():
|
||||
"""Function with loops should increase complexity."""
|
||||
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
|
||||
f.write("""
|
||||
def loop_func(items):
|
||||
result = []
|
||||
for item in items:
|
||||
if item > 0:
|
||||
result.append(item)
|
||||
while len(result) > 10:
|
||||
result.pop()
|
||||
return result
|
||||
""")
|
||||
f.write("\ndef loop_func(items):\n result = []\n for item in items:\n if item > 0:\n result.append(item)\n while len(result) > 10:\n result.pop()\n return result\n")
|
||||
f.flush()
|
||||
avg, max_c, funcs, classes, lines = compute_file_complexity(f.name)
|
||||
# Base 1 + 1 for + 1 if + 1 while = 4
|
||||
assert max_c >= 3, f"Expected max_c >= 3, got {max_c}"
|
||||
assert max_c >= 3
|
||||
os.unlink(f.name)
|
||||
print("PASS: test_complexity_with_loops")
|
||||
|
||||
|
||||
def test_complexity_with_class():
|
||||
"""Class with methods should count both."""
|
||||
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
|
||||
f.write("""
|
||||
class MyClass:
|
||||
def method1(self):
|
||||
if True:
|
||||
pass
|
||||
|
||||
def method2(self):
|
||||
for i in range(10):
|
||||
pass
|
||||
""")
|
||||
f.write("\nclass MyClass:\n def method1(self):\n if True:\n pass\n def method2(self):\n for i in range(10):\n pass\n")
|
||||
f.flush()
|
||||
avg, max_c, funcs, classes, lines = compute_file_complexity(f.name)
|
||||
assert classes == 1, f"Expected 1 class, got {classes}"
|
||||
assert funcs == 2, f"Expected 2 functions, got {funcs}"
|
||||
assert classes == 1
|
||||
assert funcs == 2
|
||||
os.unlink(f.name)
|
||||
print("PASS: test_complexity_with_class")
|
||||
|
||||
|
||||
def test_complexity_syntax_error():
|
||||
"""File with syntax error should return zeros."""
|
||||
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
|
||||
f.write("def broken(:\n pass")
|
||||
f.flush()
|
||||
avg, max_c, funcs, classes, lines = compute_file_complexity(f.name)
|
||||
assert avg == 0.0, f"Expected 0.0, got {avg}"
|
||||
assert funcs == 0, f"Expected 0, got {funcs}"
|
||||
assert avg == 0.0
|
||||
assert funcs == 0
|
||||
os.unlink(f.name)
|
||||
print("PASS: test_complexity_syntax_error")
|
||||
|
||||
|
||||
def test_refactoring_score_high_complexity():
|
||||
"""High complexity should give high score."""
|
||||
metrics = FileMetrics(
|
||||
path="test.py",
|
||||
lines=200,
|
||||
complexity=15.0,
|
||||
max_complexity=25,
|
||||
functions=10,
|
||||
classes=2,
|
||||
churn_30d=5,
|
||||
churn_90d=15,
|
||||
test_coverage=0.3,
|
||||
refactoring_score=0.0
|
||||
)
|
||||
metrics = FileMetrics(path="test.py", lines=200, complexity=15.0, max_complexity=25, functions=10, classes=2, churn_30d=5, churn_90d=15, test_coverage=0.3, refactoring_score=0.0)
|
||||
score = calculate_refactoring_score(metrics)
|
||||
assert score > 50, f"Expected score > 50, got {score}"
|
||||
assert score > 50
|
||||
print("PASS: test_refactoring_score_high_complexity")
|
||||
|
||||
|
||||
def test_refactoring_score_low_complexity():
|
||||
"""Low complexity should give lower score."""
|
||||
metrics = FileMetrics(
|
||||
path="test.py",
|
||||
lines=50,
|
||||
complexity=2.0,
|
||||
max_complexity=3,
|
||||
functions=3,
|
||||
classes=0,
|
||||
churn_30d=0,
|
||||
churn_90d=1,
|
||||
test_coverage=0.9,
|
||||
refactoring_score=0.0
|
||||
)
|
||||
metrics = FileMetrics(path="test.py", lines=50, complexity=2.0, max_complexity=3, functions=3, classes=0, churn_30d=0, churn_90d=1, test_coverage=0.9, refactoring_score=0.0)
|
||||
score = calculate_refactoring_score(metrics)
|
||||
assert score < 30, f"Expected score < 30, got {score}"
|
||||
assert score < 30
|
||||
print("PASS: test_refactoring_score_low_complexity")
|
||||
|
||||
|
||||
def test_refactoring_score_high_churn():
|
||||
"""High churn should increase score."""
|
||||
metrics = FileMetrics(
|
||||
path="test.py",
|
||||
lines=100,
|
||||
complexity=5.0,
|
||||
max_complexity=8,
|
||||
functions=5,
|
||||
classes=0,
|
||||
churn_30d=10,
|
||||
churn_90d=20,
|
||||
test_coverage=0.5,
|
||||
refactoring_score=0.0
|
||||
)
|
||||
metrics = FileMetrics(path="test.py", lines=100, complexity=5.0, max_complexity=8, functions=5, classes=0, churn_30d=10, churn_90d=20, test_coverage=0.5, refactoring_score=0.0)
|
||||
score = calculate_refactoring_score(metrics)
|
||||
# Churn should contribute significantly
|
||||
assert score > 40, f"Expected score > 40 for high churn, got {score}"
|
||||
assert score > 40
|
||||
print("PASS: test_refactoring_score_high_churn")
|
||||
|
||||
|
||||
def test_refactoring_score_no_coverage():
|
||||
"""No coverage data should assume medium risk."""
|
||||
metrics = FileMetrics(
|
||||
path="test.py",
|
||||
lines=100,
|
||||
complexity=5.0,
|
||||
max_complexity=8,
|
||||
functions=5,
|
||||
classes=0,
|
||||
churn_30d=1,
|
||||
churn_90d=2,
|
||||
test_coverage=None,
|
||||
refactoring_score=0.0
|
||||
)
|
||||
metrics = FileMetrics(path="test.py", lines=100, complexity=5.0, max_complexity=8, functions=5, classes=0, churn_30d=1, churn_90d=2, test_coverage=None, refactoring_score=0.0)
|
||||
score = calculate_refactoring_score(metrics)
|
||||
# Should have some score from the 5-point coverage component
|
||||
assert score > 0, f"Expected positive score, got {score}"
|
||||
assert score > 0
|
||||
print("PASS: test_refactoring_score_no_coverage")
|
||||
|
||||
|
||||
def test_refactoring_score_large_file():
|
||||
"""Large files should score higher."""
|
||||
metrics_small = FileMetrics(
|
||||
path="small.py",
|
||||
lines=50,
|
||||
complexity=5.0,
|
||||
max_complexity=8,
|
||||
functions=3,
|
||||
classes=0,
|
||||
churn_30d=1,
|
||||
churn_90d=2,
|
||||
test_coverage=0.8,
|
||||
refactoring_score=0.0
|
||||
)
|
||||
metrics_large = FileMetrics(
|
||||
path="large.py",
|
||||
lines=1000,
|
||||
complexity=5.0,
|
||||
max_complexity=8,
|
||||
functions=3,
|
||||
classes=0,
|
||||
churn_30d=1,
|
||||
churn_90d=2,
|
||||
test_coverage=0.8,
|
||||
refactoring_score=0.0
|
||||
)
|
||||
metrics_small = FileMetrics(path="small.py", lines=50, complexity=5.0, max_complexity=8, functions=3, classes=0, churn_30d=1, churn_90d=2, test_coverage=0.8, refactoring_score=0.0)
|
||||
metrics_large = FileMetrics(path="large.py", lines=1000, complexity=5.0, max_complexity=8, functions=3, classes=0, churn_30d=1, churn_90d=2, test_coverage=0.8, refactoring_score=0.0)
|
||||
score_small = calculate_refactoring_score(metrics_small)
|
||||
score_large = calculate_refactoring_score(metrics_large)
|
||||
assert score_large > score_small, \
|
||||
f"Large file ({score_large}) should score higher than small ({score_small})"
|
||||
assert score_large > score_small
|
||||
print("PASS: test_refactoring_score_large_file")
|
||||
|
||||
|
||||
@@ -239,4 +126,4 @@ def run_all():
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
run_all()
|
||||
run_all()
|
||||
|
||||
@@ -1,227 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Tests for scripts/freshness.py — 8 tests."""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__) or ".", ".."))
|
||||
import importlib.util
|
||||
spec = importlib.util.spec_from_file_location(
|
||||
"freshness", os.path.join(os.path.dirname(__file__) or ".", "..", "scripts", "freshness.py"))
|
||||
mod = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(mod)
|
||||
|
||||
compute_file_hash = mod.compute_file_hash
|
||||
check_freshness = mod.check_freshness
|
||||
load_knowledge_entries = mod.load_knowledge_entries
|
||||
|
||||
|
||||
def test_compute_file_hash():
|
||||
"""File hash should be computed correctly."""
|
||||
with tempfile.NamedTemporaryFile(mode='w', delete=False) as f:
|
||||
f.write("test content")
|
||||
f.flush()
|
||||
h = compute_file_hash(f.name)
|
||||
assert h is not None
|
||||
assert h.startswith("sha256:")
|
||||
os.unlink(f.name)
|
||||
print("PASS: test_compute_file_hash")
|
||||
|
||||
|
||||
def test_compute_file_hash_nonexistent():
|
||||
"""Nonexistent file should return None."""
|
||||
h = compute_file_hash("/nonexistent/file.txt")
|
||||
assert h is None
|
||||
print("PASS: test_compute_file_hash_nonexistent")
|
||||
|
||||
|
||||
def test_load_knowledge_entries_empty():
|
||||
"""Empty knowledge dir should return empty list."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
entries = load_knowledge_entries(tmpdir)
|
||||
assert entries == []
|
||||
print("PASS: test_load_knowledge_entries_empty")
|
||||
|
||||
|
||||
def test_load_knowledge_entries_from_index():
|
||||
"""Should load entries from index.json."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
# Create index.json
|
||||
index_path = os.path.join(tmpdir, "index.json")
|
||||
with open(index_path, "w") as f:
|
||||
json.dump({
|
||||
"facts": [
|
||||
{
|
||||
"fact": "Test fact",
|
||||
"source_file": "test.py",
|
||||
"source_hash": "sha256:abc123",
|
||||
"category": "fact",
|
||||
"confidence": 0.9
|
||||
}
|
||||
]
|
||||
}, f)
|
||||
|
||||
entries = load_knowledge_entries(tmpdir)
|
||||
assert len(entries) == 1
|
||||
assert entries[0]["fact"] == "Test fact"
|
||||
assert entries[0]["source_file"] == "test.py"
|
||||
print("PASS: test_load_knowledge_entries_from_index")
|
||||
|
||||
|
||||
def test_load_knowledge_entries_from_yaml():
|
||||
"""Should load entries from YAML files."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
# Create global directory
|
||||
global_dir = os.path.join(tmpdir, "global")
|
||||
os.makedirs(global_dir)
|
||||
|
||||
# Create YAML file
|
||||
yaml_path = os.path.join(global_dir, "test.yaml")
|
||||
with open(yaml_path, "w") as f:
|
||||
f.write("""
|
||||
pitfalls:
|
||||
- description: "Test pitfall"
|
||||
source_file: "test.py"
|
||||
source_hash: "sha256:def456"
|
||||
category: "pitfall"
|
||||
confidence: 0.8
|
||||
""")
|
||||
|
||||
entries = load_knowledge_entries(tmpdir)
|
||||
assert len(entries) == 1
|
||||
assert entries[0]["fact"] == "Test pitfall"
|
||||
assert entries[0]["category"] == "pitfall"
|
||||
print("PASS: test_load_knowledge_entries_from_yaml")
|
||||
|
||||
|
||||
def test_check_freshness_no_changes():
|
||||
"""With no source file reference, entries should be counted correctly."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
# Create knowledge dir
|
||||
knowledge_dir = os.path.join(tmpdir, "knowledge")
|
||||
os.makedirs(knowledge_dir)
|
||||
|
||||
# Create repo dir
|
||||
repo_dir = os.path.join(tmpdir, "repo")
|
||||
os.makedirs(repo_dir)
|
||||
|
||||
# Create index.json with entry that has no source_file
|
||||
index_path = os.path.join(knowledge_dir, "index.json")
|
||||
with open(index_path, "w") as f:
|
||||
json.dump({
|
||||
"facts": [
|
||||
{
|
||||
"fact": "General knowledge",
|
||||
"category": "fact",
|
||||
"confidence": 0.9
|
||||
# No source_file or source_hash
|
||||
}
|
||||
]
|
||||
}, f)
|
||||
|
||||
result = check_freshness(knowledge_dir, repo_dir, days=1)
|
||||
|
||||
# Entry without source_file should be counted as "fresh" (no_source status)
|
||||
assert result["summary"]["total"] == 1
|
||||
assert result["summary"]["stale"] == 0
|
||||
assert result["summary"]["fresh"] == 1
|
||||
assert result["fresh_entries"][0]["status"] == "no_source"
|
||||
print("PASS: test_check_freshness_no_changes")
|
||||
|
||||
|
||||
def test_check_freshness_with_hash_mismatch():
|
||||
"""Hash mismatch should mark entry as stale."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
# Create knowledge dir
|
||||
knowledge_dir = os.path.join(tmpdir, "knowledge")
|
||||
os.makedirs(knowledge_dir)
|
||||
|
||||
# Create repo dir with a file
|
||||
repo_dir = os.path.join(tmpdir, "repo")
|
||||
os.makedirs(repo_dir)
|
||||
|
||||
test_file = os.path.join(repo_dir, "test.py")
|
||||
with open(test_file, "w") as f:
|
||||
f.write("print('hello')")
|
||||
|
||||
# Create index.json with wrong hash
|
||||
index_path = os.path.join(knowledge_dir, "index.json")
|
||||
with open(index_path, "w") as f:
|
||||
json.dump({
|
||||
"facts": [
|
||||
{
|
||||
"fact": "Test fact",
|
||||
"source_file": "test.py",
|
||||
"source_hash": "sha256:wronghash",
|
||||
"category": "fact",
|
||||
"confidence": 0.9
|
||||
}
|
||||
]
|
||||
}, f)
|
||||
|
||||
# Initialize git repo
|
||||
os.system(f"cd {repo_dir} && git init && git add . && git commit -m 'init' 2>/dev/null")
|
||||
|
||||
result = check_freshness(knowledge_dir, repo_dir, days=1)
|
||||
|
||||
assert result["summary"]["total"] == 1
|
||||
assert result["summary"]["stale"] == 1
|
||||
assert result["summary"]["fresh"] == 0
|
||||
assert result["stale_entries"][0]["reason"] == "hash_mismatch"
|
||||
print("PASS: test_check_freshness_with_hash_mismatch")
|
||||
|
||||
|
||||
def test_check_freshness_missing_source():
|
||||
"""Missing source file should mark entry as stale."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
# Create knowledge dir
|
||||
knowledge_dir = os.path.join(tmpdir, "knowledge")
|
||||
os.makedirs(knowledge_dir)
|
||||
|
||||
# Create repo dir (without the referenced file)
|
||||
repo_dir = os.path.join(tmpdir, "repo")
|
||||
os.makedirs(repo_dir)
|
||||
|
||||
# Create index.json referencing nonexistent file
|
||||
index_path = os.path.join(knowledge_dir, "index.json")
|
||||
with open(index_path, "w") as f:
|
||||
json.dump({
|
||||
"facts": [
|
||||
{
|
||||
"fact": "Test fact",
|
||||
"source_file": "nonexistent.py",
|
||||
"source_hash": "sha256:abc123",
|
||||
"category": "fact",
|
||||
"confidence": 0.9
|
||||
}
|
||||
]
|
||||
}, f)
|
||||
|
||||
# Initialize git repo
|
||||
os.system(f"cd {repo_dir} && git init && git add . && git commit -m 'init' 2>/dev/null")
|
||||
|
||||
result = check_freshness(knowledge_dir, repo_dir, days=1)
|
||||
|
||||
assert result["summary"]["total"] == 1
|
||||
assert result["summary"]["stale"] == 1
|
||||
assert result["summary"]["fresh"] == 0
|
||||
assert result["stale_entries"][0]["reason"] == "source_missing"
|
||||
print("PASS: test_check_freshness_missing_source")
|
||||
|
||||
|
||||
def run_all():
|
||||
test_compute_file_hash()
|
||||
test_compute_file_hash_nonexistent()
|
||||
test_load_knowledge_entries_empty()
|
||||
test_load_knowledge_entries_from_index()
|
||||
test_load_knowledge_entries_from_yaml()
|
||||
test_check_freshness_no_changes()
|
||||
test_check_freshness_with_hash_mismatch()
|
||||
test_check_freshness_missing_source()
|
||||
print("\nAll 8 tests passed!")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
run_all()
|
||||
Reference in New Issue
Block a user