Compare commits


2 Commits

d7cfda8f03  test: sync refactoring opportunity finder tests (#210)  2026-04-21 11:30:59 +00:00
            Some checks failed: Test / pytest (pull_request), failing after 49s

b172e720e4  feat: implement refactoring opportunity finder — AST complexity + scoring (#210)  2026-04-21 11:30:58 +00:00
2 changed files with 108 additions and 327 deletions

scripts/refactoring_opportunity_finder.py

@@ -1,16 +1,6 @@
 #!/usr/bin/env python3
 """
-Refactoring Opportunity Finder
-
-Analyzes Python codebases for refactoring opportunities based on:
-- Cyclomatic complexity
-- File size and churn
-- Test coverage
-- Class/function counts
-
-Usage:
-    python3 scripts/refactoring_opportunity_finder.py --root . --output proposals.json
-    python3 scripts/refactoring_opportunity_finder.py --root . --output proposals.json --dry-run
+Refactoring Opportunity Finder — Engine 10.4
 """

 import argparse
@@ -18,15 +8,13 @@ import ast
 import json
 import os
 import sys
-from dataclasses import dataclass, field
+from dataclasses import dataclass, asdict
 from datetime import datetime, timezone
-from pathlib import Path
-from typing import List, Optional, Tuple


 @dataclass
 class FileMetrics:
     """Metrics for a single file."""
     path: str
     lines: int
     complexity: float
@@ -39,212 +27,118 @@ class FileMetrics:
     refactoring_score: float = 0.0


-def _compute_function_complexity(node: ast.FunctionDef) -> int:
-    """Compute cyclomatic complexity of a single function."""
-    complexity = 1  # Base complexity
-    for child in ast.walk(node):
-        if isinstance(child, (ast.If, ast.While, ast.For)):
-            complexity += 1
-        elif isinstance(child, ast.BoolOp):
-            # and/or add complexity for each additional value
-            complexity += len(child.values) - 1
-        elif isinstance(child, ast.ExceptHandler):
-            complexity += 1
-        elif isinstance(child, ast.Assert):
-            complexity += 1
-        elif isinstance(child, ast.comprehension):
-            complexity += 1
-            complexity += len(child.ifs)
-    return complexity
+class ComplexityVisitor(ast.NodeVisitor):
+    def __init__(self):
+        self.functions = []
+        self.classes = 0
+
+    def visit_FunctionDef(self, node):
+        complexity = 1
+        for child in ast.walk(node):
+            if isinstance(child, (ast.If, ast.While, ast.For)):
+                complexity += 1
+            elif isinstance(child, ast.BoolOp):
+                complexity += len(child.values) - 1
+            elif isinstance(child, ast.ExceptHandler):
+                complexity += 1
+            elif isinstance(child, ast.Assert):
+                complexity += 1
+        self.functions.append((node.name, complexity, node.lineno))
+        self.generic_visit(node)
+
+    def visit_AsyncFunctionDef(self, node):
+        self.visit_FunctionDef(node)
+
+    def visit_ClassDef(self, node):
+        self.classes += 1
+        self.generic_visit(node)


-def compute_file_complexity(filepath: str) -> Tuple[float, int, int, int, int]:
-    """
-    Compute complexity metrics for a Python file.
-    Returns:
-        (avg_complexity, max_complexity, function_count, class_count, line_count)
-    """
+def compute_file_complexity(filepath):
     try:
-        with open(filepath, "r", encoding="utf-8", errors="replace") as f:
+        with open(filepath) as f:
             source = f.read()
-    except (OSError, IOError):
-        return 0.0, 0, 0, 0, 0
-    lines = source.count("\n") + 1
+    except (FileNotFoundError, IsADirectoryError, PermissionError):
+        return (0.0, 0, 0, 0, 0)
     try:
-        tree = ast.parse(source, filename=filepath)
+        tree = ast.parse(source)
     except SyntaxError:
-        return 0.0, 0, 0, 0, lines
-    functions = []
-    classes = []
-    for node in ast.walk(tree):
-        if isinstance(node, ast.ClassDef):
-            classes.append(node)
-        elif isinstance(node, ast.FunctionDef):
-            functions.append(node)
-    if not functions:
-        return 0.0, 0, len(functions), len(classes), lines
-    complexities = [_compute_function_complexity(fn) for fn in functions]
+        return (0.0, 0, 0, 0, 0)
+    lines = source.count("\n") + 1 if source.strip() else 0
+    visitor = ComplexityVisitor()
+    visitor.visit(tree)
+    if not visitor.functions:
+        return (0.0, 0, 0, visitor.classes, lines)
+    complexities = [c for _, c, _ in visitor.functions]
     avg = sum(complexities) / len(complexities)
-    max_c = max(complexities) if complexities else 0
-    return round(avg, 2), max_c, len(functions), len(classes), lines
+    max_c = max(complexities)
+    return (round(avg, 1), max_c, len(visitor.functions), visitor.classes, lines)


-def calculate_refactoring_score(metrics: FileMetrics) -> float:
-    """
-    Calculate a refactoring priority score (0-100) based on metrics.
-    Higher score = more urgent refactoring candidate.
-    Components:
-    - Complexity (0-30): weighted by avg and max complexity
-    - Size (0-20): larger files score higher
-    - Churn (0-25): frequently changed files score higher
-    - Coverage (0-15): low/no coverage scores higher
-    - Density (0-10): many functions/classes in small space
-    """
-    import math
+def calculate_refactoring_score(metrics):
     score = 0.0
-    # Complexity component (0-30)
-    # avg=5 -> ~10, avg=10 -> ~20, avg=15+ -> ~30
-    complexity_score = min(30, metrics.complexity * 2)
-    # Bonus for high max complexity
+    complexity_score = min(40, metrics.complexity * 4)
     if metrics.max_complexity > 10:
-        complexity_score = min(30, complexity_score + (metrics.max_complexity - 10))
+        complexity_score = min(40, complexity_score + (metrics.max_complexity - 10))
     score += complexity_score
-    # Size component (0-20)
-    # 50 lines -> ~2, 200 lines -> ~8, 500 lines -> ~15, 1000+ -> ~20
-    if metrics.lines > 0:
-        size_score = min(20, math.log2(max(1, metrics.lines)) * 2.5)
+    if metrics.lines <= 0:
+        pass
+    elif metrics.lines <= 100:
+        score += metrics.lines * 0.1
+    elif metrics.lines <= 500:
+        score += 10 + (metrics.lines - 100) * 0.0125
     else:
-        size_score = 0
-    score += size_score
-    # Churn component (0-25)
-    # Weighted combination of 30d and 90d churn
-    churn_score = min(25, (metrics.churn_30d * 1.5) + (metrics.churn_90d * 0.5))
-    score += churn_score
-    # Coverage component (0-15)
-    # Low coverage = higher score
+        score += min(20, 15 + (metrics.lines - 500) * 0.01)
+    churn_score = (metrics.churn_30d * 2) + (metrics.churn_90d * 0.5)
+    score += min(30, churn_score)
     if metrics.test_coverage is None:
-        # No data -> assume medium risk
         score += 5
     elif metrics.test_coverage < 0.3:
         score += 15
     elif metrics.test_coverage < 0.5:
         score += 10
     elif metrics.test_coverage < 0.6:
         score += 7
     elif metrics.test_coverage < 0.8:
         score += 5
-    # else: good coverage, no penalty
-    # Density component (0-10)
-    # Many functions/classes packed into small space
     if metrics.lines > 0:
-        density = (metrics.functions + metrics.classes * 3) / (metrics.lines / 100)
-        density_score = min(10, density * 2)
+        score += 4
     else:
-        density_score = 0
-    score += density_score
-    return round(min(100, max(0, score)), 2)
+        score += 1
+    return round(min(100, max(0, score)), 1)


-def analyze_file(filepath: str, root: str = ".") -> Optional[FileMetrics]:
-    """Analyze a single Python file and return metrics."""
-    try:
-        rel_path = os.path.relpath(filepath, root)
-    except ValueError:
-        rel_path = filepath
-    avg, max_c, funcs, classes, lines = compute_file_complexity(filepath)
-    metrics = FileMetrics(
-        path=rel_path,
-        lines=lines,
-        complexity=avg,
-        max_complexity=max_c,
-        functions=funcs,
-        classes=classes,
-    )
-    metrics.refactoring_score = calculate_refactoring_score(metrics)
-    return metrics
-
-
-def find_python_files(root: str) -> List[str]:
-    """Find all Python files under root, excluding common non-source dirs."""
-    skip_dirs = {".git", "__pycache__", ".tox", ".eggs", "node_modules", ".venv", "venv", "env"}
-    files = []
-    for dirpath, dirnames, filenames in os.walk(root):
-        dirnames[:] = [d for d in dirnames if d not in skip_dirs]
-        for fn in filenames:
-            if fn.endswith(".py"):
-                files.append(os.path.join(dirpath, fn))
-    return sorted(files)
-
-
-def generate_proposals(root: str = ".", min_score: float = 30.0) -> List[dict]:
-    """Generate refactoring proposals for the codebase."""
-    files = find_python_files(root)
+def generate_proposals(repo_path=".", threshold=30.0):
     proposals = []
-    for filepath in files:
-        metrics = analyze_file(filepath, root)
-        if metrics and metrics.refactoring_score >= min_score:
-            proposals.append({
-                "title": f"Refactor {metrics.path} (score: {metrics.refactoring_score})",
-                "description": (
-                    f"File has complexity avg={metrics.complexity:.1f} max={metrics.max_complexity}, "
-                    f"{metrics.functions} functions, {metrics.classes} classes, {metrics.lines} lines."
-                ),
-                "impact": min(10, int(metrics.refactoring_score / 10)),
-                "effort": min(10, max(1, int(metrics.complexity / 2))),
-                "category": "refactoring",
-                "source_engine": "refactoring_opportunity_finder",
-                "timestamp": datetime.now(timezone.utc).isoformat(),
-                "metrics": {
-                    "path": metrics.path,
-                    "complexity": metrics.complexity,
-                    "max_complexity": metrics.max_complexity,
-                    "lines": metrics.lines,
-                    "refactoring_score": metrics.refactoring_score,
-                }
-            })
-    # Sort by score descending
-    proposals.sort(key=lambda p: p.get("metrics", {}).get("refactoring_score", 0), reverse=True)
-    return proposals
+    for root, dirs, files in os.walk(repo_path):
+        dirs[:] = [d for d in dirs if not d.startswith((".", "__pycache__", "node_modules", ".git", "venv"))]
+        for fname in files:
+            if not fname.endswith(".py"):
+                continue
+            filepath = os.path.join(root, fname)
+            relpath = os.path.relpath(filepath, repo_path)
+            avg, max_c, funcs, classes, lines = compute_file_complexity(filepath)
+            if funcs == 0 and classes == 0:
+                continue
+            metrics = FileMetrics(path=relpath, lines=lines, complexity=avg, max_complexity=max_c, functions=funcs, classes=classes)
+            score = calculate_refactoring_score(metrics)
+            metrics.refactoring_score = score
+            if score >= threshold:
+                proposals.append({"title": f"Refactor {relpath} (score: {score})", "impact": min(10, int(score / 10)), "effort": min(10, max(1, funcs // 3)), "category": "refactoring", "source_engine": "10.4", "timestamp": datetime.now(timezone.utc).isoformat(), "metrics": asdict(metrics)})
+    return sorted(proposals, key=lambda p: p.get("metrics", {}).get("refactoring_score", 0), reverse=True)


 def main():
     parser = argparse.ArgumentParser(description="Find refactoring opportunities")
-    parser.add_argument("--root", default=".", help="Root directory to scan")
-    parser.add_argument("--output", required=True, help="Output file for proposals")
-    parser.add_argument("--dry-run", action="store_true", help="Don't write output file")
-    parser.add_argument("--min-score", type=float, default=30.0, help="Minimum score threshold")
+    parser.add_argument("--repo", default=".")
+    parser.add_argument("--output", required=True)
+    parser.add_argument("--dry-run", action="store_true")
+    parser.add_argument("--threshold", type=float, default=30.0)
     args = parser.parse_args()
-    proposals = generate_proposals(args.root, args.min_score)
+    proposals = generate_proposals(args.repo, args.threshold)
     if not args.dry_run:
         with open(args.output, "w") as f:
             json.dump({"proposals": proposals}, f, indent=2)
         print(f"Generated {len(proposals)} proposals -> {args.output}")
     else:
         print(f"Would generate {len(proposals)} proposals")
-        for p in proposals[:10]:
+        for p in proposals:
             print(f"  - {p['title']}")


 if __name__ == "__main__":
     main()
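
A quick way to sanity-check the rewritten scoring outside CI is to load the module the same way the test file does (via importlib, matching the mod alias in the test hunk header below) and compute one score by hand. This is a minimal sketch, not part of the PR: the scripts/ path is taken from the usage lines in the removed docstring and may not match the repository layout, and the metric values are copied from test_refactoring_score_high_complexity.

    # Sketch: load the engine and hand-check calculate_refactoring_score.
    # Assumed path: scripts/refactoring_opportunity_finder.py (from the
    # removed docstring); metrics mirror the high-complexity test below.
    import importlib.util

    spec = importlib.util.spec_from_file_location(
        "rof", "scripts/refactoring_opportunity_finder.py")
    rof = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(rof)

    m = rof.FileMetrics(path="example.py", lines=200, complexity=15.0,
                        max_complexity=25, functions=10, classes=2,
                        churn_30d=5, churn_90d=15, test_coverage=0.3)
    # complexity: min(40, 15.0 * 4) = 40; the max_complexity bonus is capped away
    # size: 100 < 200 <= 500 gives 10 + (200 - 100) * 0.0125 = 11.25
    # churn: min(30, 5 * 2 + 15 * 0.5) = 17.5
    # coverage: 0.3 misses "< 0.3" and hits "< 0.5", adding 10
    # density: lines > 0 adds 4
    # total: 40 + 11.25 + 17.5 + 10 + 4 = 82.75, rounded to 82.8
    print(rof.calculate_refactoring_score(m))  # expected: 82.8

The same arithmetic is what puts test_refactoring_score_high_complexity comfortably above its score > 50 threshold.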

Tests for compute_file_complexity and calculate_refactoring_score

@@ -19,208 +19,95 @@ FileMetrics = mod.FileMetrics
 def test_complexity_simple_function():
     """Simple function should have low complexity."""
     with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
-        f.write("""
-def simple():
-    return 42
-""")
+        f.write("\ndef simple():\n    return 42\n")
         f.flush()
         avg, max_c, funcs, classes, lines = compute_file_complexity(f.name)
-    assert avg == 1.0, f"Expected 1.0, got {avg}"
-    assert max_c == 1, f"Expected 1, got {max_c}"
-    assert funcs == 1, f"Expected 1, got {funcs}"
-    assert classes == 0, f"Expected 0, got {classes}"
+    assert avg == 1.0
+    assert max_c == 1
+    assert funcs == 1
+    assert classes == 0
     os.unlink(f.name)
     print("PASS: test_complexity_simple_function")


 def test_complexity_with_conditionals():
     """Function with if/else should have higher complexity."""
     with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
-        f.write("""
-def complex_func(x):
-    if x > 0:
-        if x > 10:
-            return "big"
-        else:
-            return "small"
-    elif x < 0:
-        return "negative"
-    else:
-        return "zero"
-""")
+        f.write("\ndef complex_func(x):\n    if x > 0:\n        if x > 10:\n            return 'big'\n        else:\n            return 'small'\n    elif x < 0:\n        return 'negative'\n    else:\n        return 'zero'\n")
         f.flush()
         avg, max_c, funcs, classes, lines = compute_file_complexity(f.name)
-    # Base 1 + 3 if/elif + 1 nested if = 5
-    assert max_c >= 4, f"Expected max_c >= 4, got {max_c}"
-    assert funcs == 1, f"Expected 1, got {funcs}"
+    assert max_c >= 4
+    assert funcs == 1
     os.unlink(f.name)
     print("PASS: test_complexity_with_conditionals")


 def test_complexity_with_loops():
     """Function with loops should increase complexity."""
     with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
-        f.write("""
-def loop_func(items):
-    result = []
-    for item in items:
-        if item > 0:
-            result.append(item)
-    while len(result) > 10:
-        result.pop()
-    return result
-""")
+        f.write("\ndef loop_func(items):\n    result = []\n    for item in items:\n        if item > 0:\n            result.append(item)\n    while len(result) > 10:\n        result.pop()\n    return result\n")
         f.flush()
         avg, max_c, funcs, classes, lines = compute_file_complexity(f.name)
-    # Base 1 + 1 for + 1 if + 1 while = 4
-    assert max_c >= 3, f"Expected max_c >= 3, got {max_c}"
+    assert max_c >= 3
     os.unlink(f.name)
     print("PASS: test_complexity_with_loops")


 def test_complexity_with_class():
     """Class with methods should count both."""
     with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
-        f.write("""
-class MyClass:
-    def method1(self):
-        if True:
-            pass
-    def method2(self):
-        for i in range(10):
-            pass
-""")
+        f.write("\nclass MyClass:\n    def method1(self):\n        if True:\n            pass\n    def method2(self):\n        for i in range(10):\n            pass\n")
         f.flush()
         avg, max_c, funcs, classes, lines = compute_file_complexity(f.name)
-    assert classes == 1, f"Expected 1 class, got {classes}"
-    assert funcs == 2, f"Expected 2 functions, got {funcs}"
+    assert classes == 1
+    assert funcs == 2
     os.unlink(f.name)
     print("PASS: test_complexity_with_class")


 def test_complexity_syntax_error():
     """File with syntax error should return zeros."""
     with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
         f.write("def broken(:\n    pass")
         f.flush()
         avg, max_c, funcs, classes, lines = compute_file_complexity(f.name)
-    assert avg == 0.0, f"Expected 0.0, got {avg}"
-    assert funcs == 0, f"Expected 0, got {funcs}"
+    assert avg == 0.0
+    assert funcs == 0
     os.unlink(f.name)
     print("PASS: test_complexity_syntax_error")


 def test_refactoring_score_high_complexity():
     """High complexity should give high score."""
-    metrics = FileMetrics(
-        path="test.py",
-        lines=200,
-        complexity=15.0,
-        max_complexity=25,
-        functions=10,
-        classes=2,
-        churn_30d=5,
-        churn_90d=15,
-        test_coverage=0.3,
-        refactoring_score=0.0
-    )
+    metrics = FileMetrics(path="test.py", lines=200, complexity=15.0, max_complexity=25, functions=10, classes=2, churn_30d=5, churn_90d=15, test_coverage=0.3, refactoring_score=0.0)
     score = calculate_refactoring_score(metrics)
-    assert score > 50, f"Expected score > 50, got {score}"
+    assert score > 50
     print("PASS: test_refactoring_score_high_complexity")


 def test_refactoring_score_low_complexity():
     """Low complexity should give lower score."""
-    metrics = FileMetrics(
-        path="test.py",
-        lines=50,
-        complexity=2.0,
-        max_complexity=3,
-        functions=3,
-        classes=0,
-        churn_30d=0,
-        churn_90d=1,
-        test_coverage=0.9,
-        refactoring_score=0.0
-    )
+    metrics = FileMetrics(path="test.py", lines=50, complexity=2.0, max_complexity=3, functions=3, classes=0, churn_30d=0, churn_90d=1, test_coverage=0.9, refactoring_score=0.0)
     score = calculate_refactoring_score(metrics)
-    assert score < 30, f"Expected score < 30, got {score}"
+    assert score < 30
     print("PASS: test_refactoring_score_low_complexity")


 def test_refactoring_score_high_churn():
     """High churn should increase score."""
-    metrics = FileMetrics(
-        path="test.py",
-        lines=100,
-        complexity=5.0,
-        max_complexity=8,
-        functions=5,
-        classes=0,
-        churn_30d=10,
-        churn_90d=20,
-        test_coverage=0.5,
-        refactoring_score=0.0
-    )
+    metrics = FileMetrics(path="test.py", lines=100, complexity=5.0, max_complexity=8, functions=5, classes=0, churn_30d=10, churn_90d=20, test_coverage=0.5, refactoring_score=0.0)
     score = calculate_refactoring_score(metrics)
-    # Churn should contribute significantly
-    assert score > 40, f"Expected score > 40 for high churn, got {score}"
+    assert score > 40
     print("PASS: test_refactoring_score_high_churn")


 def test_refactoring_score_no_coverage():
     """No coverage data should assume medium risk."""
-    metrics = FileMetrics(
-        path="test.py",
-        lines=100,
-        complexity=5.0,
-        max_complexity=8,
-        functions=5,
-        classes=0,
-        churn_30d=1,
-        churn_90d=2,
-        test_coverage=None,
-        refactoring_score=0.0
-    )
+    metrics = FileMetrics(path="test.py", lines=100, complexity=5.0, max_complexity=8, functions=5, classes=0, churn_30d=1, churn_90d=2, test_coverage=None, refactoring_score=0.0)
     score = calculate_refactoring_score(metrics)
-    # Should have some score from the 5-point coverage component
-    assert score > 0, f"Expected positive score, got {score}"
+    assert score > 0
     print("PASS: test_refactoring_score_no_coverage")


 def test_refactoring_score_large_file():
     """Large files should score higher."""
-    metrics_small = FileMetrics(
-        path="small.py",
-        lines=50,
-        complexity=5.0,
-        max_complexity=8,
-        functions=3,
-        classes=0,
-        churn_30d=1,
-        churn_90d=2,
-        test_coverage=0.8,
-        refactoring_score=0.0
-    )
-    metrics_large = FileMetrics(
-        path="large.py",
-        lines=1000,
-        complexity=5.0,
-        max_complexity=8,
-        functions=3,
-        classes=0,
-        churn_30d=1,
-        churn_90d=2,
-        test_coverage=0.8,
-        refactoring_score=0.0
-    )
+    metrics_small = FileMetrics(path="small.py", lines=50, complexity=5.0, max_complexity=8, functions=3, classes=0, churn_30d=1, churn_90d=2, test_coverage=0.8, refactoring_score=0.0)
+    metrics_large = FileMetrics(path="large.py", lines=1000, complexity=5.0, max_complexity=8, functions=3, classes=0, churn_30d=1, churn_90d=2, test_coverage=0.8, refactoring_score=0.0)
     score_small = calculate_refactoring_score(metrics_small)
     score_large = calculate_refactoring_score(metrics_large)
-    assert score_large > score_small, \
-        f"Large file ({score_large}) should score higher than small ({score_small})"
+    assert score_large > score_small
     print("PASS: test_refactoring_score_large_file")
@@ -239,4 +126,4 @@ def run_all():


 if __name__ == "__main__":
-    run_all()
+    run_all()
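
run_all() itself falls outside this hunk; judging from the per-test PASS prints above, it is a plain driver over the test_* functions. A minimal sketch of that convention, with the body assumed rather than taken from this diff:

    # Sketch of the run_all() convention implied by the tests above: invoke
    # every test_* function and let the first failing assert propagate.
    # (The real run_all() is not shown in this compare; this body is assumed.)
    def run_all():
        tests = [fn for name, fn in sorted(globals().items())
                 if name.startswith("test_") and callable(fn)]
        for fn in tests:
            fn()  # each test prints its own "PASS: ..." line on success
        print(f"ran {len(tests)} tests")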