#!/usr/bin/env python3
"""
Code Duplication Detector — Issue #162

Finds duplicate functions and code blocks across Python source files.
Reports duplication percentage and outputs a duplication report.

Usage:
    python3 scripts/code_duplication_detector.py --output reports/code_duplication.json
    python3 scripts/code_duplication_detector.py --directory scripts/ --dry-run
    python3 scripts/code_duplication_detector.py --test  # Run built-in test
"""

import argparse
import ast
import hashlib
import json
import os
import re
import sys
import tempfile
from collections import defaultdict, deque
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Dict, Tuple, Optional

# Directory names that never contain first-party source worth scanning.
_EXCLUDED_DIRS = frozenset(
    ('__pycache__', 'node_modules', '.git', 'venv', '.venv', 'dist', 'build')
)

# Patterns are compiled once at import time instead of on every call.
_LINE_COMMENT_RE = re.compile(r'#.*$', flags=re.MULTILINE)
_DOUBLE_QUOTED_DOC_RE = re.compile(r'""".*?"""', flags=re.DOTALL)
_SINGLE_QUOTED_DOC_RE = re.compile(r"'''.*?'''", flags=re.DOTALL)
_WHITESPACE_RE = re.compile(r'\s+')
_IDENTIFIER_RE = re.compile(r'[a-zA-Z_][a-zA-Z0-9_]*')
# Only the line terminators the Python parser itself counts as line ends.
_SOURCE_NEWLINE_RE = re.compile(r'\r\n|\r|\n')


# ── AST helpers ────────────────────────────────────────────────────────────

def normalize_code(text: str) -> str:
    """Normalize code for comparison: strip comments, normalize whitespace.

    This is a regex heuristic, not a tokenizer: everything from a ``#`` to
    the end of its line is dropped (even when the ``#`` sits inside a string
    literal), every triple-quoted string is dropped, all whitespace runs
    collapse to a single space, and the result is lower-cased.

    Args:
        text: Raw source text of a function or code block.

    Returns:
        The normalized, lower-cased, single-line form of ``text``.
    """
    text = _LINE_COMMENT_RE.sub('', text)
    text = _DOUBLE_QUOTED_DOC_RE.sub('', text)
    text = _SINGLE_QUOTED_DOC_RE.sub('', text)
    text = _WHITESPACE_RE.sub(' ', text).strip()
    return text.lower()


def code_hash(text: str) -> str:
    """Return the SHA256 hex digest of ``normalize_code(text)``.

    Two snippets share a hash exactly when their normalized forms are equal,
    which is what the exact-duplicate phase relies on.
    """
    normalized = normalize_code(text)
    return hashlib.sha256(normalized.encode('utf-8')).hexdigest()


def _tokenize(code: str) -> set:
    """Return the set of lower-cased identifier-like tokens found in ``code``."""
    return set(_IDENTIFIER_RE.findall(code.lower()))


def _line_count(func: Dict) -> int:
    """Return the inclusive number of source lines spanned by a function record."""
    return func['end_line'] - func['start_line'] + 1


# ── Function extraction via AST ────────────────────────────────────────────

