Compare commits
2 Commits
step35/113
...
step35/162
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
180464cc5e | ||
|
|
4b5a675355 |
366
scripts/code_duplication_detector.py
Normal file
366
scripts/code_duplication_detector.py
Normal file
@@ -0,0 +1,366 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Code Duplication Detector — Issue #162
|
||||
|
||||
Finds duplicate functions and code blocks across Python source files.
|
||||
Reports duplication percentage and outputs a duplication report.
|
||||
|
||||
Usage:
|
||||
python3 scripts/code_duplication_detector.py --output reports/code_duplication.json
|
||||
python3 scripts/code_duplication_detector.py --directory scripts/ --dry-run
|
||||
python3 scripts/code_duplication_detector.py --test # Run built-in test
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
from collections import defaultdict
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import List, Dict, Tuple, Optional
|
||||
|
||||
|
||||
# ── AST helpers ────────────────────────────────────────────────────────────
|
||||
|
||||
def normalize_code(text: str) -> str:
|
||||
"""Normalize code for comparison: strip comments, normalize whitespace."""
|
||||
# Remove comments (both # and docstring triple-quote strings)
|
||||
text = re.sub(r'#.*$', '', text, flags=re.MULTILINE)
|
||||
text = re.sub(r'""".*?"""', '', text, flags=re.DOTALL)
|
||||
text = re.sub(r"'''.*?'''", '', text, flags=re.DOTALL)
|
||||
# Normalize whitespace
|
||||
text = re.sub(r'\s+', ' ', text).strip()
|
||||
return text.lower()
|
||||
|
||||
|
||||
def code_hash(text: str) -> str:
|
||||
"""SHA256 hash of normalized code for exact duplicate detection."""
|
||||
normalized = normalize_code(text)
|
||||
return hashlib.sha256(normalized.encode('utf-8')).hexdigest()
|
||||
|
||||
|
||||
# ── Function extraction via AST ────────────────────────────────────────────
|
||||
|
||||
class FunctionExtractor:
|
||||
"""Extract function and method definitions with their full source bodies."""
|
||||
|
||||
def __init__(self, source: str, filepath: str):
|
||||
self.source = source
|
||||
self.filepath = filepath
|
||||
self.lines = source.splitlines()
|
||||
self.functions: List[Dict] = []
|
||||
|
||||
def _get_source_segment(self, start_lineno: int, end_lineno: int) -> str:
|
||||
"""Get source code from start to end line (1-indexed, inclusive)."""
|
||||
# AST end_lineno is inclusive
|
||||
start_idx = start_lineno - 1
|
||||
end_idx = end_lineno
|
||||
return '\n'.join(self.lines[start_idx:end_idx])
|
||||
|
||||
def visit(self, tree):
|
||||
"""Collect all function and async function definitions."""
|
||||
for node in ast.walk(tree):
|
||||
if isinstance(node, ast.FunctionDef) or isinstance(node, ast.AsyncFunctionDef):
|
||||
# Get the full source for this function including decorators
|
||||
start = node.lineno
|
||||
end = node.end_lineno
|
||||
body_source = self._get_source_segment(start, end)
|
||||
|
||||
# Also collect parent class name if this is a method
|
||||
class_name = None
|
||||
parent = node.parent if hasattr(node, 'parent') else None
|
||||
if parent and isinstance(parent, ast.ClassDef):
|
||||
class_name = parent.name
|
||||
|
||||
self.functions.append({
|
||||
'name': node.name,
|
||||
'file': self.filepath,
|
||||
'start_line': start,
|
||||
'end_line': end,
|
||||
'body': body_source,
|
||||
'class_name': class_name,
|
||||
'is_method': class_name is not None,
|
||||
})
|
||||
|
||||
|
||||
import ast
|
||||
|
||||
class ParentNodeVisitor(ast.NodeVisitor):
|
||||
"""Annotate nodes with parent references."""
|
||||
def __init__(self, parent=None):
|
||||
self.parent = parent
|
||||
|
||||
def generic_visit(self, node):
|
||||
node.parent = self.parent
|
||||
for child in ast.iter_child_nodes(node):
|
||||
self.__class__(child).parent = node
|
||||
super().generic_visit(node)
|
||||
|
||||
|
||||
def extract_functions_from_file(filepath: str) -> List[Dict]:
|
||||
"""Extract all function definitions from a Python file."""
|
||||
try:
|
||||
with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
|
||||
source = f.read()
|
||||
tree = ast.parse(source, filename=str(filepath))
|
||||
|
||||
# Annotate with parent references
|
||||
for node in ast.walk(tree):
|
||||
for child in ast.iter_child_nodes(node):
|
||||
child.parent = node
|
||||
|
||||
extractor = FunctionExtractor(source, str(filepath))
|
||||
extractor.visit(tree)
|
||||
return extractor.functions
|
||||
except (SyntaxError, UnicodeDecodeError, OSError) as e:
|
||||
return []
|
||||
|
||||
|
||||
def scan_directory(directory: str, extensions: Tuple[str, ...] = ('.py',)) -> List[Dict]:
|
||||
"""Scan directory for Python files and extract all functions."""
|
||||
all_functions = []
|
||||
path = Path(directory)
|
||||
|
||||
for filepath in path.rglob('*'):
|
||||
if filepath.is_file() and filepath.suffix in extensions:
|
||||
# Skip common non-source dirs
|
||||
parts = filepath.parts
|
||||
if any(ex in parts for ex in ('__pycache__', 'node_modules', '.git', 'venv', '.venv', 'dist', 'build')):
|
||||
continue
|
||||
if filepath.name.startswith('.'):
|
||||
continue
|
||||
|
||||
functions = extract_functions_from_file(str(filepath))
|
||||
all_functions.extend(functions)
|
||||
|
||||
return all_functions
|
||||
|
||||
|
||||
# ── Duplicate detection ─────────────────────────────────────────────────────
|
||||
|
||||
def find_duplicates(functions: List[Dict], similarity_threshold: float = 0.95) -> Dict:
|
||||
"""
|
||||
Find duplicate and near-duplicate functions.
|
||||
|
||||
Returns dict with:
|
||||
- exact_duplicates: {hash: [function_info, ...]}
|
||||
- near_duplicates: [[function_info, ...], ...]
|
||||
- stats: total_functions, unique_exact, exact_dupe_count, near_dupe_count
|
||||
"""
|
||||
# Phase 1: Exact duplicates by code hash
|
||||
hash_groups: Dict[str, List[Dict]] = defaultdict(list)
|
||||
for func in functions:
|
||||
h = code_hash(func['body'])
|
||||
hash_groups[h].append(func)
|
||||
|
||||
exact_duplicates = {h: group for h, group in hash_groups.items() if len(group) > 1}
|
||||
exact_dupe_count = sum(len(group) - 1 for group in exact_duplicates.values())
|
||||
|
||||
# Phase 2: Near-duplicates (among the unique-by-hash set)
|
||||
# We compare token overlap for functions that have different hashes
|
||||
unique_by_hash = [funcs[0] for funcs in hash_groups.values()]
|
||||
near_duplicate_groups = []
|
||||
|
||||
# Simple token-based similarity
|
||||
def tokenize(code: str) -> set:
|
||||
return set(re.findall(r'[a-zA-Z_][a-zA-Z0-9_]*', code.lower()))
|
||||
|
||||
i = 0
|
||||
while i < len(unique_by_hash):
|
||||
group = [unique_by_hash[i]]
|
||||
j = i + 1
|
||||
while j < len(unique_by_hash):
|
||||
tokens_i = tokenize(unique_by_hash[i]['body'])
|
||||
tokens_j = tokenize(unique_by_hash[j]['body'])
|
||||
if not tokens_i or not tokens_j:
|
||||
j += 1
|
||||
continue
|
||||
intersection = tokens_i & tokens_j
|
||||
union = tokens_i | tokens_j
|
||||
similarity = len(intersection) / len(union) if union else 0.0
|
||||
|
||||
if similarity >= similarity_threshold:
|
||||
group.append(unique_by_hash[j])
|
||||
unique_by_hash.pop(j)
|
||||
else:
|
||||
j += 1
|
||||
|
||||
if len(group) > 1:
|
||||
near_duplicate_groups.append(group)
|
||||
i += 1
|
||||
|
||||
near_dupe_count = sum(len(g) - 1 for g in near_duplicate_groups)
|
||||
|
||||
stats = {
|
||||
'total_functions': len(functions),
|
||||
'unique_exact': len(hash_groups),
|
||||
'exact_dupe_count': exact_dupe_count,
|
||||
'near_dupe_count': near_dupe_count,
|
||||
'total_duplicates': exact_dupe_count + near_dupe_count,
|
||||
}
|
||||
|
||||
# Calculate duplication percentage based on lines
|
||||
total_lines = sum(f['end_line'] - f['start_line'] + 1 for f in functions)
|
||||
dupe_lines = 0
|
||||
for group in exact_duplicates.values():
|
||||
# Count all but one as duplicates
|
||||
for f in group[1:]:
|
||||
dupe_lines += f['end_line'] - f['start_line'] + 1
|
||||
for group in near_duplicate_groups:
|
||||
for f in group[1:]:
|
||||
dupe_lines += f['end_line'] - f['start_line'] + 1
|
||||
|
||||
stats['total_lines'] = total_lines
|
||||
stats['duplicate_lines'] = dupe_lines
|
||||
stats['duplication_percentage'] = round((dupe_lines / total_lines * 100) if total_lines else 0, 2)
|
||||
|
||||
return {
|
||||
'exact_duplicates': exact_duplicates,
|
||||
'near_duplicates': near_duplicate_groups,
|
||||
'stats': stats,
|
||||
}
|
||||
|
||||
|
||||
# ── Report generation ────────────────────────────────────────────────────────
|
||||
|
||||
def generate_report(results: Dict, output_format: str = 'json') -> str:
|
||||
"""Generate human-readable report from detection results."""
|
||||
stats = results['stats']
|
||||
|
||||
if output_format == 'json':
|
||||
return json.dumps(results, indent=2, default=str)
|
||||
|
||||
# Text report
|
||||
lines = [
|
||||
"=" * 60,
|
||||
" CODE DUPLICATION REPORT",
|
||||
"=" * 60,
|
||||
f" Total functions scanned: {stats['total_functions']}",
|
||||
f" Unique functions: {stats['unique_exact']}",
|
||||
f" Exact duplicates: {stats['exact_dupe_count']}",
|
||||
f" Near-duplicates: {stats['near_dupe_count']}",
|
||||
f" Total lines: {stats['total_lines']}",
|
||||
f" Duplicate lines: {stats['duplicate_lines']}",
|
||||
f" Duplication %: {stats['duplication_percentage']}%",
|
||||
"",
|
||||
]
|
||||
|
||||
if results['exact_duplicates']:
|
||||
lines.append(" Exact duplicate functions:")
|
||||
for h, group in results['exact_duplicates'].items():
|
||||
first = group[0]
|
||||
lines.append(f" {first['name']} ({first['file']}:{first['start_line']}) — "
|
||||
f"copied {len(group)-1}x in:")
|
||||
for f in group[1:]:
|
||||
lines.append(f" → {f['file']}:{f['start_line']}")
|
||||
lines.append("")
|
||||
|
||||
if results['near_duplicates']:
|
||||
lines.append(" Near-duplicate function groups:")
|
||||
for i, group in enumerate(results['near_duplicates'], 1):
|
||||
first = group[0]
|
||||
lines.append(f" Group {i}: {first['name']} ({first['file']}:{first['start_line']}) — "
|
||||
f"{len(group)} similar functions")
|
||||
for f in group[1:]:
|
||||
lines.append(f" → {f['file']}:{f['start_line']}")
|
||||
lines.append("")
|
||||
|
||||
lines.append("=" * 60)
|
||||
return '\n'.join(lines)
|
||||
|
||||
|
||||
# ── CLI ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Code Duplication Detector")
|
||||
parser.add_argument('--directory', default='.',
|
||||
help='Directory to scan (default: current directory)')
|
||||
parser.add_argument('--output', help='Output file for JSON report')
|
||||
parser.add_argument('--dry-run', action='store_true', help='Run without writing file')
|
||||
parser.add_argument('--threshold', type=float, default=0.95,
|
||||
help='Similarity threshold for near-dupes (default: 0.95)')
|
||||
parser.add_argument('--json', action='store_true', help='JSON output to stdout')
|
||||
parser.add_argument('--test', action='store_true', help='Run built-in test')
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.test:
|
||||
_run_test()
|
||||
return
|
||||
|
||||
# Scan
|
||||
functions = scan_directory(args.directory)
|
||||
|
||||
# Detect duplicates
|
||||
results = find_duplicates(functions, similarity_threshold=args.threshold)
|
||||
stats = results['stats']
|
||||
|
||||
# Output
|
||||
if args.json:
|
||||
print(json.dumps(results, indent=2, default=str))
|
||||
else:
|
||||
print(generate_report(results, output_format='text'))
|
||||
|
||||
# Write file if requested
|
||||
if args.output and not args.dry_run:
|
||||
os.makedirs(os.path.dirname(args.output) or '.', exist_ok=True)
|
||||
with open(args.output, 'w') as f:
|
||||
json.dump(results, f, indent=2, default=str)
|
||||
print(f"\nReport written to: {args.output}")
|
||||
|
||||
# Summary for burn protocol
|
||||
print(f"\n✓ Detection complete: {stats['exact_dupe_count']} exact + "
|
||||
f"{stats['near_dupe_count']} near duplicates found "
|
||||
f"({stats['duplication_percentage']}% duplication)")
|
||||
|
||||
|
||||
def _run_test():
|
||||
"""Built-in smoke test."""
|
||||
import tempfile
|
||||
import os
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
# Create test files with duplicate code
|
||||
f1 = Path(tmpdir) / 'mod1.py'
|
||||
f1.write_text('''
|
||||
def hello():
|
||||
print("hello world")
|
||||
|
||||
def duplicated_function():
|
||||
x = 1
|
||||
y = 2
|
||||
return x + y
|
||||
|
||||
def unique_func():
|
||||
return 42
|
||||
''')
|
||||
|
||||
f2 = Path(tmpdir) / 'mod2.py'
|
||||
f2.write_text('''
|
||||
def duplicated_function():
|
||||
x = 1
|
||||
y = 2
|
||||
return x + y
|
||||
|
||||
def another_unique():
|
||||
return "different"
|
||||
''')
|
||||
|
||||
functions = scan_directory(tmpdir)
|
||||
results = find_duplicates(functions)
|
||||
|
||||
stats = results['stats']
|
||||
assert stats['exact_dupe_count'] >= 1, "Should find at least 1 exact duplicate"
|
||||
assert stats['total_functions'] >= 4, "Should find at least 4 functions"
|
||||
|
||||
# Check duplication percentage is calculated
|
||||
assert 'duplication_percentage' in stats
|
||||
print(f"\n✓ Test passed: {stats['total_functions']} functions, "
|
||||
f"{stats['exact_dupe_count']} exact duplicates, "
|
||||
f"{stats['duplication_percentage']}% duplication")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
351
scripts/pr_complexity_scorer.py
Normal file
351
scripts/pr_complexity_scorer.py
Normal file
@@ -0,0 +1,351 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
PR Complexity Scorer - Estimate review effort for PRs.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
from dataclasses import dataclass, asdict
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
|
||||
GITEA_BASE = "https://forge.alexanderwhitestone.com/api/v1"
|
||||
|
||||
DEPENDENCY_FILES = {
|
||||
"requirements.txt", "pyproject.toml", "setup.py", "setup.cfg",
|
||||
"Pipfile", "poetry.lock", "package.json", "yarn.lock", "Gemfile",
|
||||
"go.mod", "Cargo.toml", "pom.xml", "build.gradle"
|
||||
}
|
||||
|
||||
TEST_PATTERNS = [
|
||||
r"tests?/.*\.py$", r".*_test\.py$", r"test_.*\.py$",
|
||||
r"spec/.*\.rb$", r".*_spec\.rb$",
|
||||
r"__tests__/", r".*\.test\.(js|ts|jsx|tsx)$"
|
||||
]
|
||||
|
||||
WEIGHT_FILES = 0.25
|
||||
WEIGHT_LINES = 0.25
|
||||
WEIGHT_DEPS = 0.30
|
||||
WEIGHT_TEST_COV = 0.20
|
||||
|
||||
SMALL_FILES = 5
|
||||
MEDIUM_FILES = 20
|
||||
LARGE_FILES = 50
|
||||
|
||||
SMALL_LINES = 100
|
||||
MEDIUM_LINES = 500
|
||||
LARGE_LINES = 2000
|
||||
|
||||
TIME_PER_POINT = {1: 5, 2: 10, 3: 15, 4: 20, 5: 25, 6: 30, 7: 45, 8: 60, 9: 90, 10: 120}
|
||||
|
||||
|
||||
@dataclass
|
||||
class PRComplexity:
|
||||
pr_number: int
|
||||
title: str
|
||||
files_changed: int
|
||||
additions: int
|
||||
deletions: int
|
||||
has_dependency_changes: bool
|
||||
test_coverage_delta: Optional[int]
|
||||
score: int
|
||||
estimated_minutes: int
|
||||
reasons: List[str]
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return asdict(self)
|
||||
|
||||
|
||||
class GiteaClient:
|
||||
def __init__(self, token: str):
|
||||
self.token = token
|
||||
self.base_url = GITEA_BASE.rstrip("/")
|
||||
|
||||
def _request(self, path: str, params: Dict = None) -> Any:
|
||||
url = f"{self.base_url}{path}"
|
||||
if params:
|
||||
qs = "&".join(f"{k}={v}" for k, v in params.items() if v is not None)
|
||||
url += f"?{qs}"
|
||||
|
||||
req = urllib.request.Request(url)
|
||||
req.add_header("Authorization", f"token {self.token}")
|
||||
req.add_header("Content-Type", "application/json")
|
||||
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=30) as resp:
|
||||
return json.loads(resp.read().decode())
|
||||
except urllib.error.HTTPError as e:
|
||||
print(f"API error {e.code}: {e.read().decode()[:200]}", file=sys.stderr)
|
||||
return None
|
||||
except urllib.error.URLError as e:
|
||||
print(f"Network error: {e}", file=sys.stderr)
|
||||
return None
|
||||
|
||||
def get_open_prs(self, org: str, repo: str) -> List[Dict]:
|
||||
prs = []
|
||||
page = 1
|
||||
while True:
|
||||
batch = self._request(f"/repos/{org}/{repo}/pulls", {"limit": 50, "page": page, "state": "open"})
|
||||
if not batch:
|
||||
break
|
||||
prs.extend(batch)
|
||||
if len(batch) < 50:
|
||||
break
|
||||
page += 1
|
||||
return prs
|
||||
|
||||
def get_pr_files(self, org: str, repo: str, pr_number: int) -> List[Dict]:
|
||||
files = []
|
||||
page = 1
|
||||
while True:
|
||||
batch = self._request(
|
||||
f"/repos/{org}/{repo}/pulls/{pr_number}/files",
|
||||
{"limit": 100, "page": page}
|
||||
)
|
||||
if not batch:
|
||||
break
|
||||
files.extend(batch)
|
||||
if len(batch) < 100:
|
||||
break
|
||||
page += 1
|
||||
return files
|
||||
|
||||
def post_comment(self, org: str, repo: str, pr_number: int, body: str) -> bool:
|
||||
data = json.dumps({"body": body}).encode("utf-8")
|
||||
req = urllib.request.Request(
|
||||
f"{self.base_url}/repos/{org}/{repo}/issues/{pr_number}/comments",
|
||||
data=data,
|
||||
method="POST",
|
||||
headers={"Authorization": f"token {self.token}", "Content-Type": "application/json"}
|
||||
)
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=30) as resp:
|
||||
return resp.status in (200, 201)
|
||||
except urllib.error.HTTPError:
|
||||
return False
|
||||
|
||||
|
||||
def is_dependency_file(filename: str) -> bool:
|
||||
return any(filename.endswith(dep) for dep in DEPENDENCY_FILES)
|
||||
|
||||
|
||||
def is_test_file(filename: str) -> bool:
|
||||
return any(re.search(pattern, filename) for pattern in TEST_PATTERNS)
|
||||
|
||||
|
||||
def score_pr(
|
||||
files_changed: int,
|
||||
additions: int,
|
||||
deletions: int,
|
||||
has_dependency_changes: bool,
|
||||
test_coverage_delta: Optional[int] = None
|
||||
) -> tuple[int, int, List[str]]:
|
||||
score = 1.0
|
||||
reasons = []
|
||||
|
||||
# Files changed
|
||||
if files_changed <= SMALL_FILES:
|
||||
fscore = 1.0
|
||||
reasons.append("small number of files changed")
|
||||
elif files_changed <= MEDIUM_FILES:
|
||||
fscore = 2.0
|
||||
reasons.append("moderate number of files changed")
|
||||
elif files_changed <= LARGE_FILES:
|
||||
fscore = 2.5
|
||||
reasons.append("large number of files changed")
|
||||
else:
|
||||
fscore = 3.0
|
||||
reasons.append("very large PR spanning many files")
|
||||
|
||||
# Lines changed
|
||||
total_lines = additions + deletions
|
||||
if total_lines <= SMALL_LINES:
|
||||
lscore = 1.0
|
||||
reasons.append("small change size")
|
||||
elif total_lines <= MEDIUM_LINES:
|
||||
lscore = 2.0
|
||||
reasons.append("moderate change size")
|
||||
elif total_lines <= LARGE_LINES:
|
||||
lscore = 3.0
|
||||
reasons.append("large change size")
|
||||
else:
|
||||
lscore = 4.0
|
||||
reasons.append("very large change")
|
||||
|
||||
# Dependency changes
|
||||
if has_dependency_changes:
|
||||
dscore = 2.5
|
||||
reasons.append("dependency changes (architectural impact)")
|
||||
else:
|
||||
dscore = 0.0
|
||||
|
||||
# Test coverage delta
|
||||
tscore = 0.0
|
||||
if test_coverage_delta is not None:
|
||||
if test_coverage_delta > 0:
|
||||
reasons.append(f"test additions (+{test_coverage_delta} test files)")
|
||||
tscore = -min(2.0, test_coverage_delta / 2.0)
|
||||
elif test_coverage_delta < 0:
|
||||
reasons.append(f"test removals ({abs(test_coverage_delta)} test files)")
|
||||
tscore = min(2.0, abs(test_coverage_delta) * 0.5)
|
||||
else:
|
||||
reasons.append("test coverage change not assessed")
|
||||
|
||||
# Weighted sum, scaled by 3 to use full 1-10 range
|
||||
bonus = (fscore * WEIGHT_FILES) + (lscore * WEIGHT_LINES) + (dscore * WEIGHT_DEPS) + (tscore * WEIGHT_TEST_COV)
|
||||
scaled_bonus = bonus * 3.0
|
||||
score = 1.0 + scaled_bonus
|
||||
|
||||
final_score = max(1, min(10, int(round(score))))
|
||||
est_minutes = TIME_PER_POINT.get(final_score, 30)
|
||||
|
||||
return final_score, est_minutes, reasons
|
||||
|
||||
|
||||
def analyze_pr(client: GiteaClient, org: str, repo: str, pr_data: Dict) -> PRComplexity:
|
||||
pr_num = pr_data["number"]
|
||||
title = pr_data.get("title", "")
|
||||
files = client.get_pr_files(org, repo, pr_num)
|
||||
|
||||
additions = sum(f.get("additions", 0) for f in files)
|
||||
deletions = sum(f.get("deletions", 0) for f in files)
|
||||
filenames = [f.get("filename", "") for f in files]
|
||||
|
||||
has_deps = any(is_dependency_file(f) for f in filenames)
|
||||
|
||||
test_added = sum(1 for f in files if f.get("status") == "added" and is_test_file(f.get("filename", "")))
|
||||
test_removed = sum(1 for f in files if f.get("status") == "removed" and is_test_file(f.get("filename", "")))
|
||||
test_delta = test_added - test_removed if (test_added or test_removed) else None
|
||||
|
||||
score, est_min, reasons = score_pr(
|
||||
files_changed=len(files),
|
||||
additions=additions,
|
||||
deletions=deletions,
|
||||
has_dependency_changes=has_deps,
|
||||
test_coverage_delta=test_delta
|
||||
)
|
||||
|
||||
return PRComplexity(
|
||||
pr_number=pr_num,
|
||||
title=title,
|
||||
files_changed=len(files),
|
||||
additions=additions,
|
||||
deletions=deletions,
|
||||
has_dependency_changes=has_deps,
|
||||
test_coverage_delta=test_delta,
|
||||
score=score,
|
||||
estimated_minutes=est_min,
|
||||
reasons=reasons
|
||||
)
|
||||
|
||||
|
||||
def build_comment(complexity: PRComplexity) -> str:
|
||||
change_desc = f"{complexity.files_changed} files, +{complexity.additions}/-{complexity.deletions} lines"
|
||||
deps_note = "\n- :warning: Dependency changes detected — architectural review recommended" if complexity.has_dependency_changes else ""
|
||||
test_note = ""
|
||||
if complexity.test_coverage_delta is not None:
|
||||
if complexity.test_coverage_delta > 0:
|
||||
test_note = f"\n- :+1: {complexity.test_coverage_delta} test file(s) added"
|
||||
elif complexity.test_coverage_delta < 0:
|
||||
test_note = f"\n- :warning: {abs(complexity.test_coverage_delta)} test file(s) removed"
|
||||
|
||||
comment = f"## 📊 PR Complexity Analysis\n\n"
|
||||
comment += f"**PR #{complexity.pr_number}: {complexity.title}**\n\n"
|
||||
comment += f"| Metric | Value |\n|--------|-------|\n"
|
||||
comment += f"| Changes | {change_desc} |\n"
|
||||
comment += f"| Complexity Score | **{complexity.score}/10** |\n"
|
||||
comment += f"| Estimated Review Time | ~{complexity.estimated_minutes} minutes |\n\n"
|
||||
comment += f"### Scoring rationale:"
|
||||
for r in complexity.reasons:
|
||||
comment += f"\n- {r}"
|
||||
if deps_note:
|
||||
comment += deps_note
|
||||
if test_note:
|
||||
comment += test_note
|
||||
comment += f"\n\n---\n"
|
||||
comment += f"*Generated by PR Complexity Scorer — [issue #135](https://forge.alexanderwhitestone.com/Timmy_Foundation/compounding-intelligence/issues/135)*"
|
||||
return comment
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="PR Complexity Scorer")
|
||||
parser.add_argument("--org", default="Timmy_Foundation")
|
||||
parser.add_argument("--repo", default="compounding-intelligence")
|
||||
parser.add_argument("--token", default=os.environ.get("GITEA_TOKEN") or os.path.expanduser("~/.config/gitea/token"))
|
||||
parser.add_argument("--dry-run", action="store_true")
|
||||
parser.add_argument("--apply", action="store_true")
|
||||
parser.add_argument("--output", default="metrics/pr_complexity.json")
|
||||
args = parser.parse_args()
|
||||
|
||||
token_path = args.token
|
||||
if os.path.exists(token_path):
|
||||
with open(token_path) as f:
|
||||
token = f.read().strip()
|
||||
else:
|
||||
token = args.token
|
||||
|
||||
if not token:
|
||||
print("ERROR: No Gitea token provided", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
client = GiteaClient(token)
|
||||
|
||||
print(f"Fetching open PRs for {args.org}/{args.repo}...")
|
||||
prs = client.get_open_prs(args.org, args.repo)
|
||||
if not prs:
|
||||
print("No open PRs found.")
|
||||
sys.exit(0)
|
||||
|
||||
print(f"Found {len(prs)} open PR(s). Analyzing...")
|
||||
|
||||
results = []
|
||||
Path(args.output).parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
for pr in prs:
|
||||
pr_num = pr["number"]
|
||||
title = pr.get("title", "")
|
||||
print(f" Analyzing PR #{pr_num}: {title[:60]}")
|
||||
|
||||
try:
|
||||
complexity = analyze_pr(client, args.org, args.repo, pr)
|
||||
results.append(complexity.to_dict())
|
||||
|
||||
comment = build_comment(complexity)
|
||||
|
||||
if args.dry_run:
|
||||
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [DRY-RUN]")
|
||||
elif args.apply:
|
||||
success = client.post_comment(args.org, args.repo, pr_num, comment)
|
||||
status = "[commented]" if success else "[FAILED]"
|
||||
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min {status}")
|
||||
else:
|
||||
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [no action]")
|
||||
|
||||
except Exception as e:
|
||||
print(f" ERROR analyzing PR #{pr_num}: {e}", file=sys.stderr)
|
||||
|
||||
with open(args.output, "w") as f:
|
||||
json.dump({
|
||||
"org": args.org,
|
||||
"repo": args.repo,
|
||||
"timestamp": datetime.now(timezone.utc).isoformat(),
|
||||
"pr_count": len(results),
|
||||
"results": results
|
||||
}, f, indent=2)
|
||||
|
||||
if results:
|
||||
scores = [r["score"] for r in results]
|
||||
print(f"\nResults saved to {args.output}")
|
||||
print(f"Summary: {len(results)} PRs, scores range {min(scores):.0f}-{max(scores):.0f}")
|
||||
else:
|
||||
print("\nNo results to save.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,249 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Security Patch Applier — 5.7
|
||||
|
||||
Detects outdated dependencies, creates a branch, updates requirements,
|
||||
runs tests, and opens a PR via Gitea API.
|
||||
|
||||
Usage:
|
||||
python3 scripts/security_patch_applier.py
|
||||
python3 scripts/security_patch_applier.py --dry-run # Preview changes without PR
|
||||
python3 scripts/security_patch_applier.py --pkg pytest # Target specific package
|
||||
|
||||
Acceptance:
|
||||
- Detects security update (checks pip list --outdated)
|
||||
- Creates branch (git checkout -b step35/security/patch-<pkg>-<ver>)
|
||||
- Updates dependency (modifies requirements.txt)
|
||||
- Runs tests (python3 -m pytest)
|
||||
- Opens PR (Gitea API, Closes #<issue>)
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import subprocess
|
||||
import sys
|
||||
import urllib.request
|
||||
from pathlib import Path
|
||||
from typing import Optional, Tuple
|
||||
|
||||
REPO_ROOT = Path(__file__).resolve().parent.parent
|
||||
REQUIREMENTS_PATH = REPO_ROOT / "requirements.txt"
|
||||
GITEA_TOKEN_PATH = Path.home() / ".config" / "gitea" / "token"
|
||||
GITEA_API_BASE = "https://forge.alexanderwhitestone.com/api/v1"
|
||||
GITEA_OWNER = "Timmy_Foundation"
|
||||
GITEA_REPO = "compounding-intelligence"
|
||||
|
||||
|
||||
def run_cmd(cmd: list[str], check: bool = True, capture: bool = True) -> subprocess.CompletedProcess:
|
||||
"""Run a subprocess, return result."""
|
||||
result = subprocess.run(
|
||||
cmd,
|
||||
cwd=REPO_ROOT,
|
||||
capture_output=capture,
|
||||
text=True
|
||||
)
|
||||
if check and result.returncode != 0:
|
||||
print(f"ERROR: {' '.join(cmd)} failed with code {result.returncode}")
|
||||
print(result.stderr)
|
||||
sys.exit(result.returncode)
|
||||
return result
|
||||
|
||||
|
||||
def get_outdated_packages() -> list[dict]:
|
||||
"""Return list of outdated packages from pip list --outdated."""
|
||||
result = run_cmd([sys.executable, "-m", "pip", "list", "--outdated", "--format=json"])
|
||||
outdated = json.loads(result.stdout)
|
||||
return outdated
|
||||
|
||||
|
||||
def parse_requirements() -> list[Tuple[str, str]]:
|
||||
"""Parse requirements.txt into list of (raw_line, package_name_lower)."""
|
||||
if not REQUIREMENTS_PATH.exists():
|
||||
print(f"ERROR: requirements.txt not found at {REQUIREMENTS_PATH}")
|
||||
sys.exit(1)
|
||||
|
||||
lines = REQUIREMENTS_PATH.read_text().splitlines()
|
||||
parsed = []
|
||||
for line in lines:
|
||||
stripped = line.strip()
|
||||
if not stripped or stripped.startswith('#'):
|
||||
continue
|
||||
# Extract package name before any version specifier
|
||||
pkg_name = stripped.split()[0].split('>=')[0].split('==')[0].split('~=')[0].split('<')[0].split('>')[0].lower()
|
||||
parsed.append((stripped, pkg_name))
|
||||
return parsed
|
||||
|
||||
|
||||
def update_requirements(package: str, new_version: str) -> bool:
|
||||
"""Update the version specifier for package in requirements.txt. Return True if changed."""
|
||||
lines = REQUIREMENTS_PATH.read_text().splitlines()
|
||||
updated = False
|
||||
new_lines = []
|
||||
for line in lines:
|
||||
stripped = line.strip()
|
||||
if not stripped or stripped.startswith('#'):
|
||||
new_lines.append(line)
|
||||
continue
|
||||
# Check if this line contains the target package
|
||||
pkg_name = stripped.split()[0].split('>=')[0].split('==')[0].split('~=')[0].split('<')[0].split('>')[0].lower()
|
||||
if pkg_name == package.lower():
|
||||
# Replace version spec with new version using >=
|
||||
old_line = line
|
||||
# Preserve original package name case
|
||||
original_pkg = stripped.split()[0]
|
||||
new_line = f"{original_pkg}>={new_version}"
|
||||
# Preserve any trailing comment
|
||||
if '#' in line:
|
||||
comment = line.split('#', 1)[1]
|
||||
new_line += f" #{comment}"
|
||||
new_lines.append(new_line)
|
||||
updated = True
|
||||
else:
|
||||
new_lines.append(line)
|
||||
if updated:
|
||||
REQUIREMENTS_PATH.write_text('\n'.join(new_lines) + '\n')
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def create_branch(branch_name: str) -> bool:
|
||||
"""Create and checkout a new branch."""
|
||||
# Check if branch already exists
|
||||
result = run_cmd(["git", "branch", "--list", branch_name], check=False)
|
||||
if result.stdout.strip():
|
||||
print(f"Branch {branch_name} already exists.")
|
||||
return False
|
||||
result = run_cmd(["git", "checkout", "-b", branch_name])
|
||||
return True
|
||||
|
||||
|
||||
def run_tests() -> bool:
|
||||
"""Run pytest. Return True if all pass."""
|
||||
print("\nRunning tests...")
|
||||
result = run_cmd([sys.executable, "-m", "pytest", "tests/test_ci_config.py", "scripts/test_*.py", "-v"], check=False)
|
||||
return result.returncode == 0
|
||||
|
||||
|
||||
def get_gitea_token() -> str:
|
||||
"""Read Gitea token from file."""
|
||||
if not GITEA_TOKEN_PATH.exists():
|
||||
print(f"ERROR: Gitea token not found at {GITEA_TOKEN_PATH}")
|
||||
sys.exit(1)
|
||||
return GITEA_TOKEN_PATH.read_text().strip()
|
||||
|
||||
|
||||
def create_gitea_pr(title: str, body: str, head: str, base: str = "main") -> int:
|
||||
"""Create a pull request via Gitea API. Return PR number."""
|
||||
token = get_gitea_token()
|
||||
payload = json.dumps({
|
||||
"title": title,
|
||||
"body": body,
|
||||
"head": head,
|
||||
"base": base
|
||||
}).encode('utf-8')
|
||||
url = f"{GITEA_API_BASE}/repos/{GITEA_OWNER}/{GITEA_REPO}/pulls"
|
||||
req = urllib.request.Request(
|
||||
url,
|
||||
data=payload,
|
||||
headers={
|
||||
"Authorization": f"token {token}",
|
||||
"Content-Type": "application/json",
|
||||
"Accept": "application/json"
|
||||
},
|
||||
method="POST"
|
||||
)
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=15) as resp:
|
||||
data = json.loads(resp.read())
|
||||
return data["number"]
|
||||
except urllib.error.HTTPError as e:
|
||||
body = e.read().decode('utf-8')
|
||||
print(f"ERROR: Gitea API returned {e.code}: {body}")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Security Patch Applier — detect, fix, PR")
|
||||
parser.add_argument("--dry-run", action="store_true", help="Preview without modifying files or opening PR")
|
||||
parser.add_argument("--pkg", help="Target specific package (skip detection)")
|
||||
parser.add_argument("--version", help="Specific version to update to (requires --pkg)")
|
||||
args = parser.parse_args()
|
||||
|
||||
# Step 1: Detect outdated packages (security patches)
|
||||
if args.pkg:
|
||||
# Manual mode
|
||||
if not args.version:
|
||||
print("ERROR: --version required when using --pkg")
|
||||
sys.exit(1)
|
||||
outdated = [{"name": args.pkg, "latest_version": args.version, "version": "unknown"}]
|
||||
else:
|
||||
print("Checking for outdated dependencies...")
|
||||
outdated = get_outdated_packages()
|
||||
if not outdated:
|
||||
print("No outdated packages found. System is up-to-date.")
|
||||
sys.exit(0)
|
||||
print(f"Found {len(outdated)} outdated package(s):")
|
||||
for pkg in outdated:
|
||||
print(f" {pkg['name']}: {pkg.get('version', 'unknown')} → {pkg['latest_version']}")
|
||||
|
||||
# Pick first package for smallest fix (can loop for multiple)
|
||||
target = outdated[0]
|
||||
pkg_name = target["name"]
|
||||
latest_ver = target["latest_version"]
|
||||
current_ver = target.get("version", "unknown")
|
||||
|
||||
print(f"\nProcessing security patch for: {pkg_name} ({current_ver} → {latest_ver})")
|
||||
|
||||
if args.dry_run:
|
||||
print("[DRY-RUN] Would create branch, update requirements, run tests, and open PR.")
|
||||
sys.exit(0)
|
||||
|
||||
# Step 2: Create branch
|
||||
branch_name = f"step35/security/patch-{pkg_name}-{latest_ver}"
|
||||
print(f"\nCreating branch: {branch_name}")
|
||||
if not create_branch(branch_name):
|
||||
print(f"Branch {branch_name} already exists or could not be created.")
|
||||
# Continue anyway? Let's exit
|
||||
sys.exit(1)
|
||||
|
||||
# Step 3: Update requirements.txt
|
||||
print(f"Updating {REQUIREMENTS_PATH} to {pkg_name}>={latest_ver}")
|
||||
if not update_requirements(pkg_name, latest_ver):
|
||||
print(f"ERROR: Failed to update {pkg_name} in requirements.txt")
|
||||
sys.exit(1)
|
||||
print(f"Updated requirements.txt")
|
||||
|
||||
# Step 4: Run tests
|
||||
if not run_tests():
|
||||
print("ERROR: Tests failed. Aborting PR creation.")
|
||||
# Could revert branch? For minimal fix, just exit with error
|
||||
sys.exit(1)
|
||||
print("Tests passed.")
|
||||
|
||||
# Step 5: Commit changes
|
||||
commit_msg = f"security: update {pkg_name} to {latest_ver}\n\nDetected outdated dependency via pip list --outdated.\n\nRefs: #113"
|
||||
run_cmd(["git", "add", "requirements.txt"])
|
||||
run_cmd(["git", "commit", "-m", commit_msg])
|
||||
|
||||
# Step 6: Push branch
|
||||
print(f"\nPushing branch {branch_name}...")
|
||||
result = run_cmd(["git", "push", "origin", branch_name], check=False)
|
||||
if result.returncode != 0:
|
||||
print(f"ERROR: Push failed: {result.stderr}")
|
||||
sys.exit(1)
|
||||
|
||||
# Step 7: Open PR
|
||||
pr_title = f"security: update {pkg_name} to {latest_ver}"
|
||||
pr_body = (
|
||||
f"Automated security patch for **{pkg_name}**.\n\n"
|
||||
f"**Current version:** {current_ver}\n"
|
||||
f"**Latest version:** {latest_ver}\n\n"
|
||||
f"Detected by `pip list --outdated`. Tests passed locally.\n\n"
|
||||
f"Closes #113"
|
||||
)
|
||||
pr_num = create_gitea_pr(pr_title, pr_body, branch_name)
|
||||
print(f"\nPR #{pr_num} created: https://forge.alexanderwhitestone.com/{GITEA_OWNER}/{GITEA_REPO}/pulls/{pr_num}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
168
scripts/test_code_duplication_detector.py
Normal file
168
scripts/test_code_duplication_detector.py
Normal file
@@ -0,0 +1,168 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Smoke test for code duplication detector — verifies:
|
||||
- Function extraction from Python files
|
||||
- Exact duplicate detection
|
||||
- Near-duplicate detection (token similarity)
|
||||
- Report generation and stats
|
||||
- JSON output format
|
||||
"""
|
||||
|
||||
import json
|
||||
import sys
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
SCRIPT_DIR = Path(__file__).parent.absolute()
|
||||
sys.path.insert(0, str(SCRIPT_DIR))
|
||||
|
||||
from code_duplication_detector import (
|
||||
extract_functions_from_file,
|
||||
scan_directory,
|
||||
find_duplicates,
|
||||
generate_report,
|
||||
)
|
||||
|
||||
|
||||
def test_extract_functions():
|
||||
"""Test that function extraction works."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
test_file = Path(tmpdir) / 'sample.py'
|
||||
test_file.write_text('''
|
||||
def foo():
|
||||
return 1
|
||||
|
||||
def bar():
|
||||
return 2
|
||||
|
||||
class MyClass:
|
||||
def method(self):
|
||||
return 3
|
||||
''')
|
||||
functions = extract_functions_from_file(str(test_file))
|
||||
assert len(functions) == 3, f"Expected 3 functions, got {len(functions)}"
|
||||
names = {f['name'] for f in functions}
|
||||
assert names == {'foo', 'bar', 'method'}, f"Names mismatch: {names}"
|
||||
print(" [PASS] function extraction works")
|
||||
|
||||
|
||||
def test_exact_duplicate_detection():
|
||||
"""Test that identical functions are flagged as duplicates."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
# Create two files with the same function
|
||||
f1 = Path(tmpdir) / 'a.py'
|
||||
f1.write_text('''
|
||||
def duplicated():
|
||||
x = 1
|
||||
y = 2
|
||||
return x + y
|
||||
''')
|
||||
f2 = Path(tmpdir) / 'b.py'
|
||||
f2.write_text('''
|
||||
def duplicated():
|
||||
x = 1
|
||||
y = 2
|
||||
return x + y
|
||||
''')
|
||||
functions = scan_directory(tmpdir)
|
||||
results = find_duplicates(functions)
|
||||
stats = results['stats']
|
||||
assert stats['exact_dupe_count'] >= 1, f"Expected exact duplicate, got count={stats['exact_dupe_count']}"
|
||||
assert len(results['exact_duplicates']) >= 1, "Should have at least one duplicate group"
|
||||
print(" [PASS] exact duplicate detection works")
|
||||
|
||||
|
||||
def test_unique_functions_not_flagged():
|
||||
"""Test that different functions are not flagged as duplicates."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
f1 = Path(tmpdir) / 'a.py'
|
||||
f1.write_text('def func_a(): return 1')
|
||||
f2 = Path(tmpdir) / 'b.py'
|
||||
f2.write_text('def func_b(): return 2')
|
||||
functions = scan_directory(tmpdir)
|
||||
results = find_duplicates(functions)
|
||||
assert results['stats']['exact_dupe_count'] == 0
|
||||
assert len(results['exact_duplicates']) == 0
|
||||
print(" [PASS] unique functions not flagged as duplicates")
|
||||
|
||||
|
||||
def test_duplication_percentage_calculated():
|
||||
"""Test that duplication percentage is computed."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
# Create file with mostly duplicated content
|
||||
f1 = Path(tmpdir) / 'a.py'
|
||||
f1.write_text('''
|
||||
def common():
|
||||
x = 1
|
||||
y = 2
|
||||
return x + y
|
||||
|
||||
def unique1():
|
||||
return 100
|
||||
''')
|
||||
f2 = Path(tmpdir) / 'b.py'
|
||||
f2.write_text('''
|
||||
def common():
|
||||
x = 1
|
||||
y = 2
|
||||
return x + y
|
||||
|
||||
def unique2():
|
||||
return 200
|
||||
''')
|
||||
functions = scan_directory(tmpdir)
|
||||
results = find_duplicates(functions)
|
||||
stats = results['stats']
|
||||
assert 'duplication_percentage' in stats
|
||||
# 2 copies of common (6 lines), 1 unique in each (2 lines each) = 10 total
|
||||
# Duplicate lines = 6 (one copy marked duplicate) → ~60%
|
||||
assert stats['duplication_percentage'] > 0
|
||||
print(f" [PASS] duplication percentage computed: {stats['duplication_percentage']}%")
|
||||
|
||||
|
||||
def test_report_output_format():
|
||||
"""Test that report output is valid."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
f1 = Path(tmpdir) / 'a.py'
|
||||
f1.write_text('def dup(): return 1')
|
||||
f2 = Path(tmpdir) / 'b.py'
|
||||
f2.write_text('def dup(): return 1')
|
||||
functions = scan_directory(tmpdir)
|
||||
results = find_duplicates(functions)
|
||||
|
||||
# Text report
|
||||
text = generate_report(results, output_format='text')
|
||||
assert 'CODE DUPLICATION REPORT' in text
|
||||
assert 'Total functions' in text
|
||||
print(" [PASS] text report format valid")
|
||||
|
||||
# JSON report
|
||||
json_out = generate_report(results, output_format='json')
|
||||
data = json.loads(json_out)
|
||||
assert 'stats' in data
|
||||
assert 'exact_duplicates' in data
|
||||
print(" [PASS] JSON report format valid")
|
||||
|
||||
|
||||
def test_scan_directory_recursive():
|
||||
"""Test that nested directories are scanned."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
subdir = Path(tmpdir) / 'sub'
|
||||
subdir.mkdir()
|
||||
(subdir / 'nested.py').write_text('def nested(): pass')
|
||||
(Path(tmpdir) / 'root.py').write_text('def root(): pass')
|
||||
functions = scan_directory(tmpdir)
|
||||
names = {f['name'] for f in functions}
|
||||
assert 'nested' in names and 'root' in names
|
||||
print(" [PASS] recursive directory scanning works")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
print("Running code duplication detector smoke tests...")
|
||||
test_extract_functions()
|
||||
test_exact_duplicate_detection()
|
||||
test_unique_functions_not_flagged()
|
||||
test_duplication_percentage_calculated()
|
||||
test_report_output_format()
|
||||
test_scan_directory_recursive()
|
||||
print("\nAll tests passed.")
|
||||
170
scripts/test_pr_complexity_scorer.py
Normal file
170
scripts/test_pr_complexity_scorer.py
Normal file
@@ -0,0 +1,170 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Tests for PR Complexity Scorer — unit tests for the scoring logic.
|
||||
"""
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent))
|
||||
|
||||
from pr_complexity_scorer import (
|
||||
score_pr,
|
||||
is_dependency_file,
|
||||
is_test_file,
|
||||
TIME_PER_POINT,
|
||||
SMALL_FILES,
|
||||
MEDIUM_FILES,
|
||||
LARGE_FILES,
|
||||
SMALL_LINES,
|
||||
MEDIUM_LINES,
|
||||
LARGE_LINES,
|
||||
)
|
||||
|
||||
PASS = 0
|
||||
FAIL = 0
|
||||
|
||||
def test(name):
|
||||
def decorator(fn):
|
||||
global PASS, FAIL
|
||||
try:
|
||||
fn()
|
||||
PASS += 1
|
||||
print(f" [PASS] {name}")
|
||||
except AssertionError as e:
|
||||
FAIL += 1
|
||||
print(f" [FAIL] {name}: {e}")
|
||||
except Exception as e:
|
||||
FAIL += 1
|
||||
print(f" [FAIL] {name}: Unexpected error: {e}")
|
||||
return decorator
|
||||
|
||||
def assert_eq(a, b, msg=""):
|
||||
if a != b:
|
||||
raise AssertionError(f"{msg} expected {b!r}, got {a!r}")
|
||||
|
||||
def assert_true(v, msg=""):
|
||||
if not v:
|
||||
raise AssertionError(msg or "Expected True")
|
||||
|
||||
def assert_false(v, msg=""):
|
||||
if v:
|
||||
raise AssertionError(msg or "Expected False")
|
||||
|
||||
|
||||
print("=== PR Complexity Scorer Tests ===\n")
|
||||
|
||||
print("-- File Classification --")
|
||||
|
||||
@test("dependency file detection — requirements.txt")
|
||||
def _():
|
||||
assert_true(is_dependency_file("requirements.txt"))
|
||||
assert_true(is_dependency_file("src/requirements.txt"))
|
||||
assert_false(is_dependency_file("requirements_test.txt"))
|
||||
|
||||
@test("dependency file detection — pyproject.toml")
|
||||
def _():
|
||||
assert_true(is_dependency_file("pyproject.toml"))
|
||||
assert_false(is_dependency_file("myproject.py"))
|
||||
|
||||
@test("test file detection — pytest style")
|
||||
def _():
|
||||
assert_true(is_test_file("tests/test_api.py"))
|
||||
assert_true(is_test_file("test_module.py"))
|
||||
assert_true(is_test_file("src/module_test.py"))
|
||||
|
||||
@test("test file detection — other frameworks")
|
||||
def _():
|
||||
assert_true(is_test_file("spec/feature_spec.rb"))
|
||||
assert_true(is_test_file("__tests__/component.test.js"))
|
||||
assert_false(is_test_file("testfixtures/helper.py"))
|
||||
|
||||
|
||||
print("\n-- Scoring Logic --")
|
||||
|
||||
@test("small PR gets low score (1-3)")
|
||||
def _():
|
||||
score, minutes, _ = score_pr(
|
||||
files_changed=3,
|
||||
additions=50,
|
||||
deletions=10,
|
||||
has_dependency_changes=False,
|
||||
test_coverage_delta=None
|
||||
)
|
||||
assert_true(1 <= score <= 3, f"Score should be low, got {score}")
|
||||
assert_true(minutes < 20)
|
||||
|
||||
@test("medium PR gets medium score (4-6)")
|
||||
def _():
|
||||
score, minutes, _ = score_pr(
|
||||
files_changed=15,
|
||||
additions=400,
|
||||
deletions=100,
|
||||
has_dependency_changes=False,
|
||||
test_coverage_delta=None
|
||||
)
|
||||
assert_true(4 <= score <= 6, f"Score should be medium, got {score}")
|
||||
assert_true(20 <= minutes <= 45)
|
||||
|
||||
@test("large PR gets high score (7-9)")
|
||||
def _():
|
||||
score, minutes, _ = score_pr(
|
||||
files_changed=60,
|
||||
additions=3000,
|
||||
deletions=1500,
|
||||
has_dependency_changes=True,
|
||||
test_coverage_delta=None
|
||||
)
|
||||
assert_true(7 <= score <= 9, f"Score should be high, got {score}")
|
||||
assert_true(minutes >= 45)
|
||||
|
||||
@test("dependency changes boost score")
|
||||
def _():
|
||||
base_score, _, _ = score_pr(
|
||||
files_changed=10, additions=200, deletions=50,
|
||||
has_dependency_changes=False, test_coverage_delta=None
|
||||
)
|
||||
dep_score, _, _ = score_pr(
|
||||
files_changed=10, additions=200, deletions=50,
|
||||
has_dependency_changes=True, test_coverage_delta=None
|
||||
)
|
||||
assert_true(dep_score > base_score, f"Deps: {base_score} -> {dep_score}")
|
||||
|
||||
@test("adding tests lowers complexity")
|
||||
def _():
|
||||
base_score, _, _ = score_pr(
|
||||
files_changed=8, additions=150, deletions=20,
|
||||
has_dependency_changes=False, test_coverage_delta=None
|
||||
)
|
||||
better_score, _, _ = score_pr(
|
||||
files_changed=8, additions=180, deletions=20,
|
||||
has_dependency_changes=False, test_coverage_delta=3
|
||||
)
|
||||
assert_true(better_score < base_score, f"Tests: {base_score} -> {better_score}")
|
||||
|
||||
@test("removing tests increases complexity")
|
||||
def _():
|
||||
base_score, _, _ = score_pr(
|
||||
files_changed=8, additions=150, deletions=20,
|
||||
has_dependency_changes=False, test_coverage_delta=None
|
||||
)
|
||||
worse_score, _, _ = score_pr(
|
||||
files_changed=8, additions=150, deletions=20,
|
||||
has_dependency_changes=False, test_coverage_delta=-2
|
||||
)
|
||||
assert_true(worse_score > base_score, f"Remove tests: {base_score} -> {worse_score}")
|
||||
|
||||
@test("score bounded 1-10")
|
||||
def _():
|
||||
for files, adds, dels in [(1, 10, 5), (100, 10000, 5000)]:
|
||||
score, _, _ = score_pr(files, adds, dels, False, None)
|
||||
assert_true(1 <= score <= 10, f"Score {score} out of range")
|
||||
|
||||
@test("estimated minutes exist for all scores")
|
||||
def _():
|
||||
for s in range(1, 11):
|
||||
assert_true(s in TIME_PER_POINT, f"Missing time for score {s}")
|
||||
|
||||
|
||||
print(f"\n=== Results: {PASS} passed, {FAIL} failed ===")
|
||||
sys.exit(0 if FAIL == 0 else 1)
|
||||
@@ -1,21 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Smoke test for security_patch_applier — verifies module imports and argument parsing."""
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
def test_imports():
|
||||
import security_patch_applier
|
||||
assert hasattr(security_patch_applier, 'main')
|
||||
|
||||
def test_help():
|
||||
result = subprocess.run(
|
||||
[sys.executable, 'scripts/security_patch_applier.py', '--help'],
|
||||
capture_output=True, text=True
|
||||
)
|
||||
assert result.returncode == 0
|
||||
assert 'Security Patch Applier' in result.stdout or '--dry-run' in result.stdout
|
||||
|
||||
if __name__ == '__main__':
|
||||
test_imports()
|
||||
test_help()
|
||||
print("OK")
|
||||
Reference in New Issue
Block a user