Compare commits

..

2 Commits

6 changed files with 1055 additions and 245 deletions

View File

@@ -0,0 +1,366 @@
#!/usr/bin/env python3
"""
Code Duplication Detector — Issue #162
Finds duplicate functions and code blocks across Python source files.
Reports duplication percentage and outputs a duplication report.
Usage:
python3 scripts/code_duplication_detector.py --output reports/code_duplication.json
python3 scripts/code_duplication_detector.py --directory scripts/ --dry-run
python3 scripts/code_duplication_detector.py --test # Run built-in test
"""
import argparse
import hashlib
import json
import os
import re
import sys
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Dict, Tuple, Optional
# ── AST helpers ────────────────────────────────────────────────────────────
def normalize_code(text: str) -> str:
"""Normalize code for comparison: strip comments, normalize whitespace."""
# Remove comments (both # and docstring triple-quote strings)
text = re.sub(r'#.*$', '', text, flags=re.MULTILINE)
text = re.sub(r'""".*?"""', '', text, flags=re.DOTALL)
text = re.sub(r"'''.*?'''", '', text, flags=re.DOTALL)
# Normalize whitespace
text = re.sub(r'\s+', ' ', text).strip()
return text.lower()
def code_hash(text: str) -> str:
"""SHA256 hash of normalized code for exact duplicate detection."""
normalized = normalize_code(text)
return hashlib.sha256(normalized.encode('utf-8')).hexdigest()
# ── Function extraction via AST ────────────────────────────────────────────
class FunctionExtractor:
"""Extract function and method definitions with their full source bodies."""
def __init__(self, source: str, filepath: str):
self.source = source
self.filepath = filepath
self.lines = source.splitlines()
self.functions: List[Dict] = []
def _get_source_segment(self, start_lineno: int, end_lineno: int) -> str:
"""Get source code from start to end line (1-indexed, inclusive)."""
# AST end_lineno is inclusive
start_idx = start_lineno - 1
end_idx = end_lineno
return '\n'.join(self.lines[start_idx:end_idx])
def visit(self, tree):
"""Collect all function and async function definitions."""
for node in ast.walk(tree):
if isinstance(node, ast.FunctionDef) or isinstance(node, ast.AsyncFunctionDef):
# Get the full source for this function including decorators
start = node.lineno
end = node.end_lineno
body_source = self._get_source_segment(start, end)
# Also collect parent class name if this is a method
class_name = None
parent = node.parent if hasattr(node, 'parent') else None
if parent and isinstance(parent, ast.ClassDef):
class_name = parent.name
self.functions.append({
'name': node.name,
'file': self.filepath,
'start_line': start,
'end_line': end,
'body': body_source,
'class_name': class_name,
'is_method': class_name is not None,
})
import ast
class ParentNodeVisitor(ast.NodeVisitor):
"""Annotate nodes with parent references."""
def __init__(self, parent=None):
self.parent = parent
def generic_visit(self, node):
node.parent = self.parent
for child in ast.iter_child_nodes(node):
self.__class__(child).parent = node
super().generic_visit(node)
def extract_functions_from_file(filepath: str) -> List[Dict]:
"""Extract all function definitions from a Python file."""
try:
with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
source = f.read()
tree = ast.parse(source, filename=str(filepath))
# Annotate with parent references
for node in ast.walk(tree):
for child in ast.iter_child_nodes(node):
child.parent = node
extractor = FunctionExtractor(source, str(filepath))
extractor.visit(tree)
return extractor.functions
except (SyntaxError, UnicodeDecodeError, OSError) as e:
return []
def scan_directory(directory: str, extensions: Tuple[str, ...] = ('.py',)) -> List[Dict]:
"""Scan directory for Python files and extract all functions."""
all_functions = []
path = Path(directory)
for filepath in path.rglob('*'):
if filepath.is_file() and filepath.suffix in extensions:
# Skip common non-source dirs
parts = filepath.parts
if any(ex in parts for ex in ('__pycache__', 'node_modules', '.git', 'venv', '.venv', 'dist', 'build')):
continue
if filepath.name.startswith('.'):
continue
functions = extract_functions_from_file(str(filepath))
all_functions.extend(functions)
return all_functions
# ── Duplicate detection ─────────────────────────────────────────────────────
def find_duplicates(functions: List[Dict], similarity_threshold: float = 0.95) -> Dict:
"""
Find duplicate and near-duplicate functions.
Returns dict with:
- exact_duplicates: {hash: [function_info, ...]}
- near_duplicates: [[function_info, ...], ...]
- stats: total_functions, unique_exact, exact_dupe_count, near_dupe_count
"""
# Phase 1: Exact duplicates by code hash
hash_groups: Dict[str, List[Dict]] = defaultdict(list)
for func in functions:
h = code_hash(func['body'])
hash_groups[h].append(func)
exact_duplicates = {h: group for h, group in hash_groups.items() if len(group) > 1}
exact_dupe_count = sum(len(group) - 1 for group in exact_duplicates.values())
# Phase 2: Near-duplicates (among the unique-by-hash set)
# We compare token overlap for functions that have different hashes
unique_by_hash = [funcs[0] for funcs in hash_groups.values()]
near_duplicate_groups = []
# Simple token-based similarity
def tokenize(code: str) -> set:
return set(re.findall(r'[a-zA-Z_][a-zA-Z0-9_]*', code.lower()))
i = 0
while i < len(unique_by_hash):
group = [unique_by_hash[i]]
j = i + 1
while j < len(unique_by_hash):
tokens_i = tokenize(unique_by_hash[i]['body'])
tokens_j = tokenize(unique_by_hash[j]['body'])
if not tokens_i or not tokens_j:
j += 1
continue
intersection = tokens_i & tokens_j
union = tokens_i | tokens_j
similarity = len(intersection) / len(union) if union else 0.0
if similarity >= similarity_threshold:
group.append(unique_by_hash[j])
unique_by_hash.pop(j)
else:
j += 1
if len(group) > 1:
near_duplicate_groups.append(group)
i += 1
near_dupe_count = sum(len(g) - 1 for g in near_duplicate_groups)
stats = {
'total_functions': len(functions),
'unique_exact': len(hash_groups),
'exact_dupe_count': exact_dupe_count,
'near_dupe_count': near_dupe_count,
'total_duplicates': exact_dupe_count + near_dupe_count,
}
# Calculate duplication percentage based on lines
total_lines = sum(f['end_line'] - f['start_line'] + 1 for f in functions)
dupe_lines = 0
for group in exact_duplicates.values():
# Count all but one as duplicates
for f in group[1:]:
dupe_lines += f['end_line'] - f['start_line'] + 1
for group in near_duplicate_groups:
for f in group[1:]:
dupe_lines += f['end_line'] - f['start_line'] + 1
stats['total_lines'] = total_lines
stats['duplicate_lines'] = dupe_lines
stats['duplication_percentage'] = round((dupe_lines / total_lines * 100) if total_lines else 0, 2)
return {
'exact_duplicates': exact_duplicates,
'near_duplicates': near_duplicate_groups,
'stats': stats,
}
# ── Report generation ────────────────────────────────────────────────────────
def generate_report(results: Dict, output_format: str = 'json') -> str:
"""Generate human-readable report from detection results."""
stats = results['stats']
if output_format == 'json':
return json.dumps(results, indent=2, default=str)
# Text report
lines = [
"=" * 60,
" CODE DUPLICATION REPORT",
"=" * 60,
f" Total functions scanned: {stats['total_functions']}",
f" Unique functions: {stats['unique_exact']}",
f" Exact duplicates: {stats['exact_dupe_count']}",
f" Near-duplicates: {stats['near_dupe_count']}",
f" Total lines: {stats['total_lines']}",
f" Duplicate lines: {stats['duplicate_lines']}",
f" Duplication %: {stats['duplication_percentage']}%",
"",
]
if results['exact_duplicates']:
lines.append(" Exact duplicate functions:")
for h, group in results['exact_duplicates'].items():
first = group[0]
lines.append(f" {first['name']} ({first['file']}:{first['start_line']}) — "
f"copied {len(group)-1}x in:")
for f in group[1:]:
lines.append(f"{f['file']}:{f['start_line']}")
lines.append("")
if results['near_duplicates']:
lines.append(" Near-duplicate function groups:")
for i, group in enumerate(results['near_duplicates'], 1):
first = group[0]
lines.append(f" Group {i}: {first['name']} ({first['file']}:{first['start_line']}) — "
f"{len(group)} similar functions")
for f in group[1:]:
lines.append(f"{f['file']}:{f['start_line']}")
lines.append("")
lines.append("=" * 60)
return '\n'.join(lines)
# ── CLI ─────────────────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(description="Code Duplication Detector")
parser.add_argument('--directory', default='.',
help='Directory to scan (default: current directory)')
parser.add_argument('--output', help='Output file for JSON report')
parser.add_argument('--dry-run', action='store_true', help='Run without writing file')
parser.add_argument('--threshold', type=float, default=0.95,
help='Similarity threshold for near-dupes (default: 0.95)')
parser.add_argument('--json', action='store_true', help='JSON output to stdout')
parser.add_argument('--test', action='store_true', help='Run built-in test')
args = parser.parse_args()
if args.test:
_run_test()
return
# Scan
functions = scan_directory(args.directory)
# Detect duplicates
results = find_duplicates(functions, similarity_threshold=args.threshold)
stats = results['stats']
# Output
if args.json:
print(json.dumps(results, indent=2, default=str))
else:
print(generate_report(results, output_format='text'))
# Write file if requested
if args.output and not args.dry_run:
os.makedirs(os.path.dirname(args.output) or '.', exist_ok=True)
with open(args.output, 'w') as f:
json.dump(results, f, indent=2, default=str)
print(f"\nReport written to: {args.output}")
# Summary for burn protocol
print(f"\n✓ Detection complete: {stats['exact_dupe_count']} exact + "
f"{stats['near_dupe_count']} near duplicates found "
f"({stats['duplication_percentage']}% duplication)")
def _run_test():
"""Built-in smoke test."""
import tempfile
import os
with tempfile.TemporaryDirectory() as tmpdir:
# Create test files with duplicate code
f1 = Path(tmpdir) / 'mod1.py'
f1.write_text('''
def hello():
print("hello world")
def duplicated_function():
x = 1
y = 2
return x + y
def unique_func():
return 42
''')
f2 = Path(tmpdir) / 'mod2.py'
f2.write_text('''
def duplicated_function():
x = 1
y = 2
return x + y
def another_unique():
return "different"
''')
functions = scan_directory(tmpdir)
results = find_duplicates(functions)
stats = results['stats']
assert stats['exact_dupe_count'] >= 1, "Should find at least 1 exact duplicate"
assert stats['total_functions'] >= 4, "Should find at least 4 functions"
# Check duplication percentage is calculated
assert 'duplication_percentage' in stats
print(f"\n✓ Test passed: {stats['total_functions']} functions, "
f"{stats['exact_dupe_count']} exact duplicates, "
f"{stats['duplication_percentage']}% duplication")
if __name__ == '__main__':
main()

