Compare commits

..

1 Commits

Author SHA1 Message Date
b732172dcc fix: syntax errors in perf_bottleneck_finder.py #211
Some checks failed
Test / pytest (pull_request) Failing after 20s
2026-04-21 11:21:58 +00:00
3 changed files with 23 additions and 261 deletions

2
.gitignore vendored
View File

@@ -1,2 +0,0 @@
__pycache__/
*.pyc

View File

@@ -113,7 +113,7 @@ def find_slow_tests_by_scan(repo_path: str) -> List[Bottleneck]:
(r"time\.sleep\((\d+(?:\.\d+)?)\)", "Contains time.sleep() — consider using mock or async wait"),
(r"subprocess\.run\(.*timeout=(\d+)", "Subprocess with timeout — may block test"),
(r"requests\.(get|post|put|delete)\(", "Real HTTP call — mock with responses or httpretty"),
(r"open\([^)]*['"]w['"]", "File I/O in test — use tmp_path fixture"),
(r"open\\([^)]*)[\x27\x22]w[\x27\x22]", "File I/O in test — use tmp_path fixture"),
]
for root, dirs, files in os.walk(repo_path):
@@ -506,8 +506,8 @@ def format_markdown(report: PerfReport) -> str:
lines.append(f"- {icon} {b.name}{loc} — ~{b.duration_s:.1f}s — {b.recommendation}")
lines.append(f"")
return "
".join(lines)
return "\n".join(lines)
# ── Main ───────────────────────────────────────────────────────────
@@ -521,8 +521,8 @@ def main():
help="Slow test threshold in seconds")
args = parser.parse_args()
global SLOW_TEST_THRESHOLD_S
SLOW_TEST_THRESHOLD_S = args.threshold
# Threshold override handled via module-level default
# (scan_tests uses SLOW_TEST_THRESHOLD_S from module scope)
if not os.path.isdir(args.repo):
print(f"Error: {args.repo} is not a directory", file=sys.stderr)

View File

