diff --git a/scripts/refactoring_opportunity_finder.py b/scripts/refactoring_opportunity_finder.py
index ecd33b0..783a142 100755
--- a/scripts/refactoring_opportunity_finder.py
+++ b/scripts/refactoring_opportunity_finder.py
@@ -1,44 +1,240 @@
 #!/usr/bin/env python3
 """
-Finds refactoring opportunities in codebases
+Refactoring Opportunity Finder
 
-Engine ID: 10.4
+Analyzes Python codebases for refactoring opportunities based on:
+- Cyclomatic complexity
+- File size and change churn
+- Test coverage
+- Class/function counts
+
+Churn and coverage are not collected here; they default to zero/None on
+FileMetrics and only influence the score when supplied by external tooling.
 
 Usage:
-    python3 scripts/refactoring_opportunity_finder.py --output proposals/refactoring_opportunity_finder.json
-    python3 scripts/refactoring_opportunity_finder.py --output proposals/refactoring_opportunity_finder.json --dry-run
+    python3 scripts/refactoring_opportunity_finder.py --root . --output proposals.json
+    python3 scripts/refactoring_opportunity_finder.py --root . --output proposals.json --dry-run
 """
 
 import argparse
+import ast
 import json
+import math
+import os
 import sys
+from dataclasses import dataclass
 from datetime import datetime, timezone
+from typing import List, Optional, Tuple
 
 
-def generate_proposals():
-    """Generate sample proposals for this engine."""
-    # TODO: Implement actual proposal generation logic
-    return [
-        {
-            "title": f"Sample improvement from 10.4",
-            "description": "This is a sample improvement proposal",
-            "impact": 5,
-            "effort": 3,
-            "category": "improvement",
-            "source_engine": "10.4",
-            "timestamp": datetime.now(timezone.utc).isoformat()
-        }
-    ]
+@dataclass
+class FileMetrics:
+    """Metrics for a single file."""
+    path: str
+    lines: int
+    complexity: float
+    max_complexity: int
+    functions: int
+    classes: int
+    churn_30d: int = 0
+    churn_90d: int = 0
+    test_coverage: Optional[float] = None
+    refactoring_score: float = 0.0
+
+
+def _compute_function_complexity(node: ast.AST) -> int:
+    """Compute cyclomatic complexity of a single function."""
+    complexity = 1  # Base complexity
+    for child in ast.walk(node):
+        if isinstance(child, (ast.If, ast.While, ast.For)):
+            complexity += 1
+        elif isinstance(child, ast.BoolOp):
+            # and/or add complexity for each additional value
+            complexity += len(child.values) - 1
+        elif isinstance(child, ast.ExceptHandler):
+            complexity += 1
+        elif isinstance(child, ast.Assert):
+            complexity += 1
+        elif isinstance(child, ast.comprehension):
+            # One per comprehension for-clause, plus one per filter clause
+            complexity += 1
+            complexity += len(child.ifs)
+    return complexity
+
+
+def compute_file_complexity(filepath: str) -> Tuple[float, int, int, int, int]:
+    """
+    Compute complexity metrics for a Python file.
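+
+    Complexity is the cyclomatic-style count from _compute_function_complexity:
+    a base of 1 per function plus 1 per branch point, so a body containing a
+    single ``if`` and a single ``for`` loop scores 3.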
+
+    Returns:
+        (avg_complexity, max_complexity, function_count, class_count, line_count)
+    """
+    try:
+        with open(filepath, "r", encoding="utf-8", errors="replace") as f:
+            source = f.read()
+    except OSError:
+        return 0.0, 0, 0, 0, 0
+
+    lines = source.count("\n") + 1
+
+    try:
+        tree = ast.parse(source, filename=filepath)
+    except SyntaxError:
+        return 0.0, 0, 0, 0, lines
+
+    functions = []
+    classes = []
+
+    for node in ast.walk(tree):
+        if isinstance(node, ast.ClassDef):
+            classes.append(node)
+        elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
+            functions.append(node)
+
+    if not functions:
+        return 0.0, 0, len(functions), len(classes), lines
+
+    complexities = [_compute_function_complexity(fn) for fn in functions]
+    avg = sum(complexities) / len(complexities)
+    max_c = max(complexities)
+
+    return round(avg, 2), max_c, len(functions), len(classes), lines
+
+
+def calculate_refactoring_score(metrics: FileMetrics) -> float:
+    """
+    Calculate a refactoring priority score (0-100) based on metrics.
+
+    Higher score = more urgent refactoring candidate.
+
+    Components:
+    - Complexity (0-30): weighted by avg and max complexity
+    - Size (0-20): larger files score higher
+    - Churn (0-25): frequently changed files score higher
+    - Coverage (0-15): low/no coverage scores higher
+    - Density (0-10): many functions/classes in a small space
+    """
+    score = 0.0
+
+    # Complexity component (0-30)
+    # avg=5 -> 10, avg=10 -> 20, avg=15+ -> 30
+    complexity_score = min(30, metrics.complexity * 2)
+    # Bonus for high max complexity
+    if metrics.max_complexity > 10:
+        complexity_score = min(30, complexity_score + (metrics.max_complexity - 10))
+    score += complexity_score
+
+    # Size component (0-20), log-scaled:
+    # 50 lines -> ~14, 200 lines -> ~19, 500+ lines -> capped at 20
+    if metrics.lines > 0:
+        size_score = min(20, math.log2(max(1, metrics.lines)) * 2.5)
+    else:
+        size_score = 0
+    score += size_score
+
+    # Churn component (0-25)
+    # Weighted combination of 30d and 90d churn
+    churn_score = min(25, (metrics.churn_30d * 1.5) + (metrics.churn_90d * 0.5))
+    score += churn_score
+
+    # Coverage component (0-15); coverage is a fraction in [0, 1]
+    # Low coverage = higher score
+    if metrics.test_coverage is None:
+        # No data -> assume medium risk
+        score += 5
+    elif metrics.test_coverage < 0.3:
+        score += 15
+    elif metrics.test_coverage < 0.5:
+        score += 10
+    elif metrics.test_coverage < 0.8:
+        score += 5
+    # else: good coverage, no penalty
+
+    # Density component (0-10)
+    # Many functions/classes packed into a small space
+    if metrics.lines > 0:
+        density = (metrics.functions + metrics.classes * 3) / (metrics.lines / 100)
+        density_score = min(10, density * 2)
+    else:
+        density_score = 0
+    score += density_score
+
+    return round(min(100, max(0, score)), 2)
+
+
+def analyze_file(filepath: str, root: str = ".") -> Optional[FileMetrics]:
+    """Analyze a single Python file and return metrics."""
+    try:
+        rel_path = os.path.relpath(filepath, root)
+    except ValueError:
+        rel_path = filepath
+
+    avg, max_c, funcs, classes, lines = compute_file_complexity(filepath)
+
+    metrics = FileMetrics(
+        path=rel_path,
+        lines=lines,
+        complexity=avg,
+        max_complexity=max_c,
+        functions=funcs,
+        classes=classes,
+    )
+    metrics.refactoring_score = calculate_refactoring_score(metrics)
+    return metrics
+
+
+def find_python_files(root: str) -> List[str]:
+    """Find all Python files under root, excluding common non-source dirs."""
+    skip_dirs = {".git", "__pycache__", ".tox", ".eggs", "node_modules", ".venv", "venv", "env"}
+    files = []
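+    # os.walk supports in-place pruning: the dirnames[:] slice assignment in
+    # the loop below stops the walk from descending into excluded directories.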
+    for dirpath, dirnames, filenames in os.walk(root):
+        dirnames[:] = [d for d in dirnames if d not in skip_dirs]
+        for fn in filenames:
+            if fn.endswith(".py"):
+                files.append(os.path.join(dirpath, fn))
+    return sorted(files)
+
+
+def generate_proposals(root: str = ".", min_score: float = 30.0) -> List[dict]:
+    """Generate refactoring proposals for the codebase."""
+    files = find_python_files(root)
+    proposals = []
+
+    for filepath in files:
+        metrics = analyze_file(filepath, root)
+        if metrics and metrics.refactoring_score >= min_score:
+            proposals.append({
+                "title": f"Refactor {metrics.path} (score: {metrics.refactoring_score})",
+                "description": (
+                    f"File has complexity avg={metrics.complexity:.1f} max={metrics.max_complexity}, "
+                    f"{metrics.functions} functions, {metrics.classes} classes, {metrics.lines} lines."
+                ),
+                "impact": min(10, int(metrics.refactoring_score / 10)),
+                "effort": min(10, max(1, int(metrics.complexity / 2))),
+                "category": "refactoring",
+                "source_engine": "refactoring_opportunity_finder",
+                "timestamp": datetime.now(timezone.utc).isoformat(),
+                "metrics": {
+                    "path": metrics.path,
+                    "complexity": metrics.complexity,
+                    "max_complexity": metrics.max_complexity,
+                    "lines": metrics.lines,
+                    "refactoring_score": metrics.refactoring_score,
+                }
+            })
+
+    # Sort by score descending
+    proposals.sort(key=lambda p: p.get("metrics", {}).get("refactoring_score", 0), reverse=True)
+    return proposals
 
 
 def main():
-    parser = argparse.ArgumentParser(description="Finds refactoring opportunities in codebases")
+    parser = argparse.ArgumentParser(description="Find refactoring opportunities")
+    parser.add_argument("--root", default=".", help="Root directory to scan")
     parser.add_argument("--output", required=True, help="Output file for proposals")
     parser.add_argument("--dry-run", action="store_true", help="Don't write output file")
-
+    parser.add_argument("--min-score", type=float, default=30.0, help="Minimum score threshold")
     args = parser.parse_args()
 
-    proposals = generate_proposals()
+    proposals = generate_proposals(args.root, args.min_score)
 
     if not args.dry_run:
         with open(args.output, "w") as f:
@@ -46,7 +242,7 @@ def main():
         print(f"Generated {len(proposals)} proposals -> {args.output}")
     else:
         print(f"Would generate {len(proposals)} proposals")
-        for p in proposals:
+        for p in proposals[:10]:
             print(f"  - {p['title']}")
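
Reviewer note: a minimal sketch of consuming the output downstream, assuming
the elided write in main() (outside the hunk context above) is a plain
json.dump of the proposals list:

    import json

    # proposals.json is the hypothetical --output path from the Usage examples
    with open("proposals.json") as f:
        proposals = json.load(f)

    # generate_proposals() returns the list sorted by refactoring_score,
    # highest first, so the top entries are the most urgent candidates.
    for p in proposals[:5]:
        m = p["metrics"]
        print(f"{m['refactoring_score']:6.2f}  {m['path']}")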