#!/usr/bin/env python3
"""
Refactoring opportunity finder (source engine "10.4").

Walks a directory tree, computes AST-based cyclomatic-complexity metrics
for every Python file, assigns each file a 0-100 refactoring-priority
score, and writes the high-scoring files out as JSON proposals.

Usage:
    refactoring_opportunity_finder.py --output OUT.json [--directory DIR]
                                      [--min-score N] [--dry-run]
"""
# NOTE(review): the original shebang/module docstring sat above the visible
# diff hunk; this header is a reconstruction -- confirm against the repo copy.

import argparse
import ast
import json
import os
import sys  # kept: may be referenced by code outside the visible hunk
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional, Tuple


# ── Data Classes ────────────────────────────────────────────────────────

@dataclass
class FileMetrics:
    """Metrics for a single source file, used to score refactoring priority."""
    path: str
    lines: int = 0                  # physical line count
    complexity: float = 0.0         # average cyclomatic complexity per function
    max_complexity: int = 0         # worst single-function complexity
    functions: int = 0
    classes: int = 0
    churn_30d: int = 0              # change count, last 30 days (no producer in this script)
    churn_90d: int = 0              # change count, last 90 days (no producer in this script)
    test_coverage: Optional[float] = None   # 0.0-1.0; None = unknown
    refactoring_score: float = 0.0  # filled in by calculate_refactoring_score()


# ── Complexity Analysis ─────────────────────────────────────────────────

class ComplexityVisitor(ast.NodeVisitor):
    """AST visitor that computes cyclomatic complexity per function.

    Decision points counted inside a function body: if/elif, for, while,
    except handlers, with, assert, each extra operand of an and/or chain,
    and ternary expressions.  (Counting `with`/`assert` is stricter than
    classic McCabe; kept for parity with the original scoring.)
    """

    def __init__(self):
        self.complexities = []      # one entry per (async) function visited
        self.function_count = 0
        self.class_count = 0
        self._current_complexity = 0
        self._in_function = False   # module-level branch points are ignored

    def visit_FunctionDef(self, node):
        self.function_count += 1
        # Save/restore state so nested functions each get their own counter.
        saved_complexity = self._current_complexity
        saved_in_function = self._in_function
        self._current_complexity = 1  # McCabe base complexity
        self._in_function = True

        self.generic_visit(node)

        self.complexities.append(self._current_complexity)
        self._current_complexity = saved_complexity
        self._in_function = saved_in_function

    visit_AsyncFunctionDef = visit_FunctionDef

    def visit_ClassDef(self, node):
        self.class_count += 1
        self.generic_visit(node)

    def _branch(self, node):
        """Count one decision point, then keep walking the subtree."""
        if self._in_function:
            self._current_complexity += 1
        self.generic_visit(node)

    # Each of these constructs contributes exactly one branch.
    visit_If = _branch
    visit_For = _branch
    visit_AsyncFor = _branch
    visit_While = _branch
    visit_ExceptHandler = _branch
    visit_With = _branch
    visit_AsyncWith = _branch
    visit_Assert = _branch
    visit_IfExp = _branch  # ternary expression

    def visit_BoolOp(self, node):
        # Each 'and'/'or' operand beyond the first adds a branch.
        if self._in_function:
            self._current_complexity += len(node.values) - 1
        self.generic_visit(node)


def compute_file_complexity(filepath: str) -> Tuple[float, int, int, int, int]:
    """
    Compute cyclomatic complexity for a Python file.

    Returns:
        (avg_complexity, max_complexity, function_count, class_count, line_count)

    Unreadable or unparseable files yield all-zero metrics instead of
    raising, so one bad file cannot abort a whole directory scan.
    """
    try:
        with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
            source = f.read()
    except OSError:  # IOError is an alias of OSError since Python 3.3
        return 0.0, 0, 0, 0, 0

    try:
        tree = ast.parse(source, filename=filepath)
    except (SyntaxError, ValueError):
        # ValueError: ast.parse() rejects source containing null bytes.
        return 0.0, 0, 0, 0, 0

    visitor = ComplexityVisitor()
    visitor.visit(tree)

    line_count = len(source.splitlines())

    if not visitor.complexities:
        # No functions; the file may still define classes.
        return 0.0, 0, visitor.function_count, visitor.class_count, line_count

    avg = sum(visitor.complexities) / len(visitor.complexities)
    max_c = max(visitor.complexities)
    return avg, max_c, visitor.function_count, visitor.class_count, line_count


# ── Refactoring Score ───────────────────────────────────────────────────