View File

@@ -180,89 +180,6 @@ def to_mermaid(graph: dict) -> str:
return "\n".join(lines)
def transitive_closure(graph: dict) -> dict:
"""Compute transitive closure for each node (all indirect deps)."""
closure = {}
# Build adjacency list
adj = {node: set(data.get("dependencies", [])) for node, data in graph.items()}
all_nodes = set(adj.keys()) | set().union(*adj.values())
for node in all_nodes:
visited = set()
stack = list(adj.get(node, set()))
while stack:
current = stack.pop()
if current not in visited:
visited.add(current)
stack.extend(adj.get(current, set()))
# Remove self-reference: a node's transitive deps should not include itself
visited.discard(node)
closure[node] = visited
return closure
def find_deep_chains(graph: dict) -> list[list[str]]:
"""Find the longest simple paths in the dependency graph (ignoring cycles)."""
from collections import defaultdict
adj = {node: list(data.get("dependencies", [])) for node, data in graph.items()}
deepest = []
max_len = 0
def dfs(node: str, path: list, visited: set):
nonlocal deepest, max_len
# Stop if we hit a cycle (node already in path)
if node in path:
return
new_path = path + [node]
if node not in adj or not adj[node]:
# leaf
if len(new_path) > max_len:
max_len = len(new_path)
deepest = [new_path.copy()]
elif len(new_path) == max_len:
deepest.append(new_path.copy())
else:
for neighbor in adj[node]:
dfs(neighbor, new_path.copy(), visited | {node})
for start in graph:
dfs(start, [], set())
return deepest
def format_transitive_markdown(closure: dict) -> str:
"""Render transitive closure as a markdown table."""
lines = ["# Transitive Dependencies\n\n"]
lines.append("| Node | Transitive Dependencies | Count |\n")
lines.append("|------|------------------------|-------|\n")
for node in sorted(closure.keys()):
deps = closure[node]
deps_str = ", ".join(sorted(deps)) if deps else "(none)"
lines.append(f"| {node} | {deps_str} | {len(deps)} |\n")
return "".join(lines)
def format_deep_chains_markdown(chains: list[list[str]]) -> str:
"""Render longest dependency chains as a markdown list."""
lines = ["# Deepest Dependency Chains\n\n"]
if not chains:
lines.append("No chains found.\n")
return "".join(lines)
max_len = max(len(c) for c in chains)
lines.append(f"*Longest chain length:* {max_len}\n\n")
for i, chain in enumerate(sorted(chains, key=lambda c: (-len(c), " -> ".join(c))), 1):
lines.append(f"**Chain {i}** ({len(chain)} nodes)\n\n")
indent = " "
for j, node in enumerate(chain):
arrow = "" if j < len(chain)-1 else ""
lines.append(f"{indent}{arrow}{node}\n")
lines.append("\n")
return "".join(lines)
def main():
parser = argparse.ArgumentParser(description="Build cross-repo dependency graph")
parser.add_argument("repos_dir", nargs="?", help="Directory containing repos")
@@ -311,20 +228,13 @@ def main():
elif args.format == "mermaid":
output = to_mermaid(results)
else:
# Compute transitive and deep chains
closure = transitive_closure(results)
deep_chains = find_deep_chains(results)
output = json.dumps({
"repos": results,
"cycles": cycles,
"transitive": {node: sorted(deps) for node, deps in closure.items()},
"deep_chains": [chain for chain in deep_chains if len(chain) > 1],
"summary": {
"total_repos": len(results),
"total_deps": sum(len(r["dependencies"]) for r in results.values()),
"cycles_found": len(cycles),
"transitive_pairs": sum(len(deps) for deps in closure.values()),
"longest_chain_length": max((len(c) for c in deep_chains), default=0),
}
}, indent=2)