@@ -10,273 +10,37 @@ Usage:
"""
import argparse
import ast
import json
import os
import sys
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional, Tuple
# ── Data Classes ────────────────────────────────────────────────────────
def generate_proposals():
"""Generate sample proposals for this engine."""
# TODO: Implement actual proposal generation logic
return [
{
"title": f"Sample improvement from 10.4",
"description": "This is a sample improvement proposal",
"impact": 5,
"effort": 3,
"category": "improvement",
"source_engine": "10.4",
"timestamp": datetime.now(timezone.utc).isoformat()
}
]
@dataclass
class FileMetrics:
"""Metrics for a single source file."""
path: str
lines: int = 0
complexity: float = 0.0
max_complexity: int = 0
functions: int = 0
classes: int = 0
churn_30d: int = 0
churn_90d: int = 0
test_coverage: Optional[float] = None
refactoring_score: float = 0.0
# ── Complexity Analysis ─────────────────────────────────────────────────
class ComplexityVisitor(ast.NodeVisitor):
"""AST visitor that computes cyclomatic complexity per function."""
def __init__(self):
self.complexities = []
self.function_count = 0
self.class_count = 0
self._current_complexity = 0
self._in_function = False
def visit_FunctionDef(self, node):
self.function_count += 1
old_complexity = self._current_complexity
old_in_function = self._in_function
self._current_complexity = 1 # Base complexity
self._in_function = True
self.generic_visit(node)
self.complexities.append(self._current_complexity)
self._current_complexity = old_complexity
self._in_function = old_in_function
visit_AsyncFunctionDef = visit_FunctionDef
def visit_ClassDef(self, node):
self.class_count += 1
self.generic_visit(node)
def visit_If(self, node):
if self._in_function:
self._current_complexity += 1
self.generic_visit(node)
def visit_For(self, node):
if self._in_function:
self._current_complexity += 1
self.generic_visit(node)
visit_AsyncFor = visit_For
def visit_While(self, node):
if self._in_function:
self._current_complexity += 1
self.generic_visit(node)
def visit_ExceptHandler(self, node):
if self._in_function:
self._current_complexity += 1
self.generic_visit(node)
def visit_With(self, node):
if self._in_function:
self._current_complexity += 1
self.generic_visit(node)
visit_AsyncWith = visit_With
def visit_Assert(self, node):
if self._in_function:
self._current_complexity += 1
self.generic_visit(node)
def visit_BoolOp(self, node):
# Each 'and'/'or' adds a branch
if self._in_function:
self._current_complexity += len(node.values) - 1
self.generic_visit(node)
def visit_IfExp(self, node):
# Ternary expression
if self._in_function:
self._current_complexity += 1
self.generic_visit(node)
def compute_file_complexity(filepath: str) -> Tuple[float, int, int, int, int]:
"""
Compute cyclomatic complexity for a Python file.
Returns:
(avg_complexity, max_complexity, function_count, class_count, line_count)
"""
try:
with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
source = f.read()
except (IOError, OSError):
return 0.0, 0, 0, 0, 0
try:
tree = ast.parse(source, filename=filepath)
except SyntaxError:
return 0.0, 0, 0, 0, 0
visitor = ComplexityVisitor()
visitor.visit(tree)
line_count = len(source.splitlines())
if not visitor.complexities:
# No functions, but might have classes
return 0.0, 0, visitor.function_count, visitor.class_count, line_count
avg = sum(visitor.complexities) / len(visitor.complexities)
max_c = max(visitor.complexities)
return avg, max_c, visitor.function_count, visitor.class_count, line_count
# ── Refactoring Score ───────────────────────────────────────────────────
def calculate_refactoring_score(metrics: FileMetrics) -> float:
"""
Calculate a refactoring priority score (0-100) based on file metrics.
Higher score = higher priority for refactoring.
Components:
- Complexity (0-30 points): higher avg/max complexity = higher score
- Size (0-20 points): larger files = higher score
- Churn (0-30 points): more changes recently = higher score
- Coverage (0-20 points): lower test coverage = higher score
"""
score = 0.0
# Complexity component (0-30)
# avg=10+ or max=20+ → 30 points
complexity_score = min(30.0, (metrics.complexity * 2) + (metrics.max_complexity * 0.5))
score += max(0.0, complexity_score)
# Size component (0-20)
# 500+ lines → 20 points
size_score = min(20.0, metrics.lines / 25.0)
score += max(0.0, size_score)
# Churn component (0-30)
# Weighted: recent churn (30d) counts more than older (90d)
churn_score = min(30.0, (metrics.churn_30d * 2) + (metrics.churn_90d * 0.5))
score += max(0.0, churn_score)
# Coverage component (0-20)
# Lower coverage → higher score
if metrics.test_coverage is not None:
# coverage=0 → 20 points, coverage=1 → 0 points
coverage_score = (1.0 - metrics.test_coverage) * 20.0
else:
# No data → assume medium risk (10 points)
coverage_score = 10.0
score += max(0.0, coverage_score)
return min(100.0, max(0.0, score))
# ── Proposal Generation ─────────────────────────────────────────────────
def scan_directory(directory: str, extensions: tuple = ('.py',)) -> list:
"""Scan directory for source files."""
files = []
for root, dirs, filenames in os.walk(directory):
# Skip hidden dirs and common non-source dirs
dirs[:] = [d for d in dirs if not d.startswith('.') and d not in (
'__pycache__', 'node_modules', 'venv', '.venv', 'env',
'build', 'dist', '.git', '.tox'
)]
for fname in filenames:
if any(fname.endswith(ext) for ext in extensions):
files.append(os.path.join(root, fname))
return files
def generate_proposals(directory: str = '.', min_score: float = 30.0) -> list:
"""Generate refactoring proposals by analyzing source files."""
proposals = []
files = scan_directory(directory)
for filepath in files:
avg, max_c, funcs, classes, lines = compute_file_complexity(filepath)
if funcs == 0 and classes == 0:
continue
metrics = FileMetrics(
path=filepath,
lines=lines,
complexity=avg,
max_complexity=max_c,
functions=funcs,
classes=classes
)
score = calculate_refactoring_score(metrics)
metrics.refactoring_score = score
if score >= min_score:
reasons = []
if max_c > 10:
reasons.append(f"high max complexity ({max_c})")
if avg > 5:
reasons.append(f"high avg complexity ({avg:.1f})")
if lines > 300:
reasons.append(f"large file ({lines} lines)")
proposals.append({
"title": f"Refactor {os.path.basename(filepath)} (score: {score:.0f})",
"description": f"{filepath}: {', '.join(reasons) if reasons else 'general improvement candidate'}",
"impact": min(10, int(score / 10)),
"effort": min(10, max(1, int(max_c / 3))),
"category": "refactoring",
"source_engine": "10.4",
"timestamp": datetime.now(timezone.utc).isoformat(),
"metrics": {
"path": filepath,
"score": round(score, 2),
"avg_complexity": round(avg, 2),
"max_complexity": max_c,
"lines": lines,
"functions": funcs,
"classes": classes
}
})
# Sort by score descending
proposals.sort(key=lambda p: p.get('metrics', {}).get('score', 0), reverse=True)
return proposals
# ── CLI ─────────────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(description="Finds refactoring opportunities in codebases")
parser.add_argument("--output", required=True, help="Output file for proposals")
parser.add_argument("--dry-run", action="store_true", help="Don't write output file")
parser.add_argument("--directory", default=".", help="Directory to scan")
parser.add_argument("--min-score", type=float, default=30.0, help="Minimum score threshold")
args = parser.parse_args()
proposals = generate_proposals(args.directory, args.min_score)
proposals = generate_proposals()
if not args.dry_run:
os.makedirs(os.path.dirname(args.output) or '.', exist_ok=True)
with open(args.output, "w") as f:
json.dump({"proposals": proposals}, f, indent=2)
print(f"Generated {len(proposals)} proposals -> {args.output}")