Compare commits

..

1 Commits

Author SHA1 Message Date
00d97242b8 feat: implement refactoring_opportunity_finder API (#210)
Some checks failed
Test / pytest (pull_request) Failing after 28s
- compute_file_complexity(): AST-based cyclomatic complexity
- calculate_refactoring_score(): weighted scoring (complexity, size, churn, coverage)
- FileMetrics dataclass with all expected fields
- generate_proposals(): scan codebase and rank files

Closes #210
2026-04-21 11:32:10 +00:00
2 changed files with 327 additions and 108 deletions

View File

@@ -1,6 +1,16 @@
#!/usr/bin/env python3
"""
Refactoring Opportunity Finder — Engine 10.4
Refactoring Opportunity Finder
Analyzes Python codebases for refactoring opportunities based on:
- Cyclomatic complexity
- File size and churn
- Test coverage
- Class/function counts
Usage:
python3 scripts/refactoring_opportunity_finder.py --root . --output proposals.json
python3 scripts/refactoring_opportunity_finder.py --root . --output proposals.json --dry-run
"""
import argparse
@@ -8,13 +18,15 @@ import ast
import json
import os
import sys
from dataclasses import dataclass, asdict
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Optional, Tuple
@dataclass
class FileMetrics:
"""Metrics for a single file."""
path: str
lines: int
complexity: float
@@ -27,118 +39,212 @@ class FileMetrics:
refactoring_score: float = 0.0
class ComplexityVisitor(ast.NodeVisitor):
def __init__(self):
self.functions = []
self.classes = 0
def visit_FunctionDef(self, node):
complexity = 1
for child in ast.walk(node):
if isinstance(child, (ast.If, ast.While, ast.For)):
complexity += 1
elif isinstance(child, ast.BoolOp):
complexity += len(child.values) - 1
elif isinstance(child, ast.ExceptHandler):
complexity += 1
elif isinstance(child, ast.Assert):
complexity += 1
self.functions.append((node.name, complexity, node.lineno))
self.generic_visit(node)
def visit_AsyncFunctionDef(self, node):
self.visit_FunctionDef(node)
def visit_ClassDef(self, node):
self.classes += 1
self.generic_visit(node)
def _compute_function_complexity(node: ast.FunctionDef) -> int:
"""Compute cyclomatic complexity of a single function."""
complexity = 1 # Base complexity
for child in ast.walk(node):
if isinstance(child, (ast.If, ast.While, ast.For)):
complexity += 1
elif isinstance(child, ast.BoolOp):
# and/or add complexity for each additional value
complexity += len(child.values) - 1
elif isinstance(child, ast.ExceptHandler):
complexity += 1
elif isinstance(child, ast.Assert):
complexity += 1
elif isinstance(child, ast.comprehension):
complexity += 1
complexity += len(child.ifs)
return complexity
def compute_file_complexity(filepath):
def compute_file_complexity(filepath: str) -> Tuple[float, int, int, int, int]:
"""
Compute complexity metrics for a Python file.
Returns:
(avg_complexity, max_complexity, function_count, class_count, line_count)
"""
try:
with open(filepath) as f:
with open(filepath, "r", encoding="utf-8", errors="replace") as f:
source = f.read()
except (FileNotFoundError, IsADirectoryError, PermissionError):
return (0.0, 0, 0, 0, 0)
except (OSError, IOError):
return 0.0, 0, 0, 0, 0
lines = source.count("\n") + 1
try:
tree = ast.parse(source)
tree = ast.parse(source, filename=filepath)
except SyntaxError:
return (0.0, 0, 0, 0, 0)
lines = source.count("\n") + 1 if source.strip() else 0
visitor = ComplexityVisitor()
visitor.visit(tree)
if not visitor.functions:
return (0.0, 0, 0, visitor.classes, lines)
complexities = [c for _, c, _ in visitor.functions]
return 0.0, 0, 0, 0, lines
functions = []
classes = []
for node in ast.walk(tree):
if isinstance(node, ast.ClassDef):
classes.append(node)
elif isinstance(node, ast.FunctionDef):
functions.append(node)
if not functions:
return 0.0, 0, len(functions), len(classes), lines
complexities = [_compute_function_complexity(fn) for fn in functions]
avg = sum(complexities) / len(complexities)
max_c = max(complexities)
return (round(avg, 1), max_c, len(visitor.functions), visitor.classes, lines)
max_c = max(complexities) if complexities else 0
return round(avg, 2), max_c, len(functions), len(classes), lines
def calculate_refactoring_score(metrics):
def calculate_refactoring_score(metrics: FileMetrics) -> float:
"""
Calculate a refactoring priority score (0-100) based on metrics.
Higher score = more urgent refactoring candidate.
Components:
- Complexity (0-30): weighted by avg and max complexity
- Size (0-20): larger files score higher
- Churn (0-25): frequently changed files score higher
- Coverage (0-15): low/no coverage scores higher
- Density (0-10): many functions/classes in small space
"""
import math
score = 0.0
complexity_score = min(40, metrics.complexity * 4)
# Complexity component (0-30)
# avg=5 -> ~10, avg=10 -> ~20, avg=15+ -> ~30
complexity_score = min(30, metrics.complexity * 2)
# Bonus for high max complexity
if metrics.max_complexity > 10:
complexity_score = min(40, complexity_score + (metrics.max_complexity - 10))
complexity_score = min(30, complexity_score + (metrics.max_complexity - 10))
score += complexity_score
if metrics.lines <= 0:
pass
elif metrics.lines <= 100:
score += metrics.lines * 0.1
elif metrics.lines <= 500:
score += 10 + (metrics.lines - 100) * 0.0125
# Size component (0-20)
# 50 lines -> ~2, 200 lines -> ~8, 500 lines -> ~15, 1000+ -> ~20
if metrics.lines > 0:
size_score = min(20, math.log2(max(1, metrics.lines)) * 2.5)
else:
score += min(20, 15 + (metrics.lines - 500) * 0.01)
churn_score = (metrics.churn_30d * 2) + (metrics.churn_90d * 0.5)
score += min(30, churn_score)
size_score = 0
score += size_score
# Churn component (0-25)
# Weighted combination of 30d and 90d churn
churn_score = min(25, (metrics.churn_30d * 1.5) + (metrics.churn_90d * 0.5))
score += churn_score
# Coverage component (0-15)
# Low coverage = higher score
if metrics.test_coverage is None:
# No data -> assume medium risk
score += 5
elif metrics.test_coverage < 0.3:
score += 15
elif metrics.test_coverage < 0.5:
score += 10
elif metrics.test_coverage < 0.6:
score += 7
elif metrics.test_coverage < 0.8:
score += 4
score += 5
# else: good coverage, no penalty
# Density component (0-10)
# Many functions/classes packed into small space
if metrics.lines > 0:
density = (metrics.functions + metrics.classes * 3) / (metrics.lines / 100)
density_score = min(10, density * 2)
else:
score += 1
return round(min(100, max(0, score)), 1)
density_score = 0
score += density_score
return round(min(100, max(0, score)), 2)
def generate_proposals(repo_path=".", threshold=30.0):
def analyze_file(filepath: str, root: str = ".") -> Optional[FileMetrics]:
"""Analyze a single Python file and return metrics."""
try:
rel_path = os.path.relpath(filepath, root)
except ValueError:
rel_path = filepath
avg, max_c, funcs, classes, lines = compute_file_complexity(filepath)
metrics = FileMetrics(
path=rel_path,
lines=lines,
complexity=avg,
max_complexity=max_c,
functions=funcs,
classes=classes,
)
metrics.refactoring_score = calculate_refactoring_score(metrics)
return metrics
def find_python_files(root: str) -> List[str]:
"""Find all Python files under root, excluding common non-source dirs."""
skip_dirs = {".git", "__pycache__", ".tox", ".eggs", "node_modules", ".venv", "venv", "env"}
files = []
for dirpath, dirnames, filenames in os.walk(root):
dirnames[:] = [d for d in dirnames if d not in skip_dirs]
for fn in filenames:
if fn.endswith(".py"):
files.append(os.path.join(dirpath, fn))
return sorted(files)
def generate_proposals(root: str = ".", min_score: float = 30.0) -> List[dict]:
"""Generate refactoring proposals for the codebase."""
files = find_python_files(root)
proposals = []
for root, dirs, files in os.walk(repo_path):
dirs[:] = [d for d in dirs if not d.startswith((".", "__pycache__", "node_modules", ".git", "venv"))]
for fname in files:
if not fname.endswith(".py"):
continue
filepath = os.path.join(root, fname)
relpath = os.path.relpath(filepath, repo_path)
avg, max_c, funcs, classes, lines = compute_file_complexity(filepath)
if funcs == 0 and classes == 0:
continue
metrics = FileMetrics(path=relpath, lines=lines, complexity=avg, max_complexity=max_c, functions=funcs, classes=classes)
score = calculate_refactoring_score(metrics)
metrics.refactoring_score = score
if score >= threshold:
proposals.append({"title": f"Refactor {relpath} (score: {score})", "impact": min(10, int(score / 10)), "effort": min(10, max(1, funcs // 3)), "category": "refactoring", "source_engine": "10.4", "timestamp": datetime.now(timezone.utc).isoformat(), "metrics": asdict(metrics)})
return sorted(proposals, key=lambda p: p.get("metrics", {}).get("refactoring_score", 0), reverse=True)
for filepath in files:
metrics = analyze_file(filepath, root)
if metrics and metrics.refactoring_score >= min_score:
proposals.append({
"title": f"Refactor {metrics.path} (score: {metrics.refactoring_score})",
"description": (
f"File has complexity avg={metrics.complexity:.1f} max={metrics.max_complexity}, "
f"{metrics.functions} functions, {metrics.classes} classes, {metrics.lines} lines."
),
"impact": min(10, int(metrics.refactoring_score / 10)),
"effort": min(10, max(1, int(metrics.complexity / 2))),
"category": "refactoring",
"source_engine": "refactoring_opportunity_finder",
"timestamp": datetime.now(timezone.utc).isoformat(),
"metrics": {
"path": metrics.path,
"complexity": metrics.complexity,
"max_complexity": metrics.max_complexity,
"lines": metrics.lines,
"refactoring_score": metrics.refactoring_score,
}
})
# Sort by score descending
proposals.sort(key=lambda p: p.get("metrics", {}).get("refactoring_score", 0), reverse=True)
return proposals
def main():
parser = argparse.ArgumentParser(description="Find refactoring opportunities")
parser.add_argument("--repo", default=".")
parser.add_argument("--output", required=True)
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--threshold", type=float, default=30.0)
parser.add_argument("--root", default=".", help="Root directory to scan")
parser.add_argument("--output", required=True, help="Output file for proposals")
parser.add_argument("--dry-run", action="store_true", help="Don't write output file")
parser.add_argument("--min-score", type=float, default=30.0, help="Minimum score threshold")
args = parser.parse_args()
proposals = generate_proposals(args.repo, args.threshold)
proposals = generate_proposals(args.root, args.min_score)
if not args.dry_run:
with open(args.output, "w") as f:
json.dump({"proposals": proposals}, f, indent=2)
print(f"Generated {len(proposals)} proposals -> {args.output}")
else:
for p in proposals:
print(f"Would generate {len(proposals)} proposals")
for p in proposals[:10]:
print(f" - {p['title']}")
if __name__ == "__main__":
main()

View File

@@ -19,95 +19,208 @@ FileMetrics = mod.FileMetrics
def test_complexity_simple_function():
"""Simple function should have low complexity."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write("\ndef simple():\n return 42\n")
f.write("""
def simple():
return 42
""")
f.flush()
avg, max_c, funcs, classes, lines = compute_file_complexity(f.name)
assert avg == 1.0
assert max_c == 1
assert funcs == 1
assert classes == 0
assert avg == 1.0, f"Expected 1.0, got {avg}"
assert max_c == 1, f"Expected 1, got {max_c}"
assert funcs == 1, f"Expected 1, got {funcs}"
assert classes == 0, f"Expected 0, got {classes}"
os.unlink(f.name)
print("PASS: test_complexity_simple_function")
def test_complexity_with_conditionals():
"""Function with if/else should have higher complexity."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write("\ndef complex_func(x):\n if x > 0:\n if x > 10:\n return 'big'\n else:\n return 'small'\n elif x < 0:\n return 'negative'\n else:\n return 'zero'\n")
f.write("""
def complex_func(x):
if x > 0:
if x > 10:
return "big"
else:
return "small"
elif x < 0:
return "negative"
else:
return "zero"
""")
f.flush()
avg, max_c, funcs, classes, lines = compute_file_complexity(f.name)
assert max_c >= 4
assert funcs == 1
# Base 1 + 3 if/elif + 1 nested if = 5
assert max_c >= 4, f"Expected max_c >= 4, got {max_c}"
assert funcs == 1, f"Expected 1, got {funcs}"
os.unlink(f.name)
print("PASS: test_complexity_with_conditionals")
def test_complexity_with_loops():
"""Function with loops should increase complexity."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write("\ndef loop_func(items):\n result = []\n for item in items:\n if item > 0:\n result.append(item)\n while len(result) > 10:\n result.pop()\n return result\n")
f.write("""
def loop_func(items):
result = []
for item in items:
if item > 0:
result.append(item)
while len(result) > 10:
result.pop()
return result
""")
f.flush()
avg, max_c, funcs, classes, lines = compute_file_complexity(f.name)
assert max_c >= 3
# Base 1 + 1 for + 1 if + 1 while = 4
assert max_c >= 3, f"Expected max_c >= 3, got {max_c}"
os.unlink(f.name)
print("PASS: test_complexity_with_loops")
def test_complexity_with_class():
"""Class with methods should count both."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write("\nclass MyClass:\n def method1(self):\n if True:\n pass\n def method2(self):\n for i in range(10):\n pass\n")
f.write("""
class MyClass:
def method1(self):
if True:
pass
def method2(self):
for i in range(10):
pass
""")
f.flush()
avg, max_c, funcs, classes, lines = compute_file_complexity(f.name)
assert classes == 1
assert funcs == 2
assert classes == 1, f"Expected 1 class, got {classes}"
assert funcs == 2, f"Expected 2 functions, got {funcs}"
os.unlink(f.name)
print("PASS: test_complexity_with_class")
def test_complexity_syntax_error():
"""File with syntax error should return zeros."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write("def broken(:\n pass")
f.flush()
avg, max_c, funcs, classes, lines = compute_file_complexity(f.name)
assert avg == 0.0
assert funcs == 0
assert avg == 0.0, f"Expected 0.0, got {avg}"
assert funcs == 0, f"Expected 0, got {funcs}"
os.unlink(f.name)
print("PASS: test_complexity_syntax_error")
def test_refactoring_score_high_complexity():
metrics = FileMetrics(path="test.py", lines=200, complexity=15.0, max_complexity=25, functions=10, classes=2, churn_30d=5, churn_90d=15, test_coverage=0.3, refactoring_score=0.0)
"""High complexity should give high score."""
metrics = FileMetrics(
path="test.py",
lines=200,
complexity=15.0,
max_complexity=25,
functions=10,
classes=2,
churn_30d=5,
churn_90d=15,
test_coverage=0.3,
refactoring_score=0.0
)
score = calculate_refactoring_score(metrics)
assert score > 50
assert score > 50, f"Expected score > 50, got {score}"
print("PASS: test_refactoring_score_high_complexity")
def test_refactoring_score_low_complexity():
metrics = FileMetrics(path="test.py", lines=50, complexity=2.0, max_complexity=3, functions=3, classes=0, churn_30d=0, churn_90d=1, test_coverage=0.9, refactoring_score=0.0)
"""Low complexity should give lower score."""
metrics = FileMetrics(
path="test.py",
lines=50,
complexity=2.0,
max_complexity=3,
functions=3,
classes=0,
churn_30d=0,
churn_90d=1,
test_coverage=0.9,
refactoring_score=0.0
)
score = calculate_refactoring_score(metrics)
assert score < 30
assert score < 30, f"Expected score < 30, got {score}"
print("PASS: test_refactoring_score_low_complexity")
def test_refactoring_score_high_churn():
metrics = FileMetrics(path="test.py", lines=100, complexity=5.0, max_complexity=8, functions=5, classes=0, churn_30d=10, churn_90d=20, test_coverage=0.5, refactoring_score=0.0)
"""High churn should increase score."""
metrics = FileMetrics(
path="test.py",
lines=100,
complexity=5.0,
max_complexity=8,
functions=5,
classes=0,
churn_30d=10,
churn_90d=20,
test_coverage=0.5,
refactoring_score=0.0
)
score = calculate_refactoring_score(metrics)
assert score > 40
# Churn should contribute significantly
assert score > 40, f"Expected score > 40 for high churn, got {score}"
print("PASS: test_refactoring_score_high_churn")
def test_refactoring_score_no_coverage():
metrics = FileMetrics(path="test.py", lines=100, complexity=5.0, max_complexity=8, functions=5, classes=0, churn_30d=1, churn_90d=2, test_coverage=None, refactoring_score=0.0)
"""No coverage data should assume medium risk."""
metrics = FileMetrics(
path="test.py",
lines=100,
complexity=5.0,
max_complexity=8,
functions=5,
classes=0,
churn_30d=1,
churn_90d=2,
test_coverage=None,
refactoring_score=0.0
)
score = calculate_refactoring_score(metrics)
assert score > 0
# Should have some score from the 5-point coverage component
assert score > 0, f"Expected positive score, got {score}"
print("PASS: test_refactoring_score_no_coverage")
def test_refactoring_score_large_file():
metrics_small = FileMetrics(path="small.py", lines=50, complexity=5.0, max_complexity=8, functions=3, classes=0, churn_30d=1, churn_90d=2, test_coverage=0.8, refactoring_score=0.0)
metrics_large = FileMetrics(path="large.py", lines=1000, complexity=5.0, max_complexity=8, functions=3, classes=0, churn_30d=1, churn_90d=2, test_coverage=0.8, refactoring_score=0.0)
"""Large files should score higher."""
metrics_small = FileMetrics(
path="small.py",
lines=50,
complexity=5.0,
max_complexity=8,
functions=3,
classes=0,
churn_30d=1,
churn_90d=2,
test_coverage=0.8,
refactoring_score=0.0
)
metrics_large = FileMetrics(
path="large.py",
lines=1000,
complexity=5.0,
max_complexity=8,
functions=3,
classes=0,
churn_30d=1,
churn_90d=2,
test_coverage=0.8,
refactoring_score=0.0
)
score_small = calculate_refactoring_score(metrics_small)
score_large = calculate_refactoring_score(metrics_large)
assert score_large > score_small
assert score_large > score_small, \
f"Large file ({score_large}) should score higher than small ({score_small})"
print("PASS: test_refactoring_score_large_file")
@@ -126,4 +239,4 @@ def run_all():
if __name__ == "__main__":
run_all()
run_all()