Compare commits

...

2 Commits

Author SHA1 Message Date
d7cfda8f03 test: sync refactoring opportunity finder tests (#210)
Some checks failed
Test / pytest (pull_request) Failing after 49s
2026-04-21 11:30:59 +00:00
b172e720e4 feat: implement refactoring opportunity finder — AST complexity + scoring (#210) 2026-04-21 11:30:58 +00:00
2 changed files with 147 additions and 170 deletions

View File

@@ -1,54 +1,144 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
Finds refactoring opportunities in codebases Refactoring Opportunity Finder — Engine 10.4
Engine ID: 10.4
Usage:
python3 scripts/refactoring_opportunity_finder.py --output proposals/refactoring_opportunity_finder.json
python3 scripts/refactoring_opportunity_finder.py --output proposals/refactoring_opportunity_finder.json --dry-run
""" """
import argparse import argparse
import ast
import json import json
import os
import sys import sys
from dataclasses import dataclass, asdict
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import List, Optional, Tuple
def generate_proposals(): @dataclass
"""Generate sample proposals for this engine.""" class FileMetrics:
# TODO: Implement actual proposal generation logic path: str
return [ lines: int
{ complexity: float
"title": f"Sample improvement from 10.4", max_complexity: int
"description": "This is a sample improvement proposal", functions: int
"impact": 5, classes: int
"effort": 3, churn_30d: int = 0
"category": "improvement", churn_90d: int = 0
"source_engine": "10.4", test_coverage: Optional[float] = None
"timestamp": datetime.now(timezone.utc).isoformat() refactoring_score: float = 0.0
}
]
class ComplexityVisitor(ast.NodeVisitor):
    """Collect per-function cyclomatic complexity and a class count.

    After visiting a tree, ``functions`` holds ``(name, complexity, lineno)``
    tuples for every sync/async function definition, and ``classes`` is the
    number of class definitions encountered.
    """

    def __init__(self):
        self.functions = []  # (name, complexity, lineno) per function
        self.classes = 0

    def visit_FunctionDef(self, node):
        """Score one function: 1 (base path) + 1 per decision point."""
        complexity = 1
        for child in ast.walk(node):
            # Each conditional, loop, or handler opens an extra path.
            # ast.AsyncFor added: async functions were visited but their
            # `async for` loops were not counted, skewing async scores low.
            if isinstance(child, (ast.If, ast.While, ast.For, ast.AsyncFor)):
                complexity += 1
            elif isinstance(child, ast.IfExp):
                # Ternary expressions branch just like `if` statements.
                complexity += 1
            elif isinstance(child, ast.BoolOp):
                # `a and b and c` short-circuits: n-1 extra decision points.
                complexity += len(child.values) - 1
            elif isinstance(child, (ast.ExceptHandler, ast.Assert)):
                complexity += 1
        self.functions.append((node.name, complexity, node.lineno))
        self.generic_visit(node)

    def visit_AsyncFunctionDef(self, node):
        # Async defs are scored identically to sync ones.
        self.visit_FunctionDef(node)

    def visit_ClassDef(self, node):
        self.classes += 1
        self.generic_visit(node)
def compute_file_complexity(filepath):
    """Parse a Python file and summarize its complexity.

    Returns a 5-tuple ``(avg_complexity, max_complexity, n_functions,
    n_classes, n_lines)``.  Unreadable or unparseable files yield all
    zeros instead of raising, so callers can walk a tree blindly.
    """
    try:
        # Explicit UTF-8 + broad OSError: a repo walk hits unreadable
        # paths (permissions, dirs, dangling symlinks) and non-UTF-8
        # files; the original let UnicodeDecodeError escape.
        with open(filepath, encoding="utf-8") as f:
            source = f.read()
    except (OSError, UnicodeDecodeError):
        return (0.0, 0, 0, 0, 0)
    try:
        tree = ast.parse(source)
    except (SyntaxError, ValueError):
        # ValueError covers source containing null bytes.
        return (0.0, 0, 0, 0, 0)
    lines = source.count("\n") + 1 if source.strip() else 0
    visitor = ComplexityVisitor()
    visitor.visit(tree)
    if not visitor.functions:
        return (0.0, 0, 0, visitor.classes, lines)
    complexities = [c for _, c, _ in visitor.functions]
    avg = sum(complexities) / len(complexities)
    max_c = max(complexities)
    return (round(avg, 1), max_c, len(visitor.functions), visitor.classes, lines)
def calculate_refactoring_score(metrics):
score = 0.0
complexity_score = min(40, metrics.complexity * 4)
if metrics.max_complexity > 10:
complexity_score = min(40, complexity_score + (metrics.max_complexity - 10))
score += complexity_score
if metrics.lines <= 0:
pass
elif metrics.lines <= 100:
score += metrics.lines * 0.1
elif metrics.lines <= 500:
score += 10 + (metrics.lines - 100) * 0.0125
else:
score += min(20, 15 + (metrics.lines - 500) * 0.01)
churn_score = (metrics.churn_30d * 2) + (metrics.churn_90d * 0.5)
score += min(30, churn_score)
if metrics.test_coverage is None:
score += 5
elif metrics.test_coverage < 0.3:
score += 10
elif metrics.test_coverage < 0.6:
score += 7
elif metrics.test_coverage < 0.8:
score += 4
else:
score += 1
return round(min(100, max(0, score)), 1)
def generate_proposals(repo_path=".", threshold=30.0):
    """Walk *repo_path* and propose refactors for high-scoring Python files.

    Files scoring at or above *threshold* (0-100) become proposal dicts;
    the result is sorted by descending refactoring score.
    """
    results = []
    skip_prefixes = (".", "__pycache__", "node_modules", ".git", "venv")
    for root, dirs, files in os.walk(repo_path):
        # Prune hidden / vendored / generated directories in place so
        # os.walk never descends into them.
        dirs[:] = [d for d in dirs if not d.startswith(skip_prefixes)]
        for fname in files:
            if not fname.endswith(".py"):
                continue
            full = os.path.join(root, fname)
            rel = os.path.relpath(full, repo_path)
            avg, max_c, funcs, classes, lines = compute_file_complexity(full)
            # Skip empty or unparseable files (no functions, no classes).
            if funcs == 0 and classes == 0:
                continue
            metrics = FileMetrics(
                path=rel,
                lines=lines,
                complexity=avg,
                max_complexity=max_c,
                functions=funcs,
                classes=classes,
            )
            metrics.refactoring_score = calculate_refactoring_score(metrics)
            if metrics.refactoring_score >= threshold:
                results.append({
                    "title": f"Refactor {rel} (score: {metrics.refactoring_score})",
                    "impact": min(10, int(metrics.refactoring_score / 10)),
                    "effort": min(10, max(1, funcs // 3)),
                    "category": "refactoring",
                    "source_engine": "10.4",
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                    "metrics": asdict(metrics),
                })
    results.sort(key=lambda p: p.get("metrics", {}).get("refactoring_score", 0), reverse=True)
    return results
def main(): def main():
parser = argparse.ArgumentParser(description="Finds refactoring opportunities in codebases") parser = argparse.ArgumentParser(description="Find refactoring opportunities")
parser.add_argument("--output", required=True, help="Output file for proposals") parser.add_argument("--repo", default=".")
parser.add_argument("--dry-run", action="store_true", help="Don't write output file") parser.add_argument("--output", required=True)
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--threshold", type=float, default=30.0)
args = parser.parse_args() args = parser.parse_args()
proposals = generate_proposals(args.repo, args.threshold)
proposals = generate_proposals()
if not args.dry_run: if not args.dry_run:
with open(args.output, "w") as f: with open(args.output, "w") as f:
json.dump({"proposals": proposals}, f, indent=2) json.dump({"proposals": proposals}, f, indent=2)
print(f"Generated {len(proposals)} proposals -> {args.output}") print(f"Generated {len(proposals)} proposals -> {args.output}")
else: else:
print(f"Would generate {len(proposals)} proposals")
for p in proposals: for p in proposals:
print(f" - {p['title']}") print(f" - {p['title']}")
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@@ -19,208 +19,95 @@ FileMetrics = mod.FileMetrics
def test_complexity_simple_function(): def test_complexity_simple_function():
"""Simple function should have low complexity."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f: with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write(""" f.write("\ndef simple():\n return 42\n")
def simple():
return 42
""")
f.flush() f.flush()
avg, max_c, funcs, classes, lines = compute_file_complexity(f.name) avg, max_c, funcs, classes, lines = compute_file_complexity(f.name)
assert avg == 1.0, f"Expected 1.0, got {avg}" assert avg == 1.0
assert max_c == 1, f"Expected 1, got {max_c}" assert max_c == 1
assert funcs == 1, f"Expected 1, got {funcs}" assert funcs == 1
assert classes == 0, f"Expected 0, got {classes}" assert classes == 0
os.unlink(f.name) os.unlink(f.name)
print("PASS: test_complexity_simple_function") print("PASS: test_complexity_simple_function")
def test_complexity_with_conditionals(): def test_complexity_with_conditionals():
"""Function with if/else should have higher complexity."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f: with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write(""" f.write("\ndef complex_func(x):\n if x > 0:\n if x > 10:\n return 'big'\n else:\n return 'small'\n elif x < 0:\n return 'negative'\n else:\n return 'zero'\n")
def complex_func(x):
if x > 0:
if x > 10:
return "big"
else:
return "small"
elif x < 0:
return "negative"
else:
return "zero"
""")
f.flush() f.flush()
avg, max_c, funcs, classes, lines = compute_file_complexity(f.name) avg, max_c, funcs, classes, lines = compute_file_complexity(f.name)
# Base 1 + 3 if/elif + 1 nested if = 5 assert max_c >= 4
assert max_c >= 4, f"Expected max_c >= 4, got {max_c}" assert funcs == 1
assert funcs == 1, f"Expected 1, got {funcs}"
os.unlink(f.name) os.unlink(f.name)
print("PASS: test_complexity_with_conditionals") print("PASS: test_complexity_with_conditionals")
def test_complexity_with_loops(): def test_complexity_with_loops():
"""Function with loops should increase complexity."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f: with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write(""" f.write("\ndef loop_func(items):\n result = []\n for item in items:\n if item > 0:\n result.append(item)\n while len(result) > 10:\n result.pop()\n return result\n")
def loop_func(items):
result = []
for item in items:
if item > 0:
result.append(item)
while len(result) > 10:
result.pop()
return result
""")
f.flush() f.flush()
avg, max_c, funcs, classes, lines = compute_file_complexity(f.name) avg, max_c, funcs, classes, lines = compute_file_complexity(f.name)
# Base 1 + 1 for + 1 if + 1 while = 4 assert max_c >= 3
assert max_c >= 3, f"Expected max_c >= 3, got {max_c}"
os.unlink(f.name) os.unlink(f.name)
print("PASS: test_complexity_with_loops") print("PASS: test_complexity_with_loops")
def test_complexity_with_class(): def test_complexity_with_class():
"""Class with methods should count both."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f: with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write(""" f.write("\nclass MyClass:\n def method1(self):\n if True:\n pass\n def method2(self):\n for i in range(10):\n pass\n")
class MyClass:
def method1(self):
if True:
pass
def method2(self):
for i in range(10):
pass
""")
f.flush() f.flush()
avg, max_c, funcs, classes, lines = compute_file_complexity(f.name) avg, max_c, funcs, classes, lines = compute_file_complexity(f.name)
assert classes == 1, f"Expected 1 class, got {classes}" assert classes == 1
assert funcs == 2, f"Expected 2 functions, got {funcs}" assert funcs == 2
os.unlink(f.name) os.unlink(f.name)
print("PASS: test_complexity_with_class") print("PASS: test_complexity_with_class")
def test_complexity_syntax_error(): def test_complexity_syntax_error():
"""File with syntax error should return zeros."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f: with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write("def broken(:\n pass") f.write("def broken(:\n pass")
f.flush() f.flush()
avg, max_c, funcs, classes, lines = compute_file_complexity(f.name) avg, max_c, funcs, classes, lines = compute_file_complexity(f.name)
assert avg == 0.0, f"Expected 0.0, got {avg}" assert avg == 0.0
assert funcs == 0, f"Expected 0, got {funcs}" assert funcs == 0
os.unlink(f.name) os.unlink(f.name)
print("PASS: test_complexity_syntax_error") print("PASS: test_complexity_syntax_error")
def test_refactoring_score_high_complexity(): def test_refactoring_score_high_complexity():
"""High complexity should give high score.""" metrics = FileMetrics(path="test.py", lines=200, complexity=15.0, max_complexity=25, functions=10, classes=2, churn_30d=5, churn_90d=15, test_coverage=0.3, refactoring_score=0.0)
metrics = FileMetrics(
path="test.py",
lines=200,
complexity=15.0,
max_complexity=25,
functions=10,
classes=2,
churn_30d=5,
churn_90d=15,
test_coverage=0.3,
refactoring_score=0.0
)
score = calculate_refactoring_score(metrics) score = calculate_refactoring_score(metrics)
assert score > 50, f"Expected score > 50, got {score}" assert score > 50
print("PASS: test_refactoring_score_high_complexity") print("PASS: test_refactoring_score_high_complexity")
def test_refactoring_score_low_complexity(): def test_refactoring_score_low_complexity():
"""Low complexity should give lower score.""" metrics = FileMetrics(path="test.py", lines=50, complexity=2.0, max_complexity=3, functions=3, classes=0, churn_30d=0, churn_90d=1, test_coverage=0.9, refactoring_score=0.0)
metrics = FileMetrics(
path="test.py",
lines=50,
complexity=2.0,
max_complexity=3,
functions=3,
classes=0,
churn_30d=0,
churn_90d=1,
test_coverage=0.9,
refactoring_score=0.0
)
score = calculate_refactoring_score(metrics) score = calculate_refactoring_score(metrics)
assert score < 30, f"Expected score < 30, got {score}" assert score < 30
print("PASS: test_refactoring_score_low_complexity") print("PASS: test_refactoring_score_low_complexity")
def test_refactoring_score_high_churn(): def test_refactoring_score_high_churn():
"""High churn should increase score.""" metrics = FileMetrics(path="test.py", lines=100, complexity=5.0, max_complexity=8, functions=5, classes=0, churn_30d=10, churn_90d=20, test_coverage=0.5, refactoring_score=0.0)
metrics = FileMetrics(
path="test.py",
lines=100,
complexity=5.0,
max_complexity=8,
functions=5,
classes=0,
churn_30d=10,
churn_90d=20,
test_coverage=0.5,
refactoring_score=0.0
)
score = calculate_refactoring_score(metrics) score = calculate_refactoring_score(metrics)
# Churn should contribute significantly assert score > 40
assert score > 40, f"Expected score > 40 for high churn, got {score}"
print("PASS: test_refactoring_score_high_churn") print("PASS: test_refactoring_score_high_churn")
def test_refactoring_score_no_coverage(): def test_refactoring_score_no_coverage():
"""No coverage data should assume medium risk.""" metrics = FileMetrics(path="test.py", lines=100, complexity=5.0, max_complexity=8, functions=5, classes=0, churn_30d=1, churn_90d=2, test_coverage=None, refactoring_score=0.0)
metrics = FileMetrics(
path="test.py",
lines=100,
complexity=5.0,
max_complexity=8,
functions=5,
classes=0,
churn_30d=1,
churn_90d=2,
test_coverage=None,
refactoring_score=0.0
)
score = calculate_refactoring_score(metrics) score = calculate_refactoring_score(metrics)
# Should have some score from the 5-point coverage component assert score > 0
assert score > 0, f"Expected positive score, got {score}"
print("PASS: test_refactoring_score_no_coverage") print("PASS: test_refactoring_score_no_coverage")
def test_refactoring_score_large_file(): def test_refactoring_score_large_file():
"""Large files should score higher.""" metrics_small = FileMetrics(path="small.py", lines=50, complexity=5.0, max_complexity=8, functions=3, classes=0, churn_30d=1, churn_90d=2, test_coverage=0.8, refactoring_score=0.0)
metrics_small = FileMetrics( metrics_large = FileMetrics(path="large.py", lines=1000, complexity=5.0, max_complexity=8, functions=3, classes=0, churn_30d=1, churn_90d=2, test_coverage=0.8, refactoring_score=0.0)
path="small.py",
lines=50,
complexity=5.0,
max_complexity=8,
functions=3,
classes=0,
churn_30d=1,
churn_90d=2,
test_coverage=0.8,
refactoring_score=0.0
)
metrics_large = FileMetrics(
path="large.py",
lines=1000,
complexity=5.0,
max_complexity=8,
functions=3,
classes=0,
churn_30d=1,
churn_90d=2,
test_coverage=0.8,
refactoring_score=0.0
)
score_small = calculate_refactoring_score(metrics_small) score_small = calculate_refactoring_score(metrics_small)
score_large = calculate_refactoring_score(metrics_large) score_large = calculate_refactoring_score(metrics_large)
assert score_large > score_small, \ assert score_large > score_small
f"Large file ({score_large}) should score higher than small ({score_small})"
print("PASS: test_refactoring_score_large_file") print("PASS: test_refactoring_score_large_file")
@@ -239,4 +126,4 @@ def run_all():
if __name__ == "__main__": if __name__ == "__main__":
run_all() run_all()