class FunctionExtractor:
    """Extract function and method definitions with their full source bodies.

    Attributes:
        source: The complete source text that was parsed.
        filepath: Path recorded in every extracted function record.
        lines: ``source`` split into physical lines as the parser counts them.
        functions: Records appended by :meth:`visit`, one per definition.
    """

    def __init__(self, source: str, filepath: str):
        self.source = source
        self.filepath = filepath
        # str.splitlines() also breaks on form feeds and several other
        # characters that the Python parser does NOT treat as line ends, which
        # would shift every later AST line number. Split on real newlines only.
        lines = _SOURCE_NEWLINE_RE.split(source)
        if lines and lines[-1] == '':
            # Mirror splitlines(): a trailing newline does not add a line.
            lines.pop()
        self.lines = lines
        self.functions: List[Dict] = []

    def _get_source_segment(self, start_lineno: int, end_lineno: int) -> str:
        """Get source code from start to end line (1-indexed, inclusive)."""
        # Slices are end-exclusive and 0-indexed, so the inclusive 1-indexed
        # end line number is already the correct slice stop.
        return '\n'.join(self.lines[start_lineno - 1:end_lineno])

    def visit(self, tree):
        """Collect all function and async function definitions under ``tree``.

        Nodes are visited breadth-first (the same order ``ast.walk`` uses) and
        each node's parent is tracked during the traversal, so methods are
        recognised without the caller having to annotate the tree first.

        Each record appended to ``self.functions`` has the keys ``name``,
        ``file``, ``start_line``, ``end_line``, ``body``, ``class_name`` and
        ``is_method``.
        """
        pending = deque([(tree, None)])
        while pending:
            node, parent = pending.popleft()
            pending.extend((child, node) for child in ast.iter_child_nodes(node))
            if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                continue

            # node.lineno is the ``def`` line (Python 3.8+), so decorator
            # lines are not part of the extracted span.
            start = node.lineno
            end = node.end_lineno

            # Only a function defined directly in a class body is a method.
            class_name = parent.name if isinstance(parent, ast.ClassDef) else None

            self.functions.append({
                'name': node.name,
                'file': self.filepath,
                'start_line': start,
                'end_line': end,
                'body': self._get_source_segment(start, end),
                'class_name': class_name,
                'is_method': class_name is not None,
            })


class ParentNodeVisitor(ast.NodeVisitor):
    """Annotate nodes with parent references.

    After ``ParentNodeVisitor().visit(tree)`` every node below ``tree`` has a
    ``parent`` attribute pointing at its direct parent; the node the visit
    started from gets the ``parent`` passed to the constructor.
    """

    def __init__(self, parent=None):
        self.parent = parent

    def generic_visit(self, node):
        # Only the starting node lacks a parent here: every other node was
        # tagged by its own parent's loop below before being visited.
        if not hasattr(node, 'parent'):
            node.parent = self.parent
        for child in ast.iter_child_nodes(node):
            child.parent = node
        super().generic_visit(node)


def extract_functions_from_file(filepath: str) -> List[Dict]:
    """Extract all function definitions from a Python file.

    Args:
        filepath: Path of the file to read and parse.

    Returns:
        The function records produced by ``FunctionExtractor``, or an empty
        list when the file cannot be read or parsed. Failures are reported as
        a warning on stderr rather than raised, so a single bad file does not
        abort a directory scan.
    """
    try:
        # utf-8-sig strips a leading BOM so it never reaches the parser;
        # errors='replace' keeps undecodable bytes from aborting the read.
        with open(filepath, 'r', encoding='utf-8-sig', errors='replace') as f:
            source = f.read()
        tree = ast.parse(source, filename=str(filepath))
    except (SyntaxError, ValueError, RecursionError, OSError) as exc:
        # ValueError covers UnicodeDecodeError and the null-byte error raised
        # by ast.parse on some Python versions.
        print(f"warning: skipping {filepath}: {exc}", file=sys.stderr)
        return []

    extractor = FunctionExtractor(source, str(filepath))
    extractor.visit(tree)
    return extractor.functions


def scan_directory(directory: str, extensions: Tuple[str, ...] = ('.py',)) -> List[Dict]:
    """Scan directory for Python files and extract all functions.

    Args:
        directory: Root directory to search recursively.
        extensions: File suffixes (including the dot) that should be parsed.

    Returns:
        Function records from every matching file, in sorted path order so
        repeated runs over the same tree yield the same report.
    """
    root = Path(directory)
    all_functions: List[Dict] = []
    for filepath in sorted(root.rglob('*')):
        if filepath.suffix not in extensions or not filepath.is_file():
            continue
        # Match exclusions against the path *below* the scan root only;
        # otherwise a project living under e.g. ".../build/" is skipped whole.
        relative_parts = filepath.relative_to(root).parts
        if any(part in _EXCLUDED_DIRS for part in relative_parts):
            continue
        if filepath.name.startswith('.'):
            continue
        all_functions.extend(extract_functions_from_file(str(filepath)))
    return all_functions


