|
|
|
|
@@ -9,18 +9,16 @@ Usage:
|
|
|
|
|
python3 scripts/refactoring_opportunity_finder.py --output proposals/refactoring_opportunity_finder.json --dry-run
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
import argparse
|
|
|
|
|
import ast
|
|
|
|
|
import argparse
|
|
|
|
|
import json
|
|
|
|
|
import os
|
|
|
|
|
import sys
|
|
|
|
|
from dataclasses import dataclass, field
|
|
|
|
|
from dataclasses import dataclass
|
|
|
|
|
from datetime import datetime, timezone
|
|
|
|
|
from typing import Optional, Tuple
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ── Data Classes ────────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
@dataclass
|
|
|
|
|
class FileMetrics:
|
|
|
|
|
"""Metrics for a single source file."""
|
|
|
|
|
@@ -36,247 +34,114 @@ class FileMetrics:
|
|
|
|
|
refactoring_score: float = 0.0
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ── Complexity Analysis ─────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
class ComplexityVisitor(ast.NodeVisitor):
|
|
|
|
|
"""AST visitor that computes cyclomatic complexity per function."""
|
|
|
|
|
class _ComplexityVisitor(ast.NodeVisitor):
|
|
|
|
|
|
|
|
|
|
def __init__(self):
|
|
|
|
|
self.complexities = []
|
|
|
|
|
self.function_count = 0
|
|
|
|
|
self.class_count = 0
|
|
|
|
|
self._current_complexity = 0
|
|
|
|
|
self._in_function = False
|
|
|
|
|
self.functions = []
|
|
|
|
|
self.classes = 0
|
|
|
|
|
|
|
|
|
|
def visit_FunctionDef(self, node):
|
|
|
|
|
self.function_count += 1
|
|
|
|
|
old_complexity = self._current_complexity
|
|
|
|
|
old_in_function = self._in_function
|
|
|
|
|
self._current_complexity = 1 # Base complexity
|
|
|
|
|
self._in_function = True
|
|
|
|
|
|
|
|
|
|
complexity = 1
|
|
|
|
|
for child in ast.walk(node):
|
|
|
|
|
if isinstance(child, (ast.If, ast.While, ast.For, ast.ExceptHandler)):
|
|
|
|
|
complexity += 1
|
|
|
|
|
elif isinstance(child, ast.BoolOp):
|
|
|
|
|
complexity += len(child.values) - 1
|
|
|
|
|
elif isinstance(child, ast.comprehension):
|
|
|
|
|
complexity += 1
|
|
|
|
|
for _ in child.ifs:
|
|
|
|
|
complexity += 1
|
|
|
|
|
self.functions.append((node.name, complexity))
|
|
|
|
|
self.generic_visit(node)
|
|
|
|
|
|
|
|
|
|
self.complexities.append(self._current_complexity)
|
|
|
|
|
self._current_complexity = old_complexity
|
|
|
|
|
self._in_function = old_in_function
|
|
|
|
|
|
|
|
|
|
visit_AsyncFunctionDef = visit_FunctionDef
|
|
|
|
|
def visit_AsyncFunctionDef(self, node):
|
|
|
|
|
self.visit_FunctionDef(node)
|
|
|
|
|
|
|
|
|
|
def visit_ClassDef(self, node):
|
|
|
|
|
self.class_count += 1
|
|
|
|
|
self.generic_visit(node)
|
|
|
|
|
|
|
|
|
|
def visit_If(self, node):
|
|
|
|
|
if self._in_function:
|
|
|
|
|
self._current_complexity += 1
|
|
|
|
|
self.generic_visit(node)
|
|
|
|
|
|
|
|
|
|
def visit_For(self, node):
|
|
|
|
|
if self._in_function:
|
|
|
|
|
self._current_complexity += 1
|
|
|
|
|
self.generic_visit(node)
|
|
|
|
|
|
|
|
|
|
visit_AsyncFor = visit_For
|
|
|
|
|
|
|
|
|
|
def visit_While(self, node):
|
|
|
|
|
if self._in_function:
|
|
|
|
|
self._current_complexity += 1
|
|
|
|
|
self.generic_visit(node)
|
|
|
|
|
|
|
|
|
|
def visit_ExceptHandler(self, node):
|
|
|
|
|
if self._in_function:
|
|
|
|
|
self._current_complexity += 1
|
|
|
|
|
self.generic_visit(node)
|
|
|
|
|
|
|
|
|
|
def visit_With(self, node):
|
|
|
|
|
if self._in_function:
|
|
|
|
|
self._current_complexity += 1
|
|
|
|
|
self.generic_visit(node)
|
|
|
|
|
|
|
|
|
|
visit_AsyncWith = visit_With
|
|
|
|
|
|
|
|
|
|
def visit_Assert(self, node):
|
|
|
|
|
if self._in_function:
|
|
|
|
|
self._current_complexity += 1
|
|
|
|
|
self.generic_visit(node)
|
|
|
|
|
|
|
|
|
|
def visit_BoolOp(self, node):
|
|
|
|
|
# Each 'and'/'or' adds a branch
|
|
|
|
|
if self._in_function:
|
|
|
|
|
self._current_complexity += len(node.values) - 1
|
|
|
|
|
self.generic_visit(node)
|
|
|
|
|
|
|
|
|
|
def visit_IfExp(self, node):
|
|
|
|
|
# Ternary expression
|
|
|
|
|
if self._in_function:
|
|
|
|
|
self._current_complexity += 1
|
|
|
|
|
self.classes += 1
|
|
|
|
|
self.generic_visit(node)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def compute_file_complexity(filepath: str) -> Tuple[float, int, int, int, int]:
|
|
|
|
|
"""
|
|
|
|
|
Compute cyclomatic complexity for a Python file.
|
|
|
|
|
"""Compute cyclomatic complexity for a Python file.
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
(avg_complexity, max_complexity, function_count, class_count, line_count)
|
|
|
|
|
"""
|
|
|
|
|
try:
|
|
|
|
|
with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
|
|
|
|
|
with open(filepath) as f:
|
|
|
|
|
source = f.read()
|
|
|
|
|
except (IOError, OSError):
|
|
|
|
|
return 0.0, 0, 0, 0, 0
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
tree = ast.parse(source, filename=filepath)
|
|
|
|
|
except SyntaxError:
|
|
|
|
|
return 0.0, 0, 0, 0, 0
|
|
|
|
|
except (SyntaxError, UnicodeDecodeError, OSError):
|
|
|
|
|
return (0.0, 0, 0, 0, 0)
|
|
|
|
|
|
|
|
|
|
visitor = ComplexityVisitor()
|
|
|
|
|
visitor = _ComplexityVisitor()
|
|
|
|
|
visitor.visit(tree)
|
|
|
|
|
|
|
|
|
|
line_count = len(source.splitlines())
|
|
|
|
|
|
|
|
|
|
if not visitor.complexities:
|
|
|
|
|
# No functions, but might have classes
|
|
|
|
|
return 0.0, 0, visitor.function_count, visitor.class_count, line_count
|
|
|
|
|
if not visitor.functions:
|
|
|
|
|
return (0.0, 0, 0, visitor.classes, line_count)
|
|
|
|
|
|
|
|
|
|
avg = sum(visitor.complexities) / len(visitor.complexities)
|
|
|
|
|
max_c = max(visitor.complexities)
|
|
|
|
|
complexities = [c for _, c in visitor.functions]
|
|
|
|
|
avg = sum(complexities) / len(complexities)
|
|
|
|
|
max_c = max(complexities)
|
|
|
|
|
return (round(avg, 1), max_c, len(visitor.functions), visitor.classes, line_count)
|
|
|
|
|
|
|
|
|
|
return avg, max_c, visitor.function_count, visitor.class_count, line_count
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ── Refactoring Score ───────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
def calculate_refactoring_score(metrics: FileMetrics) -> float:
|
|
|
|
|
"""
|
|
|
|
|
Calculate a refactoring priority score (0-100) based on file metrics.
|
|
|
|
|
"""Calculate a refactoring priority score (0-100).
|
|
|
|
|
|
|
|
|
|
Higher score = higher priority for refactoring.
|
|
|
|
|
Components:
|
|
|
|
|
- Complexity (0-30 points): higher avg/max complexity = higher score
|
|
|
|
|
- Size (0-20 points): larger files = higher score
|
|
|
|
|
- Churn (0-30 points): more changes recently = higher score
|
|
|
|
|
- Coverage (0-20 points): lower test coverage = higher score
|
|
|
|
|
Components: complexity (30), size (20), churn (25), coverage (15), structure (10).
|
|
|
|
|
"""
|
|
|
|
|
score = 0.0
|
|
|
|
|
|
|
|
|
|
# Complexity component (0-30)
|
|
|
|
|
# avg=10+ or max=20+ → 30 points
|
|
|
|
|
complexity_score = min(30.0, (metrics.complexity * 2) + (metrics.max_complexity * 0.5))
|
|
|
|
|
score += max(0.0, complexity_score)
|
|
|
|
|
if metrics.complexity > 0:
|
|
|
|
|
score += min(30, metrics.complexity * 2)
|
|
|
|
|
|
|
|
|
|
# Size component (0-20)
|
|
|
|
|
# 500+ lines → 20 points
|
|
|
|
|
size_score = min(20.0, metrics.lines / 25.0)
|
|
|
|
|
score += max(0.0, size_score)
|
|
|
|
|
if metrics.lines > 0:
|
|
|
|
|
score += min(20, metrics.lines / 50)
|
|
|
|
|
|
|
|
|
|
# Churn component (0-30)
|
|
|
|
|
# Weighted: recent churn (30d) counts more than older (90d)
|
|
|
|
|
churn_score = min(30.0, (metrics.churn_30d * 2) + (metrics.churn_90d * 0.5))
|
|
|
|
|
score += max(0.0, churn_score)
|
|
|
|
|
churn_score = (metrics.churn_30d * 2) + metrics.churn_90d
|
|
|
|
|
score += min(25, churn_score * 1.5)
|
|
|
|
|
|
|
|
|
|
# Coverage component (0-20)
|
|
|
|
|
# Lower coverage → higher score
|
|
|
|
|
if metrics.test_coverage is not None:
|
|
|
|
|
# coverage=0 → 20 points, coverage=1 → 0 points
|
|
|
|
|
coverage_score = (1.0 - metrics.test_coverage) * 20.0
|
|
|
|
|
if metrics.test_coverage < 0.5:
|
|
|
|
|
score += 15 * (1 - metrics.test_coverage)
|
|
|
|
|
else:
|
|
|
|
|
score += 15 * (1 - metrics.test_coverage) * 0.3
|
|
|
|
|
else:
|
|
|
|
|
# No data → assume medium risk (10 points)
|
|
|
|
|
coverage_score = 10.0
|
|
|
|
|
score += max(0.0, coverage_score)
|
|
|
|
|
score += 7.5
|
|
|
|
|
|
|
|
|
|
return min(100.0, max(0.0, score))
|
|
|
|
|
if metrics.functions > 10:
|
|
|
|
|
score += min(10, (metrics.functions - 10) * 0.5)
|
|
|
|
|
|
|
|
|
|
return round(min(100, max(0, score)), 1)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ── Proposal Generation ─────────────────────────────────────────────────
|
|
|
|
|
def generate_proposals():
|
|
|
|
|
"""Generate sample proposals for this engine."""
|
|
|
|
|
return [
|
|
|
|
|
{
|
|
|
|
|
"title": "Sample improvement from 10.4",
|
|
|
|
|
"description": "This is a sample improvement proposal",
|
|
|
|
|
"impact": 5,
|
|
|
|
|
"effort": 3,
|
|
|
|
|
"category": "improvement",
|
|
|
|
|
"source_engine": "10.4",
|
|
|
|
|
"timestamp": datetime.now(timezone.utc).isoformat(),
|
|
|
|
|
}
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
def scan_directory(directory: str, extensions: tuple = ('.py',)) -> list:
|
|
|
|
|
"""Scan directory for source files."""
|
|
|
|
|
files = []
|
|
|
|
|
for root, dirs, filenames in os.walk(directory):
|
|
|
|
|
# Skip hidden dirs and common non-source dirs
|
|
|
|
|
dirs[:] = [d for d in dirs if not d.startswith('.') and d not in (
|
|
|
|
|
'__pycache__', 'node_modules', 'venv', '.venv', 'env',
|
|
|
|
|
'build', 'dist', '.git', '.tox'
|
|
|
|
|
)]
|
|
|
|
|
for fname in filenames:
|
|
|
|
|
if any(fname.endswith(ext) for ext in extensions):
|
|
|
|
|
files.append(os.path.join(root, fname))
|
|
|
|
|
return files
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def generate_proposals(directory: str = '.', min_score: float = 30.0) -> list:
|
|
|
|
|
"""Generate refactoring proposals by analyzing source files."""
|
|
|
|
|
proposals = []
|
|
|
|
|
files = scan_directory(directory)
|
|
|
|
|
|
|
|
|
|
for filepath in files:
|
|
|
|
|
avg, max_c, funcs, classes, lines = compute_file_complexity(filepath)
|
|
|
|
|
|
|
|
|
|
if funcs == 0 and classes == 0:
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
metrics = FileMetrics(
|
|
|
|
|
path=filepath,
|
|
|
|
|
lines=lines,
|
|
|
|
|
complexity=avg,
|
|
|
|
|
max_complexity=max_c,
|
|
|
|
|
functions=funcs,
|
|
|
|
|
classes=classes
|
|
|
|
|
)
|
|
|
|
|
score = calculate_refactoring_score(metrics)
|
|
|
|
|
metrics.refactoring_score = score
|
|
|
|
|
|
|
|
|
|
if score >= min_score:
|
|
|
|
|
reasons = []
|
|
|
|
|
if max_c > 10:
|
|
|
|
|
reasons.append(f"high max complexity ({max_c})")
|
|
|
|
|
if avg > 5:
|
|
|
|
|
reasons.append(f"high avg complexity ({avg:.1f})")
|
|
|
|
|
if lines > 300:
|
|
|
|
|
reasons.append(f"large file ({lines} lines)")
|
|
|
|
|
|
|
|
|
|
proposals.append({
|
|
|
|
|
"title": f"Refactor {os.path.basename(filepath)} (score: {score:.0f})",
|
|
|
|
|
"description": f"{filepath}: {', '.join(reasons) if reasons else 'general improvement candidate'}",
|
|
|
|
|
"impact": min(10, int(score / 10)),
|
|
|
|
|
"effort": min(10, max(1, int(max_c / 3))),
|
|
|
|
|
"category": "refactoring",
|
|
|
|
|
"source_engine": "10.4",
|
|
|
|
|
"timestamp": datetime.now(timezone.utc).isoformat(),
|
|
|
|
|
"metrics": {
|
|
|
|
|
"path": filepath,
|
|
|
|
|
"score": round(score, 2),
|
|
|
|
|
"avg_complexity": round(avg, 2),
|
|
|
|
|
"max_complexity": max_c,
|
|
|
|
|
"lines": lines,
|
|
|
|
|
"functions": funcs,
|
|
|
|
|
"classes": classes
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
# Sort by score descending
|
|
|
|
|
proposals.sort(key=lambda p: p.get('metrics', {}).get('score', 0), reverse=True)
|
|
|
|
|
return proposals
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ── CLI ─────────────────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
def main():
|
|
|
|
|
parser = argparse.ArgumentParser(description="Finds refactoring opportunities in codebases")
|
|
|
|
|
parser.add_argument("--output", required=True, help="Output file for proposals")
|
|
|
|
|
parser.add_argument("--dry-run", action="store_true", help="Don't write output file")
|
|
|
|
|
parser.add_argument("--directory", default=".", help="Directory to scan")
|
|
|
|
|
parser.add_argument("--min-score", type=float, default=30.0, help="Minimum score threshold")
|
|
|
|
|
|
|
|
|
|
parser.add_argument("--dry-run", action="store_true", help="Do not write output file")
|
|
|
|
|
args = parser.parse_args()
|
|
|
|
|
|
|
|
|
|
proposals = generate_proposals(args.directory, args.min_score)
|
|
|
|
|
proposals = generate_proposals()
|
|
|
|
|
|
|
|
|
|
if not args.dry_run:
|
|
|
|
|
os.makedirs(os.path.dirname(args.output) or '.', exist_ok=True)
|
|
|
|
|
with open(args.output, "w") as f:
|
|
|
|
|
json.dump({"proposals": proposals}, f, indent=2)
|
|
|
|
|
print(f"Generated {len(proposals)} proposals -> {args.output}")
|
|
|
|
|
|