def calculate_refactoring_score(metrics: FileMetrics) -> float:
    """
    Calculate a refactoring priority score (0-100) for *metrics*.

    Higher score = higher priority.  Weighted components:
      - Complexity (0-30): avg*2 + max*0.5, capped (avg=10+ or max=20+ saturates)
      - Size       (0-20): lines/25, capped (500+ lines saturates)
      - Churn      (0-30): 30d*2 + 90d*0.5, capped (recent churn weighs more)
      - Coverage   (0-20): (1 - coverage) * 20; unknown coverage -> 10
    """
    score = 0.0

    # Complexity component (0-30).
    score += max(0.0, min(30.0, (metrics.complexity * 2) + (metrics.max_complexity * 0.5)))

    # Size component (0-20).
    score += max(0.0, min(20.0, metrics.lines / 25.0))

    # Churn component (0-30).
    score += max(0.0, min(30.0, (metrics.churn_30d * 2) + (metrics.churn_90d * 0.5)))

    # Coverage component (0-20): lower coverage -> higher score.
    if metrics.test_coverage is not None:
        coverage_score = (1.0 - metrics.test_coverage) * 20.0
    else:
        coverage_score = 10.0  # no data: assume medium risk
    score += max(0.0, coverage_score)

    return min(100.0, max(0.0, score))


# ── Proposal Generation ─────────────────────────────────────────────────

def scan_directory(directory: str, extensions: tuple = ('.py',)) -> list:
    """Return paths of source files under *directory* matching *extensions*.

    Hidden directories and common build/vendor trees are pruned in place
    so os.walk never descends into them.
    """
    skip_dirs = {'__pycache__', 'node_modules', 'venv', '.venv', 'env',
                 'build', 'dist', '.git', '.tox'}
    exts = tuple(extensions)  # str.endswith accepts a tuple of suffixes
    files = []
    for root, dirs, filenames in os.walk(directory):
        dirs[:] = [d for d in dirs
                   if not d.startswith('.') and d not in skip_dirs]
        for fname in filenames:
            if fname.endswith(exts):
                files.append(os.path.join(root, fname))
    return files


def generate_proposals(directory: str = '.', min_score: float = 30.0) -> list:
    """Analyze *directory* and return refactoring proposals, best first.

    Only files that define at least one function or class AND score at
    least *min_score* produce a proposal dict.
    """
    proposals = []
    for filepath in scan_directory(directory):
        avg, max_c, funcs, classes, lines = compute_file_complexity(filepath)

        if funcs == 0 and classes == 0:
            continue  # empty, unparseable, or pure-data module

        metrics = FileMetrics(
            path=filepath,
            lines=lines,
            complexity=avg,
            max_complexity=max_c,
            functions=funcs,
            classes=classes,
        )
        score = calculate_refactoring_score(metrics)
        metrics.refactoring_score = score

        if score < min_score:
            continue

        # Human-readable justification for the proposal description.
        reasons = []
        if max_c > 10:
            reasons.append(f"high max complexity ({max_c})")
        if avg > 5:
            reasons.append(f"high avg complexity ({avg:.1f})")
        if lines > 300:
            reasons.append(f"large file ({lines} lines)")

        proposals.append({
            "title": f"Refactor {os.path.basename(filepath)} (score: {score:.0f})",
            "description": f"{filepath}: {', '.join(reasons) if reasons else 'general improvement candidate'}",
            "impact": min(10, int(score / 10)),
            "effort": min(10, max(1, int(max_c / 3))),
            "category": "refactoring",
            "source_engine": "10.4",
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "metrics": {
                "path": filepath,
                "score": round(score, 2),
                "avg_complexity": round(avg, 2),
                "max_complexity": max_c,
                "lines": lines,
                "functions": funcs,
                "classes": classes,
            },
        })

    # Highest-priority proposals first.
    proposals.sort(key=lambda p: p["metrics"]["score"], reverse=True)
    return proposals


# ── CLI ─────────────────────────────────────────────────────────────────

def main():
    parser = argparse.ArgumentParser(description="Finds refactoring opportunities in codebases")
    parser.add_argument("--output", required=True, help="Output file for proposals")
    parser.add_argument("--dry-run", action="store_true", help="Don't write output file")
    parser.add_argument("--directory", default=".", help="Directory to scan")
    parser.add_argument("--min-score", type=float, default=30.0, help="Minimum score threshold")
    args = parser.parse_args()

    proposals = generate_proposals(args.directory, args.min_score)

    if args.dry_run:
        # Fix: don't claim a file was written when --dry-run skipped the write.
        print(f"[dry-run] Generated {len(proposals)} proposals (not written)")
    else:
        os.makedirs(os.path.dirname(args.output) or '.', exist_ok=True)
        with open(args.output, "w") as f:
            json.dump({"proposals": proposals}, f, indent=2)
        print(f"Generated {len(proposals)} proposals -> {args.output}")


if __name__ == "__main__":
    main()