# ── Duplicate detection ─────────────────────────────────────────────────────

def find_duplicates(functions: List[Dict], similarity_threshold: float = 0.95) -> Dict:
    """
    Find duplicate and near-duplicate functions.

    Phase 1 groups functions whose normalized source hashes are equal.
    Phase 2 takes the first function of every hash group and greedily groups
    those whose identifier sets have a Jaccard similarity of at least
    ``similarity_threshold`` with the group's first member.

    Returns dict with:
    - exact_duplicates: {hash: [function_info, ...]}
    - near_duplicates: [[function_info, ...], ...]
    - stats: total_functions, unique_exact, exact_dupe_count, near_dupe_count,
      total_duplicates, total_lines, duplicate_lines, duplication_percentage
    """
    # Phase 1: Exact duplicates by code hash
    hash_groups: Dict[str, List[Dict]] = defaultdict(list)
    for func in functions:
        hash_groups[code_hash(func['body'])].append(func)

    exact_duplicates = {
        digest: group for digest, group in hash_groups.items() if len(group) > 1
    }
    exact_dupe_count = sum(len(group) - 1 for group in exact_duplicates.values())

    # Phase 2: Near-duplicates among one representative per distinct hash.
    # Token sets are computed once per representative, not once per pair.
    candidates = [
        (group[0], _tokenize(group[0]['body'])) for group in hash_groups.values()
    ]
    claimed = [False] * len(candidates)
    near_duplicate_groups: List[List[Dict]] = []

    for i, (seed, seed_tokens) in enumerate(candidates):
        # A function already placed in a group never seeds another one, and a
        # function without identifiers cannot be compared at all.
        if claimed[i] or not seed_tokens:
            continue
        group = [seed]
        for j in range(i + 1, len(candidates)):
            other, other_tokens = candidates[j]
            if claimed[j] or not other_tokens:
                continue
            shared = len(seed_tokens & other_tokens)
            combined = len(seed_tokens | other_tokens)
            if shared / combined >= similarity_threshold:
                group.append(other)
                claimed[j] = True
        if len(group) > 1:
            near_duplicate_groups.append(group)

    near_dupe_count = sum(len(group) - 1 for group in near_duplicate_groups)

    # Line-based duplication: in every group the first member is treated as
    # the original and all remaining members count as duplicated lines.
    total_lines = sum(_line_count(func) for func in functions)
    dupe_lines = sum(
        _line_count(func)
        for group in (*exact_duplicates.values(), *near_duplicate_groups)
        for func in group[1:]
    )

    stats = {
        'total_functions': len(functions),
        'unique_exact': len(hash_groups),
        'exact_dupe_count': exact_dupe_count,
        'near_dupe_count': near_dupe_count,
        'total_duplicates': exact_dupe_count + near_dupe_count,
        'total_lines': total_lines,
        'duplicate_lines': dupe_lines,
        'duplication_percentage': round(
            (dupe_lines / total_lines * 100) if total_lines else 0, 2
        ),
    }

    return {
        'exact_duplicates': exact_duplicates,
        'near_duplicates': near_duplicate_groups,
        'stats': stats,
    }


# ── Report generation ────────────────────────────────────────────────────────