View File

@@ -0,0 +1,351 @@
#!/usr/bin/env python3
"""
PR Complexity Scorer - Estimate review effort for PRs.
"""
import argparse
import json
import os
import re
import sys
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
import urllib.request
import urllib.error
GITEA_BASE = "https://forge.alexanderwhitestone.com/api/v1"
DEPENDENCY_FILES = {
"requirements.txt", "pyproject.toml", "setup.py", "setup.cfg",
"Pipfile", "poetry.lock", "package.json", "yarn.lock", "Gemfile",
"go.mod", "Cargo.toml", "pom.xml", "build.gradle"
}
TEST_PATTERNS = [
r"tests?/.*\.py$", r".*_test\.py$", r"test_.*\.py$",
r"spec/.*\.rb$", r".*_spec\.rb$",
r"__tests__/", r".*\.test\.(js|ts|jsx|tsx)$"
]
WEIGHT_FILES = 0.25
WEIGHT_LINES = 0.25
WEIGHT_DEPS = 0.30
WEIGHT_TEST_COV = 0.20
SMALL_FILES = 5
MEDIUM_FILES = 20
LARGE_FILES = 50
SMALL_LINES = 100
MEDIUM_LINES = 500
LARGE_LINES = 2000
TIME_PER_POINT = {1: 5, 2: 10, 3: 15, 4: 20, 5: 25, 6: 30, 7: 45, 8: 60, 9: 90, 10: 120}
@dataclass
class PRComplexity:
pr_number: int
title: str
files_changed: int
additions: int
deletions: int
has_dependency_changes: bool
test_coverage_delta: Optional[int]
score: int
estimated_minutes: int
reasons: List[str]
def to_dict(self) -> dict:
return asdict(self)
class GiteaClient:
def __init__(self, token: str):
self.token = token
self.base_url = GITEA_BASE.rstrip("/")
def _request(self, path: str, params: Dict = None) -> Any:
url = f"{self.base_url}{path}"
if params:
qs = "&".join(f"{k}={v}" for k, v in params.items() if v is not None)
url += f"?{qs}"
req = urllib.request.Request(url)
req.add_header("Authorization", f"token {self.token}")
req.add_header("Content-Type", "application/json")
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
except urllib.error.HTTPError as e:
print(f"API error {e.code}: {e.read().decode()[:200]}", file=sys.stderr)
return None
except urllib.error.URLError as e:
print(f"Network error: {e}", file=sys.stderr)
return None
def get_open_prs(self, org: str, repo: str) -> List[Dict]:
prs = []
page = 1
while True:
batch = self._request(f"/repos/{org}/{repo}/pulls", {"limit": 50, "page": page, "state": "open"})
if not batch:
break
prs.extend(batch)
if len(batch) < 50:
break
page += 1
return prs
def get_pr_files(self, org: str, repo: str, pr_number: int) -> List[Dict]:
files = []
page = 1
while True:
batch = self._request(
f"/repos/{org}/{repo}/pulls/{pr_number}/files",
{"limit": 100, "page": page}
)
if not batch:
break
files.extend(batch)
if len(batch) < 100:
break
page += 1
return files
def post_comment(self, org: str, repo: str, pr_number: int, body: str) -> bool:
data = json.dumps({"body": body}).encode("utf-8")
req = urllib.request.Request(
f"{self.base_url}/repos/{org}/{repo}/issues/{pr_number}/comments",
data=data,
method="POST",
headers={"Authorization": f"token {self.token}", "Content-Type": "application/json"}
)
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return resp.status in (200, 201)
except urllib.error.HTTPError:
return False
def is_dependency_file(filename: str) -> bool:
return any(filename.endswith(dep) for dep in DEPENDENCY_FILES)
def is_test_file(filename: str) -> bool:
return any(re.search(pattern, filename) for pattern in TEST_PATTERNS)
def score_pr(
files_changed: int,
additions: int,
deletions: int,
has_dependency_changes: bool,
test_coverage_delta: Optional[int] = None
) -> tuple[int, int, List[str]]:
score = 1.0
reasons = []
# Files changed
if files_changed <= SMALL_FILES:
fscore = 1.0
reasons.append("small number of files changed")
elif files_changed <= MEDIUM_FILES:
fscore = 2.0
reasons.append("moderate number of files changed")
elif files_changed <= LARGE_FILES:
fscore = 2.5
reasons.append("large number of files changed")
else:
fscore = 3.0
reasons.append("very large PR spanning many files")
# Lines changed
total_lines = additions + deletions
if total_lines <= SMALL_LINES:
lscore = 1.0
reasons.append("small change size")
elif total_lines <= MEDIUM_LINES:
lscore = 2.0
reasons.append("moderate change size")
elif total_lines <= LARGE_LINES:
lscore = 3.0
reasons.append("large change size")
else:
lscore = 4.0
reasons.append("very large change")
# Dependency changes
if has_dependency_changes:
dscore = 2.5
reasons.append("dependency changes (architectural impact)")
else:
dscore = 0.0
# Test coverage delta
tscore = 0.0
if test_coverage_delta is not None:
if test_coverage_delta > 0:
reasons.append(f"test additions (+{test_coverage_delta} test files)")
tscore = -min(2.0, test_coverage_delta / 2.0)
elif test_coverage_delta < 0:
reasons.append(f"test removals ({abs(test_coverage_delta)} test files)")
tscore = min(2.0, abs(test_coverage_delta) * 0.5)
else:
reasons.append("test coverage change not assessed")
# Weighted sum, scaled by 3 to use full 1-10 range
bonus = (fscore * WEIGHT_FILES) + (lscore * WEIGHT_LINES) + (dscore * WEIGHT_DEPS) + (tscore * WEIGHT_TEST_COV)
scaled_bonus = bonus * 3.0
score = 1.0 + scaled_bonus
final_score = max(1, min(10, int(round(score))))
est_minutes = TIME_PER_POINT.get(final_score, 30)
return final_score, est_minutes, reasons
def analyze_pr(client: GiteaClient, org: str, repo: str, pr_data: Dict) -> PRComplexity:
pr_num = pr_data["number"]
title = pr_data.get("title", "")
files = client.get_pr_files(org, repo, pr_num)
additions = sum(f.get("additions", 0) for f in files)
deletions = sum(f.get("deletions", 0) for f in files)
filenames = [f.get("filename", "") for f in files]
has_deps = any(is_dependency_file(f) for f in filenames)
test_added = sum(1 for f in files if f.get("status") == "added" and is_test_file(f.get("filename", "")))
test_removed = sum(1 for f in files if f.get("status") == "removed" and is_test_file(f.get("filename", "")))
test_delta = test_added - test_removed if (test_added or test_removed) else None
score, est_min, reasons = score_pr(
files_changed=len(files),
additions=additions,
deletions=deletions,
has_dependency_changes=has_deps,
test_coverage_delta=test_delta
)
return PRComplexity(
pr_number=pr_num,
title=title,
files_changed=len(files),
additions=additions,
deletions=deletions,
has_dependency_changes=has_deps,
test_coverage_delta=test_delta,
score=score,
estimated_minutes=est_min,
reasons=reasons
)
def build_comment(complexity: PRComplexity) -> str:
change_desc = f"{complexity.files_changed} files, +{complexity.additions}/-{complexity.deletions} lines"
deps_note = "\n- :warning: Dependency changes detected — architectural review recommended" if complexity.has_dependency_changes else ""
test_note = ""
if complexity.test_coverage_delta is not None:
if complexity.test_coverage_delta > 0:
test_note = f"\n- :+1: {complexity.test_coverage_delta} test file(s) added"
elif complexity.test_coverage_delta < 0:
test_note = f"\n- :warning: {abs(complexity.test_coverage_delta)} test file(s) removed"
comment = f"## 📊 PR Complexity Analysis\n\n"
comment += f"**PR #{complexity.pr_number}: {complexity.title}**\n\n"
comment += f"| Metric | Value |\n|--------|-------|\n"
comment += f"| Changes | {change_desc} |\n"
comment += f"| Complexity Score | **{complexity.score}/10** |\n"
comment += f"| Estimated Review Time | ~{complexity.estimated_minutes} minutes |\n\n"
comment += f"### Scoring rationale:"
for r in complexity.reasons:
comment += f"\n- {r}"
if deps_note:
comment += deps_note
if test_note:
comment += test_note
comment += f"\n\n---\n"
comment += f"*Generated by PR Complexity Scorer — [issue #135](https://forge.alexanderwhitestone.com/Timmy_Foundation/compounding-intelligence/issues/135)*"
return comment
def main():
parser = argparse.ArgumentParser(description="PR Complexity Scorer")
parser.add_argument("--org", default="Timmy_Foundation")
parser.add_argument("--repo", default="compounding-intelligence")
parser.add_argument("--token", default=os.environ.get("GITEA_TOKEN") or os.path.expanduser("~/.config/gitea/token"))
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--apply", action="store_true")
parser.add_argument("--output", default="metrics/pr_complexity.json")
args = parser.parse_args()
token_path = args.token
if os.path.exists(token_path):
with open(token_path) as f:
token = f.read().strip()
else:
token = args.token
if not token:
print("ERROR: No Gitea token provided", file=sys.stderr)
sys.exit(1)
client = GiteaClient(token)
print(f"Fetching open PRs for {args.org}/{args.repo}...")
prs = client.get_open_prs(args.org, args.repo)
if not prs:
print("No open PRs found.")
sys.exit(0)
print(f"Found {len(prs)} open PR(s). Analyzing...")
results = []
Path(args.output).parent.mkdir(parents=True, exist_ok=True)
for pr in prs:
pr_num = pr["number"]
title = pr.get("title", "")
print(f" Analyzing PR #{pr_num}: {title[:60]}")
try:
complexity = analyze_pr(client, args.org, args.repo, pr)
results.append(complexity.to_dict())
comment = build_comment(complexity)
if args.dry_run:
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [DRY-RUN]")
elif args.apply:
success = client.post_comment(args.org, args.repo, pr_num, comment)
status = "[commented]" if success else "[FAILED]"
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min {status}")
else:
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [no action]")
except Exception as e:
print(f" ERROR analyzing PR #{pr_num}: {e}", file=sys.stderr)
with open(args.output, "w") as f:
json.dump({
"org": args.org,
"repo": args.repo,
"timestamp": datetime.now(timezone.utc).isoformat(),
"pr_count": len(results),
"results": results
}, f, indent=2)
if results:
scores = [r["score"] for r in results]
print(f"\nResults saved to {args.output}")
print(f"Summary: {len(results)} PRs, scores range {min(scores):.0f}-{max(scores):.0f}")
else:
print("\nNo results to save.")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,168 @@
#!/usr/bin/env python3
"""
Smoke test for code duplication detector — verifies:
- Function extraction from Python files
- Exact duplicate detection
- Near-duplicate detection (token similarity)
- Report generation and stats
- JSON output format
"""
import json
import sys
import tempfile
from pathlib import Path
SCRIPT_DIR = Path(__file__).parent.absolute()
sys.path.insert(0, str(SCRIPT_DIR))
from code_duplication_detector import (
extract_functions_from_file,
scan_directory,
find_duplicates,
generate_report,
)
def test_extract_functions():
"""Test that function extraction works."""
with tempfile.TemporaryDirectory() as tmpdir:
test_file = Path(tmpdir) / 'sample.py'
test_file.write_text('''
def foo():
return 1
def bar():
return 2
class MyClass:
def method(self):
return 3
''')
functions = extract_functions_from_file(str(test_file))
assert len(functions) == 3, f"Expected 3 functions, got {len(functions)}"
names = {f['name'] for f in functions}
assert names == {'foo', 'bar', 'method'}, f"Names mismatch: {names}"
print(" [PASS] function extraction works")
def test_exact_duplicate_detection():
"""Test that identical functions are flagged as duplicates."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create two files with the same function
f1 = Path(tmpdir) / 'a.py'
f1.write_text('''
def duplicated():
x = 1
y = 2
return x + y
''')
f2 = Path(tmpdir) / 'b.py'
f2.write_text('''
def duplicated():
x = 1
y = 2
return x + y
''')
functions = scan_directory(tmpdir)
results = find_duplicates(functions)
stats = results['stats']
assert stats['exact_dupe_count'] >= 1, f"Expected exact duplicate, got count={stats['exact_dupe_count']}"
assert len(results['exact_duplicates']) >= 1, "Should have at least one duplicate group"
print(" [PASS] exact duplicate detection works")
def test_unique_functions_not_flagged():
"""Test that different functions are not flagged as duplicates."""
with tempfile.TemporaryDirectory() as tmpdir:
f1 = Path(tmpdir) / 'a.py'
f1.write_text('def func_a(): return 1')
f2 = Path(tmpdir) / 'b.py'
f2.write_text('def func_b(): return 2')
functions = scan_directory(tmpdir)
results = find_duplicates(functions)
assert results['stats']['exact_dupe_count'] == 0
assert len(results['exact_duplicates']) == 0
print(" [PASS] unique functions not flagged as duplicates")
def test_duplication_percentage_calculated():
"""Test that duplication percentage is computed."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create file with mostly duplicated content
f1 = Path(tmpdir) / 'a.py'
f1.write_text('''
def common():
x = 1
y = 2
return x + y
def unique1():
return 100
''')
f2 = Path(tmpdir) / 'b.py'
f2.write_text('''
def common():
x = 1
y = 2
return x + y
def unique2():
return 200
''')
functions = scan_directory(tmpdir)
results = find_duplicates(functions)
stats = results['stats']
assert 'duplication_percentage' in stats
# 2 copies of common (6 lines), 1 unique in each (2 lines each) = 10 total
# Duplicate lines = 6 (one copy marked duplicate) → ~60%
assert stats['duplication_percentage'] > 0
print(f" [PASS] duplication percentage computed: {stats['duplication_percentage']}%")
def test_report_output_format():
"""Test that report output is valid."""
with tempfile.TemporaryDirectory() as tmpdir:
f1 = Path(tmpdir) / 'a.py'
f1.write_text('def dup(): return 1')
f2 = Path(tmpdir) / 'b.py'
f2.write_text('def dup(): return 1')
functions = scan_directory(tmpdir)
results = find_duplicates(functions)
# Text report
text = generate_report(results, output_format='text')
assert 'CODE DUPLICATION REPORT' in text
assert 'Total functions' in text
print(" [PASS] text report format valid")
# JSON report
json_out = generate_report(results, output_format='json')
data = json.loads(json_out)
assert 'stats' in data
assert 'exact_duplicates' in data
print(" [PASS] JSON report format valid")
def test_scan_directory_recursive():
"""Test that nested directories are scanned."""
with tempfile.TemporaryDirectory() as tmpdir:
subdir = Path(tmpdir) / 'sub'
subdir.mkdir()
(subdir / 'nested.py').write_text('def nested(): pass')
(Path(tmpdir) / 'root.py').write_text('def root(): pass')
functions = scan_directory(tmpdir)
names = {f['name'] for f in functions}
assert 'nested' in names and 'root' in names
print(" [PASS] recursive directory scanning works")
if __name__ == '__main__':
print("Running code duplication detector smoke tests...")
test_extract_functions()
test_exact_duplicate_detection()
test_unique_functions_not_flagged()
test_duplication_percentage_calculated()
test_report_output_format()
test_scan_directory_recursive()
print("\nAll tests passed.")

