From 180464cc5e4f491dc863fe07d4adddff7661b9af Mon Sep 17 00:00:00 2001
From: Rockachopa
Date: Sun, 26 Apr 2026 11:19:30 -0400
Subject: [PATCH] feat: add code duplication detector (#162)

Detect duplicate functions/blocks across Python files.
Reports duplication percentage and outputs JSON report.

Closes #162.
---
 scripts/code_duplication_detector.py      | 408 ++++++++++++++++++++++
 scripts/test_code_duplication_detector.py | 197 ++++++++++
 2 files changed, 605 insertions(+)
 create mode 100644 scripts/code_duplication_detector.py
 create mode 100644 scripts/test_code_duplication_detector.py

diff --git a/scripts/code_duplication_detector.py b/scripts/code_duplication_detector.py
new file mode 100644
index 0000000..75146d7
--- /dev/null
+++ b/scripts/code_duplication_detector.py
@@ -0,0 +1,408 @@
+#!/usr/bin/env python3
+"""
+Code Duplication Detector — Issue #162
+
+Finds duplicate functions and code blocks across Python source files.
+Reports duplication percentage and outputs a duplication report.
+
+Usage:
+    python3 scripts/code_duplication_detector.py --output reports/code_duplication.json
+    python3 scripts/code_duplication_detector.py --directory scripts/ --dry-run
+    python3 scripts/code_duplication_detector.py --test  # Run built-in test
+"""
+
+import argparse
+import ast
+import hashlib
+import json
+import os
+import re
+import sys
+from collections import defaultdict
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import List, Dict, Tuple, Optional
+
+# Directory names that never contain first-party source worth scanning.
+SKIPPED_DIRS = frozenset(
+    ('__pycache__', 'node_modules', '.git', 'venv', '.venv', 'dist', 'build')
+)
+# Matches identifier-like tokens; used for near-duplicate similarity.
+_IDENTIFIER_RE = re.compile(r'[a-zA-Z_][a-zA-Z0-9_]*')
+
+
+# ── Normalization helpers ──────────────────────────────────────────────────
+
+def normalize_code(text: str) -> str:
+    """Normalize code for comparison.
+
+    Removes triple-quoted strings and ``#`` comments, collapses every run of
+    whitespace to a single space and lower-cases the result.
+    """
+    # Triple-quoted strings go first: stripping comments first would cut a
+    # docstring such as """Issue #162""" in half and leave it unterminated.
+    text = re.sub(r'""".*?"""', '', text, flags=re.DOTALL)
+    text = re.sub(r"'''.*?'''", '', text, flags=re.DOTALL)
+    text = re.sub(r'#.*$', '', text, flags=re.MULTILINE)
+    text = re.sub(r'\s+', ' ', text).strip()
+    return text.lower()
+
+
+def code_hash(text: str) -> str:
+    """Return the SHA256 hex digest of the normalized form of *text*."""
+    normalized = normalize_code(text)
+    return hashlib.sha256(normalized.encode('utf-8')).hexdigest()
+
+
+# ── Function extraction via AST ────────────────────────────────────────────
+
+class FunctionExtractor:
+    """Collect function and method definitions together with their source."""
+
+    def __init__(self, source: str, filepath: str):
+        self.source = source
+        self.filepath = filepath
+        self.lines = source.splitlines()
+        self.functions: List[Dict] = []
+
+    def _get_source_segment(self, start_lineno: int, end_lineno: int) -> str:
+        """Return source lines start_lineno..end_lineno (1-indexed, inclusive)."""
+        # Slices are end-exclusive, so the inclusive end line needs no "- 1".
+        return '\n'.join(self.lines[start_lineno - 1:end_lineno])
+
+    def visit(self, tree):
+        """Record every (async) function definition found in *tree*.
+
+        A function is reported as a method only if its node carries a
+        ``parent`` attribute that is a class definition.
+        """
+        for node in ast.walk(tree):
+            if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
+                continue
+
+            # lineno is the line of the "def" keyword, so decorators are not
+            # part of the captured source.
+            start = node.lineno
+            end = node.end_lineno
+
+            parent = getattr(node, 'parent', None)
+            class_name = parent.name if isinstance(parent, ast.ClassDef) else None
+
+            self.functions.append({
+                'name': node.name,
+                'file': self.filepath,
+                'start_line': start,
+                'end_line': end,
+                'body': self._get_source_segment(start, end),
+                'class_name': class_name,
+                'is_method': class_name is not None,
+            })
+
+
+class ParentNodeVisitor(ast.NodeVisitor):
+    """Annotate every visited node with a ``parent`` attribute.
+
+    The root receives the *parent* passed to the constructor (None by
+    default).  Usage: ``ParentNodeVisitor().visit(tree)``.
+    """
+
+    def __init__(self, parent=None):
+        self.parent = parent
+
+    def generic_visit(self, node):
+        node.parent = self.parent
+        # Children must see this node as their parent; restore the previous
+        # value afterwards so the siblings of *node* are attributed correctly.
+        enclosing, self.parent = self.parent, node
+        try:
+            super().generic_visit(node)
+        finally:
+            self.parent = enclosing
+
+
+def extract_functions_from_file(filepath: str) -> List[Dict]:
+    """Extract all function definitions from a Python file.
+
+    A file that cannot be read or parsed yields an empty list and a warning
+    on stderr, so that one bad file never aborts a whole scan.
+    """
+    try:
+        with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
+            source = f.read()
+        tree = ast.parse(source, filename=str(filepath))
+    except (SyntaxError, ValueError, RecursionError, OSError) as e:
+        # ValueError covers sources that contain null bytes.
+        print(f"warning: skipping {filepath}: {e}", file=sys.stderr)
+        return []
+
+    # Annotate with parent references.  Done iteratively rather than with a
+    # recursive visitor to stay clear of the recursion limit.
+    for node in ast.walk(tree):
+        for child in ast.iter_child_nodes(node):
+            child.parent = node
+
+    extractor = FunctionExtractor(source, str(filepath))
+    extractor.visit(tree)
+    return extractor.functions
+
+
+def scan_directory(directory: str, extensions: Tuple[str, ...] = ('.py',)) -> List[Dict]:
+    """Scan *directory* recursively and extract functions from matching files.
+
+    Files are visited in sorted order so that reports are reproducible.
+    Hidden files and anything below a directory named in SKIPPED_DIRS are
+    ignored.
+    """
+    all_functions: List[Dict] = []
+    root = Path(directory)
+
+    for filepath in sorted(root.rglob('*')):
+        if filepath.suffix not in extensions or not filepath.is_file():
+            continue
+        if filepath.name.startswith('.'):
+            continue
+        # Only look at path components below the scan root; otherwise a root
+        # that itself lives under e.g. "build/" would yield no files at all.
+        if SKIPPED_DIRS.intersection(filepath.relative_to(root).parts):
+            continue
+        all_functions.extend(extract_functions_from_file(str(filepath)))
+
+    return all_functions
+
+
+# ── Duplicate detection ─────────────────────────────────────────────────────
+
+def _line_count(func: Dict) -> int:
+    """Return the number of source lines spanned by *func*."""
+    return func['end_line'] - func['start_line'] + 1
+
+
+def _group_near_duplicates(candidates: List[Dict], threshold: float) -> List[List[Dict]]:
+    """Greedily group *candidates* whose identifier sets overlap enough.
+
+    Similarity is the Jaccard index of the lower-cased identifier sets.  A
+    function joins at most one group: the one seeded by the earliest function
+    it is similar to.
+    """
+    # Tokenize once per function instead of once per comparison.
+    token_sets = [set(_IDENTIFIER_RE.findall(f['body'].lower())) for f in candidates]
+    claimed = [False] * len(candidates)
+    groups = []
+
+    for i, seed in enumerate(candidates):
+        if claimed[i] or not token_sets[i]:
+            continue
+        group = [seed]
+        for j in range(i + 1, len(candidates)):
+            if claimed[j] or not token_sets[j]:
+                continue
+            shared = len(token_sets[i] & token_sets[j])
+            combined = len(token_sets[i] | token_sets[j])
+            if shared / combined >= threshold:
+                group.append(candidates[j])
+                claimed[j] = True
+        if len(group) > 1:
+            groups.append(group)
+
+    return groups
+
+
+def find_duplicates(functions: List[Dict], similarity_threshold: float = 0.95) -> Dict:
+    """
+    Find duplicate and near-duplicate functions.
+
+    Returns dict with:
+    - exact_duplicates: {hash: [function_info, ...]}
+    - near_duplicates: [[function_info, ...], ...]
+    - stats: total_functions, unique_exact, exact_dupe_count, near_dupe_count,
+      total_duplicates, total_lines, duplicate_lines, duplication_percentage
+    """
+    # Phase 1: exact duplicates share the hash of their normalized source.
+    hash_groups: Dict[str, List[Dict]] = defaultdict(list)
+    for func in functions:
+        hash_groups[code_hash(func['body'])].append(func)
+
+    exact_duplicates = {h: group for h, group in hash_groups.items() if len(group) > 1}
+
+    # Phase 2: near-duplicates among one representative per distinct hash.
+    representatives = [group[0] for group in hash_groups.values()]
+    near_duplicate_groups = _group_near_duplicates(representatives, similarity_threshold)
+
+    exact_dupe_count = sum(len(group) - 1 for group in exact_duplicates.values())
+    near_dupe_count = sum(len(group) - 1 for group in near_duplicate_groups)
+
+    # The first member of every group counts as the original; the remaining
+    # members contribute their lines to the duplicated total.
+    total_lines = sum(_line_count(f) for f in functions)
+    duplicate_lines = sum(
+        _line_count(f)
+        for group in (*exact_duplicates.values(), *near_duplicate_groups)
+        for f in group[1:]
+    )
+
+    stats = {
+        'total_functions': len(functions),
+        'unique_exact': len(hash_groups),
+        'exact_dupe_count': exact_dupe_count,
+        'near_dupe_count': near_dupe_count,
+        'total_duplicates': exact_dupe_count + near_dupe_count,
+        'total_lines': total_lines,
+        'duplicate_lines': duplicate_lines,
+        'duplication_percentage': round(
+            (duplicate_lines / total_lines * 100) if total_lines else 0, 2
+        ),
+    }
+
+    return {
+        'exact_duplicates': exact_duplicates,
+        'near_duplicates': near_duplicate_groups,
+        'stats': stats,
+    }
+
+
+# ── Report generation ────────────────────────────────────────────────────────
+
+def generate_report(results: Dict, output_format: str = 'json') -> str:
+    """Render detection *results* as JSON (the default) or as a text summary."""
+    if output_format == 'json':
+        return json.dumps(results, indent=2, default=str)
+
+    stats = results['stats']
+    rule = "=" * 60
+    lines = [
+        rule,
+        " CODE DUPLICATION REPORT",
+        rule,
+        f" Total functions scanned: {stats['total_functions']}",
+        f" Unique functions: {stats['unique_exact']}",
+        f" Exact duplicates: {stats['exact_dupe_count']}",
+        f" Near-duplicates: {stats['near_dupe_count']}",
+        f" Total lines: {stats['total_lines']}",
+        f" Duplicate lines: {stats['duplicate_lines']}",
+        f" Duplication %: {stats['duplication_percentage']}%",
+        "",
+    ]
+
+    if results['exact_duplicates']:
+        lines.append(" Exact duplicate functions:")
+        for group in results['exact_duplicates'].values():
+            first = group[0]
+            lines.append(f" {first['name']} ({first['file']}:{first['start_line']}) — "
+                         f"copied {len(group)-1}x in:")
+            for f in group[1:]:
+                lines.append(f" → {f['file']}:{f['start_line']}")
+        lines.append("")
+
+    if results['near_duplicates']:
+        lines.append(" Near-duplicate function groups:")
+        for i, group in enumerate(results['near_duplicates'], 1):
+            first = group[0]
+            lines.append(f" Group {i}: {first['name']} ({first['file']}:{first['start_line']}) — "
+                         f"{len(group)} similar functions")
+            for f in group[1:]:
+                lines.append(f" → {f['file']}:{f['start_line']}")
+        lines.append("")
+
+    lines.append(rule)
+    return '\n'.join(lines)
+
+
+# ── CLI ─────────────────────────────────────────────────────────────────────
+
+def main():
+    """Command-line entry point: scan, detect, print and optionally save."""
+    parser = argparse.ArgumentParser(description="Code Duplication Detector")
+    parser.add_argument('--directory', default='.',
+                        help='Directory to scan (default: current directory)')
+    parser.add_argument('--output', help='Output file for JSON report')
+    parser.add_argument('--dry-run', action='store_true', help='Run without writing file')
+    parser.add_argument('--threshold', type=float, default=0.95,
+                        help='Similarity threshold for near-dupes (default: 0.95)')
+    parser.add_argument('--json', action='store_true', help='JSON output to stdout')
+    parser.add_argument('--test', action='store_true', help='Run built-in test')
+    args = parser.parse_args()
+
+    if args.test:
+        _run_test()
+        return
+
+    # A similarity is a ratio, so anything outside [0, 1] is a usage error.
+    if not 0.0 <= args.threshold <= 1.0:
+        parser.error('--threshold must be between 0 and 1')
+
+    # Scan
+    functions = scan_directory(args.directory)
+
+    # Detect duplicates
+    results = find_duplicates(functions, similarity_threshold=args.threshold)
+    stats = results['stats']
+
+    # With --json, stdout must stay parseable: status messages go to stderr.
+    status = sys.stderr if args.json else sys.stdout
+
+    # Output
+    if args.json:
+        print(json.dumps(results, indent=2, default=str))
+    else:
+        print(generate_report(results, output_format='text'))
+
+    # Write file if requested
+    if args.output and not args.dry_run:
+        os.makedirs(os.path.dirname(args.output) or '.', exist_ok=True)
+        with open(args.output, 'w', encoding='utf-8') as f:
+            json.dump(results, f, indent=2, default=str)
+        print(f"\nReport written to: {args.output}", file=status)
+
+    # Summary for burn protocol
+    print(f"\n✓ Detection complete: {stats['exact_dupe_count']} exact + "
+          f"{stats['near_dupe_count']} near duplicates found "
+          f"({stats['duplication_percentage']}% duplication)", file=status)
+
+
+def _run_test():
+    """Built-in smoke test."""
+    import tempfile
+
+    with tempfile.TemporaryDirectory() as tmpdir:
+        # Create test files with duplicate code
+        f1 = Path(tmpdir) / 'mod1.py'
+        f1.write_text('''
+def hello():
+    print("hello world")
+
+def duplicated_function():
+    x = 1
+    y = 2
+    return x + y
+
+def unique_func():
+    return 42
+''')
+
+        f2 = Path(tmpdir) / 'mod2.py'
+        f2.write_text('''
+def duplicated_function():
+    x = 1
+    y = 2
+    return x + y
+
+def another_unique():
+    return "different"
+''')
+
+        functions = scan_directory(tmpdir)
+        results = find_duplicates(functions)
+
+        stats = results['stats']
+        assert stats['exact_dupe_count'] >= 1, "Should find at least 1 exact duplicate"
+        assert stats['total_functions'] >= 4, "Should find at least 4 functions"
+
+        # Check duplication percentage is calculated
+        assert 'duplication_percentage' in stats
+        print(f"\n✓ Test passed: {stats['total_functions']} functions, "
+              f"{stats['exact_dupe_count']} exact duplicates, "
+              f"{stats['duplication_percentage']}% duplication")
+
+
+if __name__ == '__main__':
+    main()
diff --git a/scripts/test_code_duplication_detector.py b/scripts/test_code_duplication_detector.py
new file mode 100644
index 0000000..cc50e21
--- /dev/null
+++ b/scripts/test_code_duplication_detector.py
@@ -0,0 +1,197 @@
+#!/usr/bin/env python3
+"""
+Smoke test for code duplication detector — verifies:
+  - Function extraction from Python files
+  - Exact duplicate detection
+  - Near-duplicate detection (token similarity)
+  - Report generation and stats
+  - JSON output format
+"""
+
+import json
+import sys
+import tempfile
+from pathlib import Path
+
+SCRIPT_DIR = Path(__file__).parent.absolute()
+sys.path.insert(0, str(SCRIPT_DIR))
+
+from code_duplication_detector import (
+    extract_functions_from_file,
+    scan_directory,
+    find_duplicates,
+    generate_report,
+)
+
+
+def test_extract_functions():
+    """Test that function extraction works."""
+    with tempfile.TemporaryDirectory() as tmpdir:
+        test_file = Path(tmpdir) / 'sample.py'
+        test_file.write_text('''
+def foo():
+    return 1
+
+def bar():
+    return 2
+
+class MyClass:
+    def method(self):
+        return 3
+''')
+        functions = extract_functions_from_file(str(test_file))
+        assert len(functions) == 3, f"Expected 3 functions, got {len(functions)}"
+        names = {f['name'] for f in functions}
+        assert names == {'foo', 'bar', 'method'}, f"Names mismatch: {names}"
+        methods = [f for f in functions if f['is_method']]
+        assert [m['class_name'] for m in methods] == ['MyClass']
+        print(" [PASS] function extraction works")
+
+
+def test_exact_duplicate_detection():
+    """Test that identical functions are flagged as duplicates."""
+    with tempfile.TemporaryDirectory() as tmpdir:
+        # Create two files with the same function
+        f1 = Path(tmpdir) / 'a.py'
+        f1.write_text('''
+def duplicated():
+    x = 1
+    y = 2
+    return x + y
+''')
+        f2 = Path(tmpdir) / 'b.py'
+        f2.write_text('''
+def duplicated():
+    x = 1
+    y = 2
+    return x + y
+''')
+        functions = scan_directory(tmpdir)
+        results = find_duplicates(functions)
+        stats = results['stats']
+        assert stats['exact_dupe_count'] >= 1, f"Expected exact duplicate, got count={stats['exact_dupe_count']}"
+        assert len(results['exact_duplicates']) >= 1, "Should have at least one duplicate group"
+        print(" [PASS] exact duplicate detection works")
+
+
+def test_near_duplicate_detection():
+    """Test that functions differing only in a literal are near-duplicates."""
+    with tempfile.TemporaryDirectory() as tmpdir:
+        f1 = Path(tmpdir) / 'a.py'
+        f1.write_text('''
+def compute():
+    total = 1
+    return total
+''')
+        f2 = Path(tmpdir) / 'b.py'
+        f2.write_text('''
+def compute():
+    total = 2
+    return total
+''')
+        functions = scan_directory(tmpdir)
+        results = find_duplicates(functions)
+        stats = results['stats']
+        assert stats['exact_dupe_count'] == 0
+        assert stats['near_dupe_count'] == 1
+        assert len(results['near_duplicates']) == 1
+        print(" [PASS] near-duplicate detection works")
+
+
+def test_unique_functions_not_flagged():
+    """Test that different functions are not flagged as duplicates."""
+    with tempfile.TemporaryDirectory() as tmpdir:
+        f1 = Path(tmpdir) / 'a.py'
+        f1.write_text('def func_a(): return 1')
+        f2 = Path(tmpdir) / 'b.py'
+        f2.write_text('def func_b(): return 2')
+        functions = scan_directory(tmpdir)
+        results = find_duplicates(functions)
+        assert results['stats']['exact_dupe_count'] == 0
+        assert len(results['exact_duplicates']) == 0
+        print(" [PASS] unique functions not flagged as duplicates")
+
+
+def test_duplication_percentage_calculated():
+    """Test that duplication percentage is computed."""
+    with tempfile.TemporaryDirectory() as tmpdir:
+        # Create file with mostly duplicated content
+        f1 = Path(tmpdir) / 'a.py'
+        f1.write_text('''
+def common():
+    x = 1
+    y = 2
+    return x + y
+
+def unique1():
+    return 100
+''')
+        f2 = Path(tmpdir) / 'b.py'
+        f2.write_text('''
+def common():
+    x = 1
+    y = 2
+    return x + y
+
+def unique2():
+    return 200
+''')
+        functions = scan_directory(tmpdir)
+        results = find_duplicates(functions)
+        stats = results['stats']
+        assert 'duplication_percentage' in stats
+        # 2 copies of common (4 lines each) + 2 unique functions (2 lines each) = 12 total
+        # Duplicate lines = 4 (one copy of common marked duplicate) → 33.33%
+        assert stats['total_lines'] == 12
+        assert stats['duplicate_lines'] == 4
+        assert stats['duplication_percentage'] == 33.33
+        print(f" [PASS] duplication percentage computed: {stats['duplication_percentage']}%")
+
+
+def test_report_output_format():
+    """Test that report output is valid."""
+    with tempfile.TemporaryDirectory() as tmpdir:
+        f1 = Path(tmpdir) / 'a.py'
+        f1.write_text('def dup(): return 1')
+        f2 = Path(tmpdir) / 'b.py'
+        f2.write_text('def dup(): return 1')
+        functions = scan_directory(tmpdir)
+        results = find_duplicates(functions)
+
+        # Text report
+        text = generate_report(results, output_format='text')
+        assert 'CODE DUPLICATION REPORT' in text
+        assert 'Total functions' in text
+        print(" [PASS] text report format valid")
+
+        # JSON report
+        json_out = generate_report(results, output_format='json')
+        data = json.loads(json_out)
+        assert 'stats' in data
+        assert 'exact_duplicates' in data
+        print(" [PASS] JSON report format valid")
+
+
+def test_scan_directory_recursive():
+    """Test that nested directories are scanned."""
+    with tempfile.TemporaryDirectory() as tmpdir:
+        subdir = Path(tmpdir) / 'sub'
+        subdir.mkdir()
+        (subdir / 'nested.py').write_text('def nested(): pass')
+        (Path(tmpdir) / 'root.py').write_text('def root(): pass')
+        functions = scan_directory(tmpdir)
+        names = {f['name'] for f in functions}
+        assert 'nested' in names and 'root' in names
+        print(" [PASS] recursive directory scanning works")
+
+
+if __name__ == '__main__':
+    print("Running code duplication detector smoke tests...")
+    test_extract_functions()
+    test_exact_duplicate_detection()
+    test_near_duplicate_detection()
+    test_unique_functions_not_flagged()
+    test_duplication_percentage_calculated()
+    test_report_output_format()
+    test_scan_directory_recursive()
+    print("\nAll tests passed.")