def generate_report(results: Dict, output_format: str = 'json') -> str:
    """Generate a report from detection results.

    Args:
        results: The dict returned by ``find_duplicates``.
        output_format: ``'json'`` for an indented JSON dump of ``results``;
            any other value produces the plain-text report.

    Returns:
        The rendered report as a single string.
    """
    stats = results['stats']

    if output_format == 'json':
        return json.dumps(results, indent=2, default=str)

    # Text report
    lines = [
        "=" * 60,
        " CODE DUPLICATION REPORT",
        "=" * 60,
        f" Total functions scanned: {stats['total_functions']}",
        f" Unique functions: {stats['unique_exact']}",
        f" Exact duplicates: {stats['exact_dupe_count']}",
        f" Near-duplicates: {stats['near_dupe_count']}",
        f" Total lines: {stats['total_lines']}",
        f" Duplicate lines: {stats['duplicate_lines']}",
        f" Duplication %: {stats['duplication_percentage']}%",
        "",
    ]

    if results['exact_duplicates']:
        lines.append(" Exact duplicate functions:")
        for group in results['exact_duplicates'].values():
            first = group[0]
            lines.append(f" {first['name']} ({first['file']}:{first['start_line']}) — "
                         f"copied {len(group)-1}x in:")
            for func in group[1:]:
                lines.append(f" → {func['file']}:{func['start_line']}")
        lines.append("")

    if results['near_duplicates']:
        lines.append(" Near-duplicate function groups:")
        for index, group in enumerate(results['near_duplicates'], 1):
            first = group[0]
            lines.append(f" Group {index}: {first['name']} ({first['file']}:{first['start_line']}) — "
                         f"{len(group)} similar functions")
            for func in group[1:]:
                lines.append(f" → {func['file']}:{func['start_line']}")
        lines.append("")

    lines.append("=" * 60)
    return '\n'.join(lines)


# ── CLI ─────────────────────────────────────────────────────────────────────

def main():
    """Parse command-line arguments, run the scan and emit the report."""
    parser = argparse.ArgumentParser(description="Code Duplication Detector")
    parser.add_argument('--directory', default='.',
                        help='Directory to scan (default: current directory)')
    parser.add_argument('--output', help='Output file for JSON report')
    parser.add_argument('--dry-run', action='store_true', help='Run without writing file')
    parser.add_argument('--threshold', type=float, default=0.95,
                        help='Similarity threshold for near-dupes (default: 0.95)')
    parser.add_argument('--json', action='store_true', help='JSON output to stdout')
    parser.add_argument('--test', action='store_true', help='Run built-in test')
    args = parser.parse_args()

    if args.test:
        _run_test()
        return

    # A missing directory would otherwise yield a misleading "0 duplicates".
    if not os.path.isdir(args.directory):
        parser.error(f"not a directory: {args.directory}")

    # Scan
    functions = scan_directory(args.directory)

    # Detect duplicates
    results = find_duplicates(functions, similarity_threshold=args.threshold)
    stats = results['stats']

    # In --json mode stdout must carry nothing but the JSON document, so the
    # human-readable status lines are routed to stderr instead.
    status_stream = sys.stderr if args.json else sys.stdout

    # Output
    print(generate_report(results, output_format='json' if args.json else 'text'))

    # Write file if requested
    if args.output and not args.dry_run:
        os.makedirs(os.path.dirname(args.output) or '.', exist_ok=True)
        with open(args.output, 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2, default=str)
        print(f"\nReport written to: {args.output}", file=status_stream)

    # Summary for burn protocol
    print(f"\n✓ Detection complete: {stats['exact_dupe_count']} exact + "
          f"{stats['near_dupe_count']} near duplicates found "
          f"({stats['duplication_percentage']}% duplication)", file=status_stream)


def _run_test():
    """Built-in smoke test: scan two generated modules sharing one function."""
    with tempfile.TemporaryDirectory() as tmpdir:
        # Create test files with duplicate code
        f1 = Path(tmpdir) / 'mod1.py'
        f1.write_text('''
def hello():
    print("hello world")

def duplicated_function():
    x = 1
    y = 2
    return x + y

def unique_func():
    return 42
''', encoding='utf-8')

        f2 = Path(tmpdir) / 'mod2.py'
        f2.write_text('''
def duplicated_function():
    x = 1
    y = 2
    return x + y

def another_unique():
    return "different"
''', encoding='utf-8')

        functions = scan_directory(tmpdir)
        results = find_duplicates(functions)
        stats = results['stats']

        assert stats['exact_dupe_count'] >= 1, "Should find at least 1 exact duplicate"
        assert stats['total_functions'] >= 4, "Should find at least 4 functions"
        # Check duplication percentage is calculated
        assert 'duplication_percentage' in stats

        print(f"\n✓ Test passed: {stats['total_functions']} functions, "
              f"{stats['exact_dupe_count']} exact duplicates, "
              f"{stats['duplication_percentage']}% duplication")


if __name__ == '__main__':
    main()