View File

@@ -1,155 +0,0 @@
#!/usr/bin/env python3
"""Tests for dependency_graph.py — transitive closure and deep chain detection."""
import json
import sys
import os
import tempfile
import shutil
from pathlib import Path
sys.path.insert(0, os.path.dirname(__file__) or ".")
import importlib.util
spec = importlib.util.spec_from_file_location(
"dg", os.path.join(os.path.dirname(__file__) or ".", "dependency_graph.py")
)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
transitive_closure = mod.transitive_closure
find_deep_chains = mod.find_deep_chains
detect_cycles = mod.detect_cycles
def make_graph(edges: dict[str, list[str]]) -> dict:
"""Build graph dict in expected format: {repo: {"dependencies": [...]}}."""
return {
node: {"dependencies": sorted(deps), "files_scanned": 1}
for node, deps in edges.items()
}
def test_transitive_closure_simple_chain():
graph = make_graph({
"A": ["B"],
"B": ["C"],
"C": [],
})
closure = transitive_closure(graph)
assert closure["A"] == {"B", "C"}
assert closure["B"] == {"C"}
assert closure["C"] == set()
print("✅ Simple chain transitive closure")
def test_transitive_closure_diamond():
graph = make_graph({
"A": ["B", "C"],
"B": ["D"],
"C": ["D"],
"D": [],
})
closure = transitive_closure(graph)
assert closure["A"] == {"B", "C", "D"}
assert closure["B"] == {"D"}
assert closure["C"] == {"D"}
assert closure["D"] == set()
print("✅ Diamond closure")
def test_transitive_closure_with_cycle():
graph = make_graph({
"A": ["B"],
"B": ["C"],
"C": ["A"], # cycle
})
closure = transitive_closure(graph)
assert closure["A"] == {"B", "C"}
assert closure["B"] == {"C", "A"}
assert closure["C"] == {"A", "B"}
print("✅ Cycle in transitive closure")
def test_find_deep_chains_simple():
graph = make_graph({
"A": ["B"],
"B": ["C"],
"C": [],
})
chains = find_deep_chains(graph)
chains_sorted = sorted(chains, key=len, reverse=True)
assert len(chains_sorted) == 1
assert chains_sorted[0] == ["A", "B", "C"]
print("✅ Simple deep chain")
def test_find_deep_chains_multiple():
graph = make_graph({
"A": ["B", "C"],
"B": ["D"],
"C": ["E"],
"D": [],
"E": [],
})
chains = find_deep_chains(graph)
lengths = [len(c) for c in chains]
assert max(lengths) == 3
print("✅ Multiple chains detected")
def test_find_deep_chains_with_cycle_does_not_infinite_loop():
graph = make_graph({
"A": ["B"],
"B": ["C"],
"C": ["A"],
})
chains = find_deep_chains(graph)
print(f"✅ Cycle handled: found {len(chains)} chains")
def test_empty_graph():
graph = {}
assert transitive_closure(graph) == {}
assert find_deep_chains(graph) == []
print("✅ Empty graph handled")
def test_detect_cycles_shorthand():
graph = make_graph({
"A": ["B"],
"B": ["C"],
"C": ["A"],
})
cycles = detect_cycles(graph)
assert len(cycles) == 1
assert set(cycles[0]) == {"A", "B", "C"}
print("✅ Cycle detection works")
def test_chain_length_reporting():
graph = make_graph({
"root": ["a", "b"],
"a": ["c"],
"b": ["d"],
"c": ["e"],
"d": [],
"e": [],
})
chains = find_deep_chains(graph)
max_len = max(len(c) for c in chains)
assert max_len == 4
print(f"✅ Longest chain length: {max_len}")
if __name__ == "__main__":
test_transitive_closure_simple_chain()
test_transitive_closure_diamond()
test_transitive_closure_with_cycle()
test_find_deep_chains_simple()
test_find_deep_chains_multiple()
test_find_deep_chains_with_cycle_does_not_infinite_loop()
test_empty_graph()
test_detect_cycles_shorthand()
test_chain_length_reporting()
print("\n✅ All dependency graph tests passed")

