#!/usr/bin/env python3 """ Finds refactoring opportunities in codebases Engine ID: 10.4 Usage: python3 scripts/refactoring_opportunity_finder.py --output proposals/refactoring_opportunity_finder.json python3 scripts/refactoring_opportunity_finder.py --output proposals/refactoring_opportunity_finder.json --dry-run """ import argparse import ast import json import os import sys from dataclasses import dataclass, field from datetime import datetime, timezone from typing import Optional, Tuple # ── Data Classes ──────────────────────────────────────────────────────── @dataclass class FileMetrics: """Metrics for a single source file.""" path: str lines: int = 0 complexity: float = 0.0 max_complexity: int = 0 functions: int = 0 classes: int = 0 churn_30d: int = 0 churn_90d: int = 0 test_coverage: Optional[float] = None refactoring_score: float = 0.0 # ── Complexity Analysis ───────────────────────────────────────────────── class ComplexityVisitor(ast.NodeVisitor): """AST visitor that computes cyclomatic complexity per function.""" def __init__(self): self.complexities = [] self.function_count = 0 self.class_count = 0 self._current_complexity = 0 self._in_function = False def visit_FunctionDef(self, node): self.function_count += 1 old_complexity = self._current_complexity old_in_function = self._in_function self._current_complexity = 1 # Base complexity self._in_function = True self.generic_visit(node) self.complexities.append(self._current_complexity) self._current_complexity = old_complexity self._in_function = old_in_function visit_AsyncFunctionDef = visit_FunctionDef def visit_ClassDef(self, node): self.class_count += 1 self.generic_visit(node) def visit_If(self, node): if self._in_function: self._current_complexity += 1 self.generic_visit(node) def visit_For(self, node): if self._in_function: self._current_complexity += 1 self.generic_visit(node) visit_AsyncFor = visit_For def visit_While(self, node): if self._in_function: self._current_complexity += 1 self.generic_visit(node) def visit_ExceptHandler(self, node): if self._in_function: self._current_complexity += 1 self.generic_visit(node) def visit_With(self, node): if self._in_function: self._current_complexity += 1 self.generic_visit(node) visit_AsyncWith = visit_With def visit_Assert(self, node): if self._in_function: self._current_complexity += 1 self.generic_visit(node) def visit_BoolOp(self, node): # Each 'and'/'or' adds a branch if self._in_function: self._current_complexity += len(node.values) - 1 self.generic_visit(node) def visit_IfExp(self, node): # Ternary expression if self._in_function: self._current_complexity += 1 self.generic_visit(node) def compute_file_complexity(filepath: str) -> Tuple[float, int, int, int, int]: """ Compute cyclomatic complexity for a Python file. Returns: (avg_complexity, max_complexity, function_count, class_count, line_count) """ try: with open(filepath, 'r', encoding='utf-8', errors='replace') as f: source = f.read() except (IOError, OSError): return 0.0, 0, 0, 0, 0 try: tree = ast.parse(source, filename=filepath) except SyntaxError: return 0.0, 0, 0, 0, 0 visitor = ComplexityVisitor() visitor.visit(tree) line_count = len(source.splitlines()) if not visitor.complexities: # No functions, but might have classes return 0.0, 0, visitor.function_count, visitor.class_count, line_count avg = sum(visitor.complexities) / len(visitor.complexities) max_c = max(visitor.complexities) return avg, max_c, visitor.function_count, visitor.class_count, line_count # ── Refactoring Score ─────────────────────────────────────────────────── def calculate_refactoring_score(metrics: FileMetrics) -> float: """ Calculate a refactoring priority score (0-100) based on file metrics. Higher score = higher priority for refactoring. Components: - Complexity (0-30 points): higher avg/max complexity = higher score - Size (0-20 points): larger files = higher score - Churn (0-30 points): more changes recently = higher score - Coverage (0-20 points): lower test coverage = higher score """ score = 0.0 # Complexity component (0-30) # avg=10+ or max=20+ → 30 points complexity_score = min(30.0, (metrics.complexity * 2) + (metrics.max_complexity * 0.5)) score += max(0.0, complexity_score) # Size component (0-20) # 500+ lines → 20 points size_score = min(20.0, metrics.lines / 25.0) score += max(0.0, size_score) # Churn component (0-30) # Weighted: recent churn (30d) counts more than older (90d) churn_score = min(30.0, (metrics.churn_30d * 2) + (metrics.churn_90d * 0.5)) score += max(0.0, churn_score) # Coverage component (0-20) # Lower coverage → higher score if metrics.test_coverage is not None: # coverage=0 → 20 points, coverage=1 → 0 points coverage_score = (1.0 - metrics.test_coverage) * 20.0 else: # No data → assume medium risk (10 points) coverage_score = 10.0 score += max(0.0, coverage_score) return min(100.0, max(0.0, score)) # ── Proposal Generation ───────────────────────────────────────────────── def scan_directory(directory: str, extensions: tuple = ('.py',)) -> list: """Scan directory for source files.""" files = [] for root, dirs, filenames in os.walk(directory): # Skip hidden dirs and common non-source dirs dirs[:] = [d for d in dirs if not d.startswith('.') and d not in ( '__pycache__', 'node_modules', 'venv', '.venv', 'env', 'build', 'dist', '.git', '.tox' )] for fname in filenames: if any(fname.endswith(ext) for ext in extensions): files.append(os.path.join(root, fname)) return files def generate_proposals(directory: str = '.', min_score: float = 30.0) -> list: """Generate refactoring proposals by analyzing source files.""" proposals = [] files = scan_directory(directory) for filepath in files: avg, max_c, funcs, classes, lines = compute_file_complexity(filepath) if funcs == 0 and classes == 0: continue metrics = FileMetrics( path=filepath, lines=lines, complexity=avg, max_complexity=max_c, functions=funcs, classes=classes ) score = calculate_refactoring_score(metrics) metrics.refactoring_score = score if score >= min_score: reasons = [] if max_c > 10: reasons.append(f"high max complexity ({max_c})") if avg > 5: reasons.append(f"high avg complexity ({avg:.1f})") if lines > 300: reasons.append(f"large file ({lines} lines)") proposals.append({ "title": f"Refactor {os.path.basename(filepath)} (score: {score:.0f})", "description": f"{filepath}: {', '.join(reasons) if reasons else 'general improvement candidate'}", "impact": min(10, int(score / 10)), "effort": min(10, max(1, int(max_c / 3))), "category": "refactoring", "source_engine": "10.4", "timestamp": datetime.now(timezone.utc).isoformat(), "metrics": { "path": filepath, "score": round(score, 2), "avg_complexity": round(avg, 2), "max_complexity": max_c, "lines": lines, "functions": funcs, "classes": classes } }) # Sort by score descending proposals.sort(key=lambda p: p.get('metrics', {}).get('score', 0), reverse=True) return proposals # ── CLI ───────────────────────────────────────────────────────────────── def main(): parser = argparse.ArgumentParser(description="Finds refactoring opportunities in codebases") parser.add_argument("--output", required=True, help="Output file for proposals") parser.add_argument("--dry-run", action="store_true", help="Don't write output file") parser.add_argument("--directory", default=".", help="Directory to scan") parser.add_argument("--min-score", type=float, default=30.0, help="Minimum score threshold") args = parser.parse_args() proposals = generate_proposals(args.directory, args.min_score) if not args.dry_run: os.makedirs(os.path.dirname(args.output) or '.', exist_ok=True) with open(args.output, "w") as f: json.dump({"proposals": proposals}, f, indent=2) print(f"Generated {len(proposals)} proposals -> {args.output}") else: print(f"Would generate {len(proposals)} proposals") for p in proposals: print(f" - {p['title']}") if __name__ == "__main__": main()