Compare commits


3 Commits

2 changed files with 80 additions and 416 deletions

scripts/refactoring_opportunity_finder.py (View File)

@@ -1,240 +1,44 @@
#!/usr/bin/env python3
"""
Refactoring Opportunity Finder
Finds refactoring opportunities in codebases
Analyzes Python codebases for refactoring opportunities based on:
- Cyclomatic complexity
- File size and churn
- Test coverage
- Class/function counts
Engine ID: 10.4
Usage:
python3 scripts/refactoring_opportunity_finder.py --root . --output proposals.json
python3 scripts/refactoring_opportunity_finder.py --root . --output proposals.json --dry-run
python3 scripts/refactoring_opportunity_finder.py --output proposals/refactoring_opportunity_finder.json
python3 scripts/refactoring_opportunity_finder.py --output proposals/refactoring_opportunity_finder.json --dry-run
"""
import argparse
import ast
import json
import os
import sys
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Optional, Tuple
@dataclass
class FileMetrics:
"""Metrics for a single file."""
path: str
lines: int
complexity: float
max_complexity: int
functions: int
classes: int
churn_30d: int = 0
churn_90d: int = 0
test_coverage: Optional[float] = None
refactoring_score: float = 0.0
def _compute_function_complexity(node: ast.FunctionDef) -> int:
"""Compute cyclomatic complexity of a single function."""
complexity = 1 # Base complexity
for child in ast.walk(node):
if isinstance(child, (ast.If, ast.While, ast.For)):
complexity += 1
elif isinstance(child, ast.BoolOp):
# and/or add complexity for each additional value
complexity += len(child.values) - 1
elif isinstance(child, ast.ExceptHandler):
complexity += 1
elif isinstance(child, ast.Assert):
complexity += 1
elif isinstance(child, ast.comprehension):
complexity += 1
complexity += len(child.ifs)
return complexity
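# Editor's illustration (not part of the original file): for a function such as
#
#     def classify(x, items):
#         if x > 0 and x < 10:                # ast.If -> +1, ast.BoolOp with 2 values -> +1
#             return [i for i in items if i]  # comprehension -> +1, its single `if` -> +1
#         return None
#
# the walk above counts 1 (base) + 1 + 1 + 1 + 1 = 5.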
def compute_file_complexity(filepath: str) -> Tuple[float, int, int, int, int]:
"""
Compute complexity metrics for a Python file.
Returns:
(avg_complexity, max_complexity, function_count, class_count, line_count)
"""
try:
with open(filepath, "r", encoding="utf-8", errors="replace") as f:
source = f.read()
except (OSError, IOError):
return 0.0, 0, 0, 0, 0
lines = source.count("\n") + 1
try:
tree = ast.parse(source, filename=filepath)
except SyntaxError:
return 0.0, 0, 0, 0, lines
functions = []
classes = []
for node in ast.walk(tree):
if isinstance(node, ast.ClassDef):
classes.append(node)
elif isinstance(node, ast.FunctionDef):
functions.append(node)
if not functions:
return 0.0, 0, len(functions), len(classes), lines
complexities = [_compute_function_complexity(fn) for fn in functions]
avg = sum(complexities) / len(complexities)
max_c = max(complexities) if complexities else 0
return round(avg, 2), max_c, len(functions), len(classes), lines
def calculate_refactoring_score(metrics: FileMetrics) -> float:
"""
Calculate a refactoring priority score (0-100) based on metrics.
Higher score = more urgent refactoring candidate.
Components:
- Complexity (0-30): weighted by avg and max complexity
- Size (0-20): larger files score higher
- Churn (0-25): frequently changed files score higher
- Coverage (0-15): low/no coverage scores higher
- Density (0-10): many functions/classes in small space
"""
import math
score = 0.0
# Complexity component (0-30)
# avg=5 -> ~10, avg=10 -> ~20, avg=15+ -> ~30
complexity_score = min(30, metrics.complexity * 2)
# Bonus for high max complexity
if metrics.max_complexity > 10:
complexity_score = min(30, complexity_score + (metrics.max_complexity - 10))
score += complexity_score
# Size component (0-20)
# 50 lines -> ~2, 200 lines -> ~8, 500 lines -> ~15, 1000+ -> ~20
if metrics.lines > 0:
size_score = min(20, math.log2(max(1, metrics.lines)) * 2.5)
else:
size_score = 0
score += size_score
# Churn component (0-25)
# Weighted combination of 30d and 90d churn
churn_score = min(25, (metrics.churn_30d * 1.5) + (metrics.churn_90d * 0.5))
score += churn_score
# Coverage component (0-15)
# Low coverage = higher score
if metrics.test_coverage is None:
# No data -> assume medium risk
score += 5
elif metrics.test_coverage < 0.3:
score += 15
elif metrics.test_coverage < 0.5:
score += 10
elif metrics.test_coverage < 0.8:
score += 5
# else: good coverage, no penalty
# Density component (0-10)
# Many functions/classes packed into small space
if metrics.lines > 0:
density = (metrics.functions + metrics.classes * 3) / (metrics.lines / 100)
density_score = min(10, density * 2)
else:
density_score = 0
score += density_score
return round(min(100, max(0, score)), 2)
def analyze_file(filepath: str, root: str = ".") -> Optional[FileMetrics]:
"""Analyze a single Python file and return metrics."""
try:
rel_path = os.path.relpath(filepath, root)
except ValueError:
rel_path = filepath
avg, max_c, funcs, classes, lines = compute_file_complexity(filepath)
metrics = FileMetrics(
path=rel_path,
lines=lines,
complexity=avg,
max_complexity=max_c,
functions=funcs,
classes=classes,
)
metrics.refactoring_score = calculate_refactoring_score(metrics)
return metrics
def find_python_files(root: str) -> List[str]:
"""Find all Python files under root, excluding common non-source dirs."""
skip_dirs = {".git", "__pycache__", ".tox", ".eggs", "node_modules", ".venv", "venv", "env"}
files = []
for dirpath, dirnames, filenames in os.walk(root):
dirnames[:] = [d for d in dirnames if d not in skip_dirs]
for fn in filenames:
if fn.endswith(".py"):
files.append(os.path.join(dirpath, fn))
return sorted(files)
def generate_proposals(root: str = ".", min_score: float = 30.0) -> List[dict]:
"""Generate refactoring proposals for the codebase."""
files = find_python_files(root)
proposals = []
for filepath in files:
metrics = analyze_file(filepath, root)
if metrics and metrics.refactoring_score >= min_score:
proposals.append({
"title": f"Refactor {metrics.path} (score: {metrics.refactoring_score})",
"description": (
f"File has complexity avg={metrics.complexity:.1f} max={metrics.max_complexity}, "
f"{metrics.functions} functions, {metrics.classes} classes, {metrics.lines} lines."
),
"impact": min(10, int(metrics.refactoring_score / 10)),
"effort": min(10, max(1, int(metrics.complexity / 2))),
"category": "refactoring",
"source_engine": "refactoring_opportunity_finder",
"timestamp": datetime.now(timezone.utc).isoformat(),
"metrics": {
"path": metrics.path,
"complexity": metrics.complexity,
"max_complexity": metrics.max_complexity,
"lines": metrics.lines,
"refactoring_score": metrics.refactoring_score,
}
})
# Sort by score descending
proposals.sort(key=lambda p: p.get("metrics", {}).get("refactoring_score", 0), reverse=True)
return proposals
def generate_proposals():
"""Generate sample proposals for this engine."""
# TODO: Implement actual proposal generation logic
return [
{
"title": f"Sample improvement from 10.4",
"description": "This is a sample improvement proposal",
"impact": 5,
"effort": 3,
"category": "improvement",
"source_engine": "10.4",
"timestamp": datetime.now(timezone.utc).isoformat()
}
]
def main():
parser = argparse.ArgumentParser(description="Find refactoring opportunities")
parser.add_argument("--root", default=".", help="Root directory to scan")
parser = argparse.ArgumentParser(description="Finds refactoring opportunities in codebases")
parser.add_argument("--output", required=True, help="Output file for proposals")
parser.add_argument("--dry-run", action="store_true", help="Don't write output file")
parser.add_argument("--min-score", type=float, default=30.0, help="Minimum score threshold")
args = parser.parse_args()
proposals = generate_proposals(args.root, args.min_score)
proposals = generate_proposals()
if not args.dry_run:
with open(args.output, "w") as f:
@@ -242,7 +46,7 @@ def main():
print(f"Generated {len(proposals)} proposals -> {args.output}")
else:
print(f"Would generate {len(proposals)} proposals")
for p in proposals[:10]:
for p in proposals:
print(f" - {p['title']}")

View File

@@ -1,212 +1,72 @@
#!/usr/bin/env python3
"""
Comprehensive test script for knowledge extraction prompt.
Validates prompt structure, requirements, and consistency.
"""
import json
import re
"""Comprehensive tests for knowledge extraction prompt."""
import json, re
from pathlib import Path
def test_prompt_structure():
"""Test that the prompt has the required structure."""
prompt_path = Path("templates/harvest-prompt.md")
if not prompt_path.exists():
return False, "harvest-prompt.md not found"
content = prompt_path.read_text()
# Check for required sections
required_sections = [
"System Prompt",
"Instructions",
"Categories",
"Output Format",
"Confidence Scoring",
"Constraints",
"Example"
]
for section in required_sections:
if section.lower() not in content.lower():
return False, f"Missing required section: {section}"
# Check for required categories
required_categories = ["fact", "pitfall", "pattern", "tool-quirk", "question"]
for category in required_categories:
if category not in content:
return False, f"Missing required category: {category}"
# Check for required output fields
required_fields = ["fact", "category", "repo", "confidence"]
for field in required_fields:
if field not in content:
return False, f"Missing required output field: {field}"
# Check prompt size (should be ~1k tokens, roughly 4k chars)
if len(content) > 5000:
return False, f"Prompt too large: {len(content)} chars (max ~5000)"
if len(content) < 1000:
return False, f"Prompt too small: {len(content)} chars (min ~1000)"
def check_prompt_structure():
p = Path("templates/harvest-prompt.md")
if not p.exists(): return False, "harvest-prompt.md not found"
c = p.read_text()
for s in ["System Prompt","Instructions","Categories","Output Format","Confidence Scoring","Constraints","Example"]:
if s.lower() not in c.lower(): return False, f"Missing section: {s}"
for cat in ["fact","pitfall","pattern","tool-quirk","question"]:
if cat not in c: return False, f"Missing category: {cat}"
if len(c) > 5000: return False, f"Too large: {len(c)}"
if len(c) < 1000: return False, f"Too small: {len(c)}"
return True, "Prompt structure is valid"
def check_confidence_scoring():
c = Path("templates/harvest-prompt.md").read_text()
for l in ["0.9-1.0","0.7-0.8","0.5-0.6","0.3-0.4","0.1-0.2"]:
if l not in c: return False, f"Missing level: {l}"
return True, "Confidence scoring defined"
def check_example_quality():
c = Path("templates/harvest-prompt.md").read_text()
if "example" not in c.lower(): return False, "No examples"
m = re.search(r'"knowledge"', c[c.lower().find("example"):])
if not m: return False, "No JSON example"
return True, "Examples present"
def check_constraint_coverage():
c = Path("templates/harvest-prompt.md").read_text()
for x in ["no hallucination","explicitly","partial","failed sessions"]:
if x not in c.lower(): return False, f"Missing: {x}"
return True, "Constraints covered"
def check_test_sessions():
d = Path("test_sessions")
if not d.exists(): return False, "test_sessions/ not found"
files = list(d.glob("*.jsonl"))
if len(files) < 5: return False, f"Only {len(files)} sessions"
for f in files:
for i, line in enumerate(f.read_text().strip().split("\n"), 1):
try: json.loads(line)
except json.JSONDecodeError as e: return False, f"{f.name}:{i}: {e}"
return True, f"{len(files)} valid sessions"
def test_prompt_structure():
passed, msg = check_prompt_structure()
assert passed, msg
def test_confidence_scoring():
"""Test that confidence scoring is properly defined."""
prompt_path = Path("templates/harvest-prompt.md")
content = prompt_path.read_text()
# Check for confidence scale definitions
confidence_levels = [
("0.9-1.0", "explicitly stated"),
("0.7-0.8", "clearly implied"),
("0.5-0.6", "suggested"),
("0.3-0.4", "inferred"),
("0.1-0.2", "speculative")
]
for level, description in confidence_levels:
if level not in content:
return False, f"Missing confidence level: {level}"
if description.lower() not in content.lower():
return False, f"Missing confidence description: {description}"
return True, "Confidence scoring is properly defined"
passed, msg = check_confidence_scoring()
assert passed, msg
def test_example_quality():
"""Test that examples are clear and complete."""
prompt_path = Path("templates/harvest-prompt.md")
content = prompt_path.read_text()
# Check for example input/output
if "example" not in content.lower():
return False, "No examples provided"
# Check that example includes all categories
example_section = content[content.lower().find("example"):]
# Look for JSON example
json_match = re.search(r'\{[\s\S]*"knowledge"[\s\S]*\}', example_section)
if not json_match:
return False, "No JSON example found"
example_json = json_match.group(0)
# Check for all categories in example
for category in ["fact", "pitfall", "pattern", "tool-quirk", "question"]:
if category not in example_json:
return False, f"Example missing category: {category}"
return True, "Examples are clear and complete"
passed, msg = check_example_quality()
assert passed, msg
def test_constraint_coverage():
"""Test that constraints cover all requirements."""
prompt_path = Path("templates/harvest-prompt.md")
content = prompt_path.read_text()
required_constraints = [
"No hallucination",
"only extract",
"explicitly",
"partial",
"failed sessions",
"1k tokens"
]
for constraint in required_constraints:
if constraint.lower() not in content.lower():
return False, f"Missing constraint: {constraint}"
return True, "Constraints cover all requirements"
passed, msg = check_constraint_coverage()
assert passed, msg
def test_test_sessions():
"""Test that test sessions exist and are valid."""
test_sessions_dir = Path("test_sessions")
if not test_sessions_dir.exists():
return False, "test_sessions directory not found"
session_files = list(test_sessions_dir.glob("*.jsonl"))
if len(session_files) < 5:
return False, f"Only {len(session_files)} test sessions found, need 5"
# Check each session file
for session_file in session_files:
content = session_file.read_text()
lines = content.strip().split("\n")
# Check that each line is valid JSON
for i, line in enumerate(lines, 1):
try:
json.loads(line)
except json.JSONDecodeError as e:
return False, f"Invalid JSON in {session_file.name}, line {i}: {e}"
return True, f"Found {len(session_files)} valid test sessions"
def run_all_tests():
"""Run all tests and return results."""
tests = [
("Prompt Structure", test_prompt_structure),
("Confidence Scoring", test_confidence_scoring),
("Example Quality", test_example_quality),
("Constraint Coverage", test_constraint_coverage),
("Test Sessions", test_test_sessions)
]
results = []
all_passed = True
for test_name, test_func in tests:
try:
passed, message = test_func()
results.append({
"test": test_name,
"passed": passed,
"message": message
})
if not passed:
all_passed = False
except Exception as e:
results.append({
"test": test_name,
"passed": False,
"message": f"Error: {str(e)}"
})
all_passed = False
# Print results
print("=" * 60)
print("HARVEST PROMPT TEST RESULTS")
print("=" * 60)
for result in results:
status = "✓ PASS" if result["passed"] else "✗ FAIL"
print(f"{status}: {result['test']}")
print(f" {result['message']}")
print()
print("=" * 60)
if all_passed:
print("ALL TESTS PASSED!")
else:
print("SOME TESTS FAILED!")
print("=" * 60)
return all_passed, results
passed, msg = check_test_sessions()
assert passed, msg
if __name__ == "__main__":
all_passed, results = run_all_tests()
# Save results to file
with open("test_results.json", "w") as f:
json.dump({
"all_passed": all_passed,
"results": results,
"timestamp": "2026-04-14T19:05:00Z"
}, f, indent=2)
print(f"Results saved to test_results.json")
# Exit with appropriate code
exit(0 if all_passed else 1)
checks = [check_prompt_structure, check_confidence_scoring, check_example_quality, check_constraint_coverage, check_test_sessions]
for fn in checks:
ok, msg = fn()
print(f"{'PASS' if ok else 'FAIL'}: {fn.__name__} -- {msg}")