View File

@@ -0,0 +1,170 @@
#!/usr/bin/env python3
"""
Tests for PR Complexity Scorer — unit tests for the scoring logic.
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from pr_complexity_scorer import (
score_pr,
is_dependency_file,
is_test_file,
TIME_PER_POINT,
SMALL_FILES,
MEDIUM_FILES,
LARGE_FILES,
SMALL_LINES,
MEDIUM_LINES,
LARGE_LINES,
)
PASS = 0
FAIL = 0
def test(name):
def decorator(fn):
global PASS, FAIL
try:
fn()
PASS += 1
print(f" [PASS] {name}")
except AssertionError as e:
FAIL += 1
print(f" [FAIL] {name}: {e}")
except Exception as e:
FAIL += 1
print(f" [FAIL] {name}: Unexpected error: {e}")
return decorator
def assert_eq(a, b, msg=""):
if a != b:
raise AssertionError(f"{msg} expected {b!r}, got {a!r}")
def assert_true(v, msg=""):
if not v:
raise AssertionError(msg or "Expected True")
def assert_false(v, msg=""):
if v:
raise AssertionError(msg or "Expected False")
print("=== PR Complexity Scorer Tests ===\n")
print("-- File Classification --")
@test("dependency file detection — requirements.txt")
def _():
assert_true(is_dependency_file("requirements.txt"))
assert_true(is_dependency_file("src/requirements.txt"))
assert_false(is_dependency_file("requirements_test.txt"))
@test("dependency file detection — pyproject.toml")
def _():
assert_true(is_dependency_file("pyproject.toml"))
assert_false(is_dependency_file("myproject.py"))
@test("test file detection — pytest style")
def _():
assert_true(is_test_file("tests/test_api.py"))
assert_true(is_test_file("test_module.py"))
assert_true(is_test_file("src/module_test.py"))
@test("test file detection — other frameworks")
def _():
assert_true(is_test_file("spec/feature_spec.rb"))
assert_true(is_test_file("__tests__/component.test.js"))
assert_false(is_test_file("testfixtures/helper.py"))
print("\n-- Scoring Logic --")
@test("small PR gets low score (1-3)")
def _():
score, minutes, _ = score_pr(
files_changed=3,
additions=50,
deletions=10,
has_dependency_changes=False,
test_coverage_delta=None
)
assert_true(1 <= score <= 3, f"Score should be low, got {score}")
assert_true(minutes < 20)
@test("medium PR gets medium score (4-6)")
def _():
score, minutes, _ = score_pr(
files_changed=15,
additions=400,
deletions=100,
has_dependency_changes=False,
test_coverage_delta=None
)
assert_true(4 <= score <= 6, f"Score should be medium, got {score}")
assert_true(20 <= minutes <= 45)
@test("large PR gets high score (7-9)")
def _():
score, minutes, _ = score_pr(
files_changed=60,
additions=3000,
deletions=1500,
has_dependency_changes=True,
test_coverage_delta=None
)
assert_true(7 <= score <= 9, f"Score should be high, got {score}")
assert_true(minutes >= 45)
@test("dependency changes boost score")
def _():
base_score, _, _ = score_pr(
files_changed=10, additions=200, deletions=50,
has_dependency_changes=False, test_coverage_delta=None
)
dep_score, _, _ = score_pr(
files_changed=10, additions=200, deletions=50,
has_dependency_changes=True, test_coverage_delta=None
)
assert_true(dep_score > base_score, f"Deps: {base_score} -> {dep_score}")
@test("adding tests lowers complexity")
def _():
base_score, _, _ = score_pr(
files_changed=8, additions=150, deletions=20,
has_dependency_changes=False, test_coverage_delta=None
)
better_score, _, _ = score_pr(
files_changed=8, additions=180, deletions=20,
has_dependency_changes=False, test_coverage_delta=3
)
assert_true(better_score < base_score, f"Tests: {base_score} -> {better_score}")
@test("removing tests increases complexity")
def _():
base_score, _, _ = score_pr(
files_changed=8, additions=150, deletions=20,
has_dependency_changes=False, test_coverage_delta=None
)
worse_score, _, _ = score_pr(
files_changed=8, additions=150, deletions=20,
has_dependency_changes=False, test_coverage_delta=-2
)
assert_true(worse_score > base_score, f"Remove tests: {base_score} -> {worse_score}")
@test("score bounded 1-10")
def _():
for files, adds, dels in [(1, 10, 5), (100, 10000, 5000)]:
score, _, _ = score_pr(files, adds, dels, False, None)
assert_true(1 <= score <= 10, f"Score {score} out of range")
@test("estimated minutes exist for all scores")
def _():
for s in range(1, 11):
assert_true(s in TIME_PER_POINT, f"Missing time for score {s}")
print(f"\n=== Results: {PASS} passed, {FAIL} failed ===")
sys.exit(0 if FAIL == 0 else 1)