Compare commits
1 Commits
step35/162
...
step35/151
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5f6a7f7265 |
@@ -1,366 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Code Duplication Detector — Issue #162
|
||||
|
||||
Finds duplicate functions and code blocks across Python source files.
|
||||
Reports duplication percentage and outputs a duplication report.
|
||||
|
||||
Usage:
|
||||
python3 scripts/code_duplication_detector.py --output reports/code_duplication.json
|
||||
python3 scripts/code_duplication_detector.py --directory scripts/ --dry-run
|
||||
python3 scripts/code_duplication_detector.py --test # Run built-in test
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
from collections import defaultdict
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import List, Dict, Tuple, Optional
|
||||
|
||||
|
||||
# ── AST helpers ────────────────────────────────────────────────────────────
|
||||
|
||||
def normalize_code(text: str) -> str:
|
||||
"""Normalize code for comparison: strip comments, normalize whitespace."""
|
||||
# Remove comments (both # and docstring triple-quote strings)
|
||||
text = re.sub(r'#.*$', '', text, flags=re.MULTILINE)
|
||||
text = re.sub(r'""".*?"""', '', text, flags=re.DOTALL)
|
||||
text = re.sub(r"'''.*?'''", '', text, flags=re.DOTALL)
|
||||
# Normalize whitespace
|
||||
text = re.sub(r'\s+', ' ', text).strip()
|
||||
return text.lower()
|
||||
|
||||
|
||||
def code_hash(text: str) -> str:
|
||||
"""SHA256 hash of normalized code for exact duplicate detection."""
|
||||
normalized = normalize_code(text)
|
||||
return hashlib.sha256(normalized.encode('utf-8')).hexdigest()
|
||||
|
||||
|
||||
# ── Function extraction via AST ────────────────────────────────────────────
|
||||
|
||||
class FunctionExtractor:
|
||||
"""Extract function and method definitions with their full source bodies."""
|
||||
|
||||
def __init__(self, source: str, filepath: str):
|
||||
self.source = source
|
||||
self.filepath = filepath
|
||||
self.lines = source.splitlines()
|
||||
self.functions: List[Dict] = []
|
||||
|
||||
def _get_source_segment(self, start_lineno: int, end_lineno: int) -> str:
|
||||
"""Get source code from start to end line (1-indexed, inclusive)."""
|
||||
# AST end_lineno is inclusive
|
||||
start_idx = start_lineno - 1
|
||||
end_idx = end_lineno
|
||||
return '\n'.join(self.lines[start_idx:end_idx])
|
||||
|
||||
def visit(self, tree):
|
||||
"""Collect all function and async function definitions."""
|
||||
for node in ast.walk(tree):
|
||||
if isinstance(node, ast.FunctionDef) or isinstance(node, ast.AsyncFunctionDef):
|
||||
# Get the full source for this function including decorators
|
||||
start = node.lineno
|
||||
end = node.end_lineno
|
||||
body_source = self._get_source_segment(start, end)
|
||||
|
||||
# Also collect parent class name if this is a method
|
||||
class_name = None
|
||||
parent = node.parent if hasattr(node, 'parent') else None
|
||||
if parent and isinstance(parent, ast.ClassDef):
|
||||
class_name = parent.name
|
||||
|
||||
self.functions.append({
|
||||
'name': node.name,
|
||||
'file': self.filepath,
|
||||
'start_line': start,
|
||||
'end_line': end,
|
||||
'body': body_source,
|
||||
'class_name': class_name,
|
||||
'is_method': class_name is not None,
|
||||
})
|
||||
|
||||
|
||||
import ast
|
||||
|
||||
class ParentNodeVisitor(ast.NodeVisitor):
|
||||
"""Annotate nodes with parent references."""
|
||||
def __init__(self, parent=None):
|
||||
self.parent = parent
|
||||
|
||||
def generic_visit(self, node):
|
||||
node.parent = self.parent
|
||||
for child in ast.iter_child_nodes(node):
|
||||
self.__class__(child).parent = node
|
||||
super().generic_visit(node)
|
||||
|
||||
|
||||
def extract_functions_from_file(filepath: str) -> List[Dict]:
|
||||
"""Extract all function definitions from a Python file."""
|
||||
try:
|
||||
with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
|
||||
source = f.read()
|
||||
tree = ast.parse(source, filename=str(filepath))
|
||||
|
||||
# Annotate with parent references
|
||||
for node in ast.walk(tree):
|
||||
for child in ast.iter_child_nodes(node):
|
||||
child.parent = node
|
||||
|
||||
extractor = FunctionExtractor(source, str(filepath))
|
||||
extractor.visit(tree)
|
||||
return extractor.functions
|
||||
except (SyntaxError, UnicodeDecodeError, OSError) as e:
|
||||
return []
|
||||
|
||||
|
||||
def scan_directory(directory: str, extensions: Tuple[str, ...] = ('.py',)) -> List[Dict]:
|
||||
"""Scan directory for Python files and extract all functions."""
|
||||
all_functions = []
|
||||
path = Path(directory)
|
||||
|
||||
for filepath in path.rglob('*'):
|
||||
if filepath.is_file() and filepath.suffix in extensions:
|
||||
# Skip common non-source dirs
|
||||
parts = filepath.parts
|
||||
if any(ex in parts for ex in ('__pycache__', 'node_modules', '.git', 'venv', '.venv', 'dist', 'build')):
|
||||
continue
|
||||
if filepath.name.startswith('.'):
|
||||
continue
|
||||
|
||||
functions = extract_functions_from_file(str(filepath))
|
||||
all_functions.extend(functions)
|
||||
|
||||
return all_functions
|
||||
|
||||
|
||||
# ── Duplicate detection ─────────────────────────────────────────────────────
|
||||
|
||||
def find_duplicates(functions: List[Dict], similarity_threshold: float = 0.95) -> Dict:
|
||||
"""
|
||||
Find duplicate and near-duplicate functions.
|
||||
|
||||
Returns dict with:
|
||||
- exact_duplicates: {hash: [function_info, ...]}
|
||||
- near_duplicates: [[function_info, ...], ...]
|
||||
- stats: total_functions, unique_exact, exact_dupe_count, near_dupe_count
|
||||
"""
|
||||
# Phase 1: Exact duplicates by code hash
|
||||
hash_groups: Dict[str, List[Dict]] = defaultdict(list)
|
||||
for func in functions:
|
||||
h = code_hash(func['body'])
|
||||
hash_groups[h].append(func)
|
||||
|
||||
exact_duplicates = {h: group for h, group in hash_groups.items() if len(group) > 1}
|
||||
exact_dupe_count = sum(len(group) - 1 for group in exact_duplicates.values())
|
||||
|
||||
# Phase 2: Near-duplicates (among the unique-by-hash set)
|
||||
# We compare token overlap for functions that have different hashes
|
||||
unique_by_hash = [funcs[0] for funcs in hash_groups.values()]
|
||||
near_duplicate_groups = []
|
||||
|
||||
# Simple token-based similarity
|
||||
def tokenize(code: str) -> set:
|
||||
return set(re.findall(r'[a-zA-Z_][a-zA-Z0-9_]*', code.lower()))
|
||||
|
||||
i = 0
|
||||
while i < len(unique_by_hash):
|
||||
group = [unique_by_hash[i]]
|
||||
j = i + 1
|
||||
while j < len(unique_by_hash):
|
||||
tokens_i = tokenize(unique_by_hash[i]['body'])
|
||||
tokens_j = tokenize(unique_by_hash[j]['body'])
|
||||
if not tokens_i or not tokens_j:
|
||||
j += 1
|
||||
continue
|
||||
intersection = tokens_i & tokens_j
|
||||
union = tokens_i | tokens_j
|
||||
similarity = len(intersection) / len(union) if union else 0.0
|
||||
|
||||
if similarity >= similarity_threshold:
|
||||
group.append(unique_by_hash[j])
|
||||
unique_by_hash.pop(j)
|
||||
else:
|
||||
j += 1
|
||||
|
||||
if len(group) > 1:
|
||||
near_duplicate_groups.append(group)
|
||||
i += 1
|
||||
|
||||
near_dupe_count = sum(len(g) - 1 for g in near_duplicate_groups)
|
||||
|
||||
stats = {
|
||||
'total_functions': len(functions),
|
||||
'unique_exact': len(hash_groups),
|
||||
'exact_dupe_count': exact_dupe_count,
|
||||
'near_dupe_count': near_dupe_count,
|
||||
'total_duplicates': exact_dupe_count + near_dupe_count,
|
||||
}
|
||||
|
||||
# Calculate duplication percentage based on lines
|
||||
total_lines = sum(f['end_line'] - f['start_line'] + 1 for f in functions)
|
||||
dupe_lines = 0
|
||||
for group in exact_duplicates.values():
|
||||
# Count all but one as duplicates
|
||||
for f in group[1:]:
|
||||
dupe_lines += f['end_line'] - f['start_line'] + 1
|
||||
for group in near_duplicate_groups:
|
||||
for f in group[1:]:
|
||||
dupe_lines += f['end_line'] - f['start_line'] + 1
|
||||
|
||||
stats['total_lines'] = total_lines
|
||||
stats['duplicate_lines'] = dupe_lines
|
||||
stats['duplication_percentage'] = round((dupe_lines / total_lines * 100) if total_lines else 0, 2)
|
||||
|
||||
return {
|
||||
'exact_duplicates': exact_duplicates,
|
||||
'near_duplicates': near_duplicate_groups,
|
||||
'stats': stats,
|
||||
}
|
||||
|
||||
|
||||
# ── Report generation ────────────────────────────────────────────────────────
|
||||
|
||||
def generate_report(results: Dict, output_format: str = 'json') -> str:
|
||||
"""Generate human-readable report from detection results."""
|
||||
stats = results['stats']
|
||||
|
||||
if output_format == 'json':
|
||||
return json.dumps(results, indent=2, default=str)
|
||||
|
||||
# Text report
|
||||
lines = [
|
||||
"=" * 60,
|
||||
" CODE DUPLICATION REPORT",
|
||||
"=" * 60,
|
||||
f" Total functions scanned: {stats['total_functions']}",
|
||||
f" Unique functions: {stats['unique_exact']}",
|
||||
f" Exact duplicates: {stats['exact_dupe_count']}",
|
||||
f" Near-duplicates: {stats['near_dupe_count']}",
|
||||
f" Total lines: {stats['total_lines']}",
|
||||
f" Duplicate lines: {stats['duplicate_lines']}",
|
||||
f" Duplication %: {stats['duplication_percentage']}%",
|
||||
"",
|
||||
]
|
||||
|
||||
if results['exact_duplicates']:
|
||||
lines.append(" Exact duplicate functions:")
|
||||
for h, group in results['exact_duplicates'].items():
|
||||
first = group[0]
|
||||
lines.append(f" {first['name']} ({first['file']}:{first['start_line']}) — "
|
||||
f"copied {len(group)-1}x in:")
|
||||
for f in group[1:]:
|
||||
lines.append(f" → {f['file']}:{f['start_line']}")
|
||||
lines.append("")
|
||||
|
||||
if results['near_duplicates']:
|
||||
lines.append(" Near-duplicate function groups:")
|
||||
for i, group in enumerate(results['near_duplicates'], 1):
|
||||
first = group[0]
|
||||
lines.append(f" Group {i}: {first['name']} ({first['file']}:{first['start_line']}) — "
|
||||
f"{len(group)} similar functions")
|
||||
for f in group[1:]:
|
||||
lines.append(f" → {f['file']}:{f['start_line']}")
|
||||
lines.append("")
|
||||
|
||||
lines.append("=" * 60)
|
||||
return '\n'.join(lines)
|
||||
|
||||
|
||||
# ── CLI ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Code Duplication Detector")
|
||||
parser.add_argument('--directory', default='.',
|
||||
help='Directory to scan (default: current directory)')
|
||||
parser.add_argument('--output', help='Output file for JSON report')
|
||||
parser.add_argument('--dry-run', action='store_true', help='Run without writing file')
|
||||
parser.add_argument('--threshold', type=float, default=0.95,
|
||||
help='Similarity threshold for near-dupes (default: 0.95)')
|
||||
parser.add_argument('--json', action='store_true', help='JSON output to stdout')
|
||||
parser.add_argument('--test', action='store_true', help='Run built-in test')
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.test:
|
||||
_run_test()
|
||||
return
|
||||
|
||||
# Scan
|
||||
functions = scan_directory(args.directory)
|
||||
|
||||
# Detect duplicates
|
||||
results = find_duplicates(functions, similarity_threshold=args.threshold)
|
||||
stats = results['stats']
|
||||
|
||||
# Output
|
||||
if args.json:
|
||||
print(json.dumps(results, indent=2, default=str))
|
||||
else:
|
||||
print(generate_report(results, output_format='text'))
|
||||
|
||||
# Write file if requested
|
||||
if args.output and not args.dry_run:
|
||||
os.makedirs(os.path.dirname(args.output) or '.', exist_ok=True)
|
||||
with open(args.output, 'w') as f:
|
||||
json.dump(results, f, indent=2, default=str)
|
||||
print(f"\nReport written to: {args.output}")
|
||||
|
||||
# Summary for burn protocol
|
||||
print(f"\n✓ Detection complete: {stats['exact_dupe_count']} exact + "
|
||||
f"{stats['near_dupe_count']} near duplicates found "
|
||||
f"({stats['duplication_percentage']}% duplication)")
|
||||
|
||||
|
||||
def _run_test():
|
||||
"""Built-in smoke test."""
|
||||
import tempfile
|
||||
import os
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
# Create test files with duplicate code
|
||||
f1 = Path(tmpdir) / 'mod1.py'
|
||||
f1.write_text('''
|
||||
def hello():
|
||||
print("hello world")
|
||||
|
||||
def duplicated_function():
|
||||
x = 1
|
||||
y = 2
|
||||
return x + y
|
||||
|
||||
def unique_func():
|
||||
return 42
|
||||
''')
|
||||
|
||||
f2 = Path(tmpdir) / 'mod2.py'
|
||||
f2.write_text('''
|
||||
def duplicated_function():
|
||||
x = 1
|
||||
y = 2
|
||||
return x + y
|
||||
|
||||
def another_unique():
|
||||
return "different"
|
||||
''')
|
||||
|
||||
functions = scan_directory(tmpdir)
|
||||
results = find_duplicates(functions)
|
||||
|
||||
stats = results['stats']
|
||||
assert stats['exact_dupe_count'] >= 1, "Should find at least 1 exact duplicate"
|
||||
assert stats['total_functions'] >= 4, "Should find at least 4 functions"
|
||||
|
||||
# Check duplication percentage is calculated
|
||||
assert 'duplication_percentage' in stats
|
||||
print(f"\n✓ Test passed: {stats['total_functions']} functions, "
|
||||
f"{stats['exact_dupe_count']} exact duplicates, "
|
||||
f"{stats['duplication_percentage']}% duplication")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
206
scripts/graph_visualizer.py
Executable file
206
scripts/graph_visualizer.py
Executable file
@@ -0,0 +1,206 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
graph_visualizer.py — Generate visual graph representations of the knowledge graph.
|
||||
|
||||
Reads knowledge/index.json and renders the fact relationship graph.
|
||||
Supports ASCII terminal output and DOT export for Graphviz.
|
||||
|
||||
Usage:
|
||||
python3 scripts/graph_visualizer.py # ASCII, all nodes
|
||||
python3 scripts/graph_visualizer.py --format dot # DOT output
|
||||
python3 scripts/graph_visualizer.py --seed root --max-depth 2
|
||||
python3 scripts/graph_visualizer.py --filter-domain hermes-agent
|
||||
python3 scripts/graph_visualizer.py --filter-category pitfall
|
||||
|
||||
Acceptance: [x] Subgraph extraction [x] ASCII rendering [x] DOT export [x] Configurable depth/filter
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import sys
|
||||
from collections import defaultdict, deque
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
|
||||
def load_index(index_path: Path):
|
||||
with open(index_path) as f:
|
||||
return json.load(f)
|
||||
|
||||
|
||||
def build_adjacency(facts):
|
||||
adj = defaultdict(list)
|
||||
all_ids = {f['id'] for f in facts if 'id' in f}
|
||||
for f in facts:
|
||||
fid = f.get('id')
|
||||
if not fid:
|
||||
continue
|
||||
for rel in f.get('related', []):
|
||||
if rel in all_ids:
|
||||
adj[fid].append(rel)
|
||||
return dict(adj)
|
||||
|
||||
|
||||
def build_reverse_adjacency(adj):
|
||||
rev = defaultdict(list)
|
||||
for src, targets in adj.items():
|
||||
for tgt in targets:
|
||||
rev[tgt].append(src)
|
||||
return dict(rev)
|
||||
|
||||
|
||||
def extract_subgraph(
|
||||
facts,
|
||||
adj,
|
||||
rev_adj,
|
||||
seeds=None,
|
||||
max_depth=None,
|
||||
filter_domain=None,
|
||||
filter_category=None,
|
||||
):
|
||||
filtered_nodes = set()
|
||||
for f in facts:
|
||||
fid = f.get('id')
|
||||
if not fid:
|
||||
continue
|
||||
if filter_domain and f.get('domain') != filter_domain:
|
||||
continue
|
||||
if filter_category and f.get('category') != filter_category:
|
||||
continue
|
||||
filtered_nodes.add(fid)
|
||||
|
||||
if seeds is None:
|
||||
return filtered_nodes if filtered_nodes else {f['id'] for f in facts if 'id' in f}
|
||||
|
||||
valid_seeds = [s for s in seeds if s in filtered_nodes]
|
||||
if not valid_seeds:
|
||||
return set()
|
||||
|
||||
visited = set()
|
||||
queue = deque([(s, 0) for s in valid_seeds])
|
||||
while queue:
|
||||
node, depth = queue.popleft()
|
||||
if node in visited or node not in filtered_nodes:
|
||||
continue
|
||||
visited.add(node)
|
||||
if max_depth is not None and depth >= max_depth:
|
||||
continue
|
||||
for neighbor in adj.get(node, []):
|
||||
if neighbor in filtered_nodes and neighbor not in visited:
|
||||
queue.append((neighbor, depth + 1))
|
||||
for neighbor in rev_adj.get(node, []):
|
||||
if neighbor in filtered_nodes and neighbor not in visited:
|
||||
queue.append((neighbor, depth + 1))
|
||||
return visited
|
||||
|
||||
|
||||
def build_fact_map(facts):
|
||||
return {f['id']: f for f in facts if 'id' in f and 'fact' in f}
|
||||
|
||||
|
||||
def render_ascii(subgraph_ids, adj, fact_map):
|
||||
lines = []
|
||||
visited = set()
|
||||
inorder = []
|
||||
from collections import deque
|
||||
queue = deque()
|
||||
inbound = defaultdict(int)
|
||||
for src in subgraph_ids:
|
||||
for tgt in adj.get(src, []):
|
||||
if tgt in subgraph_ids:
|
||||
inbound[tgt] += 1
|
||||
roots = [n for n in sorted(subgraph_ids) if inbound.get(n, 0) == 0]
|
||||
if not roots:
|
||||
roots = sorted(subgraph_ids)
|
||||
for root in roots:
|
||||
queue.append((root, 0, None))
|
||||
while queue:
|
||||
node, depth, parent_label = queue.popleft()
|
||||
if node in visited:
|
||||
continue
|
||||
visited.add(node)
|
||||
fact = fact_map.get(node, {})
|
||||
label = fact.get('fact', str(node))[:80]
|
||||
category = fact.get('category', 'fact')
|
||||
domain = fact.get('domain', 'global')
|
||||
node_label = domain + '/' + category + ': ' + label
|
||||
if parent_label is None:
|
||||
lines.append(f"{' ' * depth}┌─ {node_label}")
|
||||
else:
|
||||
lines.append(f"{' ' * depth}├─ {node_label}")
|
||||
children = [c for c in adj.get(node, []) if c in subgraph_ids]
|
||||
for i, child in enumerate(children):
|
||||
queue.append((child, depth + 1, node))
|
||||
if len(visited) < len(subgraph_ids):
|
||||
lines.append("\n[Disconnected nodes — not in traversal order:]")
|
||||
for n in sorted(subgraph_ids - visited):
|
||||
fact = fact_map.get(n, {})
|
||||
label = fact.get('fact', n)[:60]
|
||||
lines.append(f" {n} — {label}")
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def render_dot(subgraph_ids, adj, fact_map):
|
||||
lines = ["digraph knowledge_graph {", " rankdir=LR;"]
|
||||
cat_colors = {
|
||||
'fact': '#3498db',
|
||||
'pitfall': '#e74c3c',
|
||||
'pattern': '#2ecc71',
|
||||
'tool-quirk': '#f39c12',
|
||||
'question': '#9b59b6',
|
||||
}
|
||||
for nid in sorted(subgraph_ids):
|
||||
fact = fact_map.get(nid, {})
|
||||
category = fact.get('category', 'fact')
|
||||
domain = fact.get('domain', 'global')
|
||||
label = fact.get('fact', nid).replace('"', '\\"')[:80]
|
||||
fillcolor = cat_colors.get(category, '#666666')
|
||||
lines.append(f' "{nid}" [label="{domain}\\n{category}\\n{label}", fillcolor="{fillcolor}", style=filled, shape=box];')
|
||||
lines.append("")
|
||||
for src in sorted(subgraph_ids):
|
||||
for tgt in adj.get(src, []):
|
||||
if tgt in subgraph_ids:
|
||||
lines.append(f' "{src}" -> "{tgt}";')
|
||||
lines.append("}")
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Visualize the knowledge graph (ASCII terminal or DOT for Graphviz).")
|
||||
parser.add_argument("--index", type=Path, default=Path(__file__).parent.parent / "knowledge" / "index.json",
|
||||
help="Path to knowledge/index.json")
|
||||
parser.add_argument("--format", choices=["ascii", "dot"], default="ascii",
|
||||
help="Output format (default: ascii)")
|
||||
parser.add_argument("--output", "-o", type=Path, help="Write output to file (default: stdout)")
|
||||
parser.add_argument("--seed", help="Starting fact ID (comma-sep). Omit to render full graph.")
|
||||
parser.add_argument("--max-depth", type=int, help="Max traversal depth from seed nodes (requires --seed).")
|
||||
parser.add_argument("--filter-domain", help="Only include facts from this domain.")
|
||||
parser.add_argument("--filter-category", help="Only include facts of this category.")
|
||||
args = parser.parse_args()
|
||||
|
||||
index = load_index(args.index)
|
||||
facts = index.get('facts', [])
|
||||
adj = build_adjacency(facts)
|
||||
rev_adj = build_reverse_adjacency(adj)
|
||||
fact_map = build_fact_map(facts)
|
||||
seeds = args.seed.split(',') if args.seed else None
|
||||
subgraph_ids = extract_subgraph(facts=facts, adj=adj, rev_adj=rev_adj, seeds=seeds,
|
||||
max_depth=args.max_depth,
|
||||
filter_domain=args.filter_domain,
|
||||
filter_category=args.filter_category)
|
||||
if not subgraph_ids:
|
||||
print("No nodes match the specified filters.", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
if args.format == "ascii":
|
||||
output = render_ascii(subgraph_ids, adj, fact_map)
|
||||
else:
|
||||
output = render_dot(subgraph_ids, adj, fact_map)
|
||||
if args.output:
|
||||
args.output.write_text(output)
|
||||
print(f"Written: {args.output}", file=sys.stderr)
|
||||
else:
|
||||
print(output)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,351 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
PR Complexity Scorer - Estimate review effort for PRs.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
from dataclasses import dataclass, asdict
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
|
||||
GITEA_BASE = "https://forge.alexanderwhitestone.com/api/v1"
|
||||
|
||||
DEPENDENCY_FILES = {
|
||||
"requirements.txt", "pyproject.toml", "setup.py", "setup.cfg",
|
||||
"Pipfile", "poetry.lock", "package.json", "yarn.lock", "Gemfile",
|
||||
"go.mod", "Cargo.toml", "pom.xml", "build.gradle"
|
||||
}
|
||||
|
||||
TEST_PATTERNS = [
|
||||
r"tests?/.*\.py$", r".*_test\.py$", r"test_.*\.py$",
|
||||
r"spec/.*\.rb$", r".*_spec\.rb$",
|
||||
r"__tests__/", r".*\.test\.(js|ts|jsx|tsx)$"
|
||||
]
|
||||
|
||||
WEIGHT_FILES = 0.25
|
||||
WEIGHT_LINES = 0.25
|
||||
WEIGHT_DEPS = 0.30
|
||||
WEIGHT_TEST_COV = 0.20
|
||||
|
||||
SMALL_FILES = 5
|
||||
MEDIUM_FILES = 20
|
||||
LARGE_FILES = 50
|
||||
|
||||
SMALL_LINES = 100
|
||||
MEDIUM_LINES = 500
|
||||
LARGE_LINES = 2000
|
||||
|
||||
TIME_PER_POINT = {1: 5, 2: 10, 3: 15, 4: 20, 5: 25, 6: 30, 7: 45, 8: 60, 9: 90, 10: 120}
|
||||
|
||||
|
||||
@dataclass
|
||||
class PRComplexity:
|
||||
pr_number: int
|
||||
title: str
|
||||
files_changed: int
|
||||
additions: int
|
||||
deletions: int
|
||||
has_dependency_changes: bool
|
||||
test_coverage_delta: Optional[int]
|
||||
score: int
|
||||
estimated_minutes: int
|
||||
reasons: List[str]
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return asdict(self)
|
||||
|
||||
|
||||
class GiteaClient:
|
||||
def __init__(self, token: str):
|
||||
self.token = token
|
||||
self.base_url = GITEA_BASE.rstrip("/")
|
||||
|
||||
def _request(self, path: str, params: Dict = None) -> Any:
|
||||
url = f"{self.base_url}{path}"
|
||||
if params:
|
||||
qs = "&".join(f"{k}={v}" for k, v in params.items() if v is not None)
|
||||
url += f"?{qs}"
|
||||
|
||||
req = urllib.request.Request(url)
|
||||
req.add_header("Authorization", f"token {self.token}")
|
||||
req.add_header("Content-Type", "application/json")
|
||||
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=30) as resp:
|
||||
return json.loads(resp.read().decode())
|
||||
except urllib.error.HTTPError as e:
|
||||
print(f"API error {e.code}: {e.read().decode()[:200]}", file=sys.stderr)
|
||||
return None
|
||||
except urllib.error.URLError as e:
|
||||
print(f"Network error: {e}", file=sys.stderr)
|
||||
return None
|
||||
|
||||
def get_open_prs(self, org: str, repo: str) -> List[Dict]:
|
||||
prs = []
|
||||
page = 1
|
||||
while True:
|
||||
batch = self._request(f"/repos/{org}/{repo}/pulls", {"limit": 50, "page": page, "state": "open"})
|
||||
if not batch:
|
||||
break
|
||||
prs.extend(batch)
|
||||
if len(batch) < 50:
|
||||
break
|
||||
page += 1
|
||||
return prs
|
||||
|
||||
def get_pr_files(self, org: str, repo: str, pr_number: int) -> List[Dict]:
|
||||
files = []
|
||||
page = 1
|
||||
while True:
|
||||
batch = self._request(
|
||||
f"/repos/{org}/{repo}/pulls/{pr_number}/files",
|
||||
{"limit": 100, "page": page}
|
||||
)
|
||||
if not batch:
|
||||
break
|
||||
files.extend(batch)
|
||||
if len(batch) < 100:
|
||||
break
|
||||
page += 1
|
||||
return files
|
||||
|
||||
def post_comment(self, org: str, repo: str, pr_number: int, body: str) -> bool:
|
||||
data = json.dumps({"body": body}).encode("utf-8")
|
||||
req = urllib.request.Request(
|
||||
f"{self.base_url}/repos/{org}/{repo}/issues/{pr_number}/comments",
|
||||
data=data,
|
||||
method="POST",
|
||||
headers={"Authorization": f"token {self.token}", "Content-Type": "application/json"}
|
||||
)
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=30) as resp:
|
||||
return resp.status in (200, 201)
|
||||
except urllib.error.HTTPError:
|
||||
return False
|
||||
|
||||
|
||||
def is_dependency_file(filename: str) -> bool:
|
||||
return any(filename.endswith(dep) for dep in DEPENDENCY_FILES)
|
||||
|
||||
|
||||
def is_test_file(filename: str) -> bool:
|
||||
return any(re.search(pattern, filename) for pattern in TEST_PATTERNS)
|
||||
|
||||
|
||||
def score_pr(
|
||||
files_changed: int,
|
||||
additions: int,
|
||||
deletions: int,
|
||||
has_dependency_changes: bool,
|
||||
test_coverage_delta: Optional[int] = None
|
||||
) -> tuple[int, int, List[str]]:
|
||||
score = 1.0
|
||||
reasons = []
|
||||
|
||||
# Files changed
|
||||
if files_changed <= SMALL_FILES:
|
||||
fscore = 1.0
|
||||
reasons.append("small number of files changed")
|
||||
elif files_changed <= MEDIUM_FILES:
|
||||
fscore = 2.0
|
||||
reasons.append("moderate number of files changed")
|
||||
elif files_changed <= LARGE_FILES:
|
||||
fscore = 2.5
|
||||
reasons.append("large number of files changed")
|
||||
else:
|
||||
fscore = 3.0
|
||||
reasons.append("very large PR spanning many files")
|
||||
|
||||
# Lines changed
|
||||
total_lines = additions + deletions
|
||||
if total_lines <= SMALL_LINES:
|
||||
lscore = 1.0
|
||||
reasons.append("small change size")
|
||||
elif total_lines <= MEDIUM_LINES:
|
||||
lscore = 2.0
|
||||
reasons.append("moderate change size")
|
||||
elif total_lines <= LARGE_LINES:
|
||||
lscore = 3.0
|
||||
reasons.append("large change size")
|
||||
else:
|
||||
lscore = 4.0
|
||||
reasons.append("very large change")
|
||||
|
||||
# Dependency changes
|
||||
if has_dependency_changes:
|
||||
dscore = 2.5
|
||||
reasons.append("dependency changes (architectural impact)")
|
||||
else:
|
||||
dscore = 0.0
|
||||
|
||||
# Test coverage delta
|
||||
tscore = 0.0
|
||||
if test_coverage_delta is not None:
|
||||
if test_coverage_delta > 0:
|
||||
reasons.append(f"test additions (+{test_coverage_delta} test files)")
|
||||
tscore = -min(2.0, test_coverage_delta / 2.0)
|
||||
elif test_coverage_delta < 0:
|
||||
reasons.append(f"test removals ({abs(test_coverage_delta)} test files)")
|
||||
tscore = min(2.0, abs(test_coverage_delta) * 0.5)
|
||||
else:
|
||||
reasons.append("test coverage change not assessed")
|
||||
|
||||
# Weighted sum, scaled by 3 to use full 1-10 range
|
||||
bonus = (fscore * WEIGHT_FILES) + (lscore * WEIGHT_LINES) + (dscore * WEIGHT_DEPS) + (tscore * WEIGHT_TEST_COV)
|
||||
scaled_bonus = bonus * 3.0
|
||||
score = 1.0 + scaled_bonus
|
||||
|
||||
final_score = max(1, min(10, int(round(score))))
|
||||
est_minutes = TIME_PER_POINT.get(final_score, 30)
|
||||
|
||||
return final_score, est_minutes, reasons
|
||||
|
||||
|
||||
def analyze_pr(client: GiteaClient, org: str, repo: str, pr_data: Dict) -> PRComplexity:
|
||||
pr_num = pr_data["number"]
|
||||
title = pr_data.get("title", "")
|
||||
files = client.get_pr_files(org, repo, pr_num)
|
||||
|
||||
additions = sum(f.get("additions", 0) for f in files)
|
||||
deletions = sum(f.get("deletions", 0) for f in files)
|
||||
filenames = [f.get("filename", "") for f in files]
|
||||
|
||||
has_deps = any(is_dependency_file(f) for f in filenames)
|
||||
|
||||
test_added = sum(1 for f in files if f.get("status") == "added" and is_test_file(f.get("filename", "")))
|
||||
test_removed = sum(1 for f in files if f.get("status") == "removed" and is_test_file(f.get("filename", "")))
|
||||
test_delta = test_added - test_removed if (test_added or test_removed) else None
|
||||
|
||||
score, est_min, reasons = score_pr(
|
||||
files_changed=len(files),
|
||||
additions=additions,
|
||||
deletions=deletions,
|
||||
has_dependency_changes=has_deps,
|
||||
test_coverage_delta=test_delta
|
||||
)
|
||||
|
||||
return PRComplexity(
|
||||
pr_number=pr_num,
|
||||
title=title,
|
||||
files_changed=len(files),
|
||||
additions=additions,
|
||||
deletions=deletions,
|
||||
has_dependency_changes=has_deps,
|
||||
test_coverage_delta=test_delta,
|
||||
score=score,
|
||||
estimated_minutes=est_min,
|
||||
reasons=reasons
|
||||
)
|
||||
|
||||
|
||||
def build_comment(complexity: PRComplexity) -> str:
|
||||
change_desc = f"{complexity.files_changed} files, +{complexity.additions}/-{complexity.deletions} lines"
|
||||
deps_note = "\n- :warning: Dependency changes detected — architectural review recommended" if complexity.has_dependency_changes else ""
|
||||
test_note = ""
|
||||
if complexity.test_coverage_delta is not None:
|
||||
if complexity.test_coverage_delta > 0:
|
||||
test_note = f"\n- :+1: {complexity.test_coverage_delta} test file(s) added"
|
||||
elif complexity.test_coverage_delta < 0:
|
||||
test_note = f"\n- :warning: {abs(complexity.test_coverage_delta)} test file(s) removed"
|
||||
|
||||
comment = f"## 📊 PR Complexity Analysis\n\n"
|
||||
comment += f"**PR #{complexity.pr_number}: {complexity.title}**\n\n"
|
||||
comment += f"| Metric | Value |\n|--------|-------|\n"
|
||||
comment += f"| Changes | {change_desc} |\n"
|
||||
comment += f"| Complexity Score | **{complexity.score}/10** |\n"
|
||||
comment += f"| Estimated Review Time | ~{complexity.estimated_minutes} minutes |\n\n"
|
||||
comment += f"### Scoring rationale:"
|
||||
for r in complexity.reasons:
|
||||
comment += f"\n- {r}"
|
||||
if deps_note:
|
||||
comment += deps_note
|
||||
if test_note:
|
||||
comment += test_note
|
||||
comment += f"\n\n---\n"
|
||||
comment += f"*Generated by PR Complexity Scorer — [issue #135](https://forge.alexanderwhitestone.com/Timmy_Foundation/compounding-intelligence/issues/135)*"
|
||||
return comment
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="PR Complexity Scorer")
|
||||
parser.add_argument("--org", default="Timmy_Foundation")
|
||||
parser.add_argument("--repo", default="compounding-intelligence")
|
||||
parser.add_argument("--token", default=os.environ.get("GITEA_TOKEN") or os.path.expanduser("~/.config/gitea/token"))
|
||||
parser.add_argument("--dry-run", action="store_true")
|
||||
parser.add_argument("--apply", action="store_true")
|
||||
parser.add_argument("--output", default="metrics/pr_complexity.json")
|
||||
args = parser.parse_args()
|
||||
|
||||
token_path = args.token
|
||||
if os.path.exists(token_path):
|
||||
with open(token_path) as f:
|
||||
token = f.read().strip()
|
||||
else:
|
||||
token = args.token
|
||||
|
||||
if not token:
|
||||
print("ERROR: No Gitea token provided", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
client = GiteaClient(token)
|
||||
|
||||
print(f"Fetching open PRs for {args.org}/{args.repo}...")
|
||||
prs = client.get_open_prs(args.org, args.repo)
|
||||
if not prs:
|
||||
print("No open PRs found.")
|
||||
sys.exit(0)
|
||||
|
||||
print(f"Found {len(prs)} open PR(s). Analyzing...")
|
||||
|
||||
results = []
|
||||
Path(args.output).parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
for pr in prs:
|
||||
pr_num = pr["number"]
|
||||
title = pr.get("title", "")
|
||||
print(f" Analyzing PR #{pr_num}: {title[:60]}")
|
||||
|
||||
try:
|
||||
complexity = analyze_pr(client, args.org, args.repo, pr)
|
||||
results.append(complexity.to_dict())
|
||||
|
||||
comment = build_comment(complexity)
|
||||
|
||||
if args.dry_run:
|
||||
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [DRY-RUN]")
|
||||
elif args.apply:
|
||||
success = client.post_comment(args.org, args.repo, pr_num, comment)
|
||||
status = "[commented]" if success else "[FAILED]"
|
||||
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min {status}")
|
||||
else:
|
||||
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [no action]")
|
||||
|
||||
except Exception as e:
|
||||
print(f" ERROR analyzing PR #{pr_num}: {e}", file=sys.stderr)
|
||||
|
||||
with open(args.output, "w") as f:
|
||||
json.dump({
|
||||
"org": args.org,
|
||||
"repo": args.repo,
|
||||
"timestamp": datetime.now(timezone.utc).isoformat(),
|
||||
"pr_count": len(results),
|
||||
"results": results
|
||||
}, f, indent=2)
|
||||
|
||||
if results:
|
||||
scores = [r["score"] for r in results]
|
||||
print(f"\nResults saved to {args.output}")
|
||||
print(f"Summary: {len(results)} PRs, scores range {min(scores):.0f}-{max(scores):.0f}")
|
||||
else:
|
||||
print("\nNo results to save.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,168 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Smoke test for code duplication detector — verifies:
|
||||
- Function extraction from Python files
|
||||
- Exact duplicate detection
|
||||
- Near-duplicate detection (token similarity)
|
||||
- Report generation and stats
|
||||
- JSON output format
|
||||
"""
|
||||
|
||||
import json
|
||||
import sys
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
SCRIPT_DIR = Path(__file__).parent.absolute()
|
||||
sys.path.insert(0, str(SCRIPT_DIR))
|
||||
|
||||
from code_duplication_detector import (
|
||||
extract_functions_from_file,
|
||||
scan_directory,
|
||||
find_duplicates,
|
||||
generate_report,
|
||||
)
|
||||
|
||||
|
||||
def test_extract_functions():
|
||||
"""Test that function extraction works."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
test_file = Path(tmpdir) / 'sample.py'
|
||||
test_file.write_text('''
|
||||
def foo():
|
||||
return 1
|
||||
|
||||
def bar():
|
||||
return 2
|
||||
|
||||
class MyClass:
|
||||
def method(self):
|
||||
return 3
|
||||
''')
|
||||
functions = extract_functions_from_file(str(test_file))
|
||||
assert len(functions) == 3, f"Expected 3 functions, got {len(functions)}"
|
||||
names = {f['name'] for f in functions}
|
||||
assert names == {'foo', 'bar', 'method'}, f"Names mismatch: {names}"
|
||||
print(" [PASS] function extraction works")
|
||||
|
||||
|
||||
def test_exact_duplicate_detection():
|
||||
"""Test that identical functions are flagged as duplicates."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
# Create two files with the same function
|
||||
f1 = Path(tmpdir) / 'a.py'
|
||||
f1.write_text('''
|
||||
def duplicated():
|
||||
x = 1
|
||||
y = 2
|
||||
return x + y
|
||||
''')
|
||||
f2 = Path(tmpdir) / 'b.py'
|
||||
f2.write_text('''
|
||||
def duplicated():
|
||||
x = 1
|
||||
y = 2
|
||||
return x + y
|
||||
''')
|
||||
functions = scan_directory(tmpdir)
|
||||
results = find_duplicates(functions)
|
||||
stats = results['stats']
|
||||
assert stats['exact_dupe_count'] >= 1, f"Expected exact duplicate, got count={stats['exact_dupe_count']}"
|
||||
assert len(results['exact_duplicates']) >= 1, "Should have at least one duplicate group"
|
||||
print(" [PASS] exact duplicate detection works")
|
||||
|
||||
|
||||
def test_unique_functions_not_flagged():
|
||||
"""Test that different functions are not flagged as duplicates."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
f1 = Path(tmpdir) / 'a.py'
|
||||
f1.write_text('def func_a(): return 1')
|
||||
f2 = Path(tmpdir) / 'b.py'
|
||||
f2.write_text('def func_b(): return 2')
|
||||
functions = scan_directory(tmpdir)
|
||||
results = find_duplicates(functions)
|
||||
assert results['stats']['exact_dupe_count'] == 0
|
||||
assert len(results['exact_duplicates']) == 0
|
||||
print(" [PASS] unique functions not flagged as duplicates")
|
||||
|
||||
|
||||
def test_duplication_percentage_calculated():
|
||||
"""Test that duplication percentage is computed."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
# Create file with mostly duplicated content
|
||||
f1 = Path(tmpdir) / 'a.py'
|
||||
f1.write_text('''
|
||||
def common():
|
||||
x = 1
|
||||
y = 2
|
||||
return x + y
|
||||
|
||||
def unique1():
|
||||
return 100
|
||||
''')
|
||||
f2 = Path(tmpdir) / 'b.py'
|
||||
f2.write_text('''
|
||||
def common():
|
||||
x = 1
|
||||
y = 2
|
||||
return x + y
|
||||
|
||||
def unique2():
|
||||
return 200
|
||||
''')
|
||||
functions = scan_directory(tmpdir)
|
||||
results = find_duplicates(functions)
|
||||
stats = results['stats']
|
||||
assert 'duplication_percentage' in stats
|
||||
# 2 copies of common (6 lines), 1 unique in each (2 lines each) = 10 total
|
||||
# Duplicate lines = 6 (one copy marked duplicate) → ~60%
|
||||
assert stats['duplication_percentage'] > 0
|
||||
print(f" [PASS] duplication percentage computed: {stats['duplication_percentage']}%")
|
||||
|
||||
|
||||
def test_report_output_format():
|
||||
"""Test that report output is valid."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
f1 = Path(tmpdir) / 'a.py'
|
||||
f1.write_text('def dup(): return 1')
|
||||
f2 = Path(tmpdir) / 'b.py'
|
||||
f2.write_text('def dup(): return 1')
|
||||
functions = scan_directory(tmpdir)
|
||||
results = find_duplicates(functions)
|
||||
|
||||
# Text report
|
||||
text = generate_report(results, output_format='text')
|
||||
assert 'CODE DUPLICATION REPORT' in text
|
||||
assert 'Total functions' in text
|
||||
print(" [PASS] text report format valid")
|
||||
|
||||
# JSON report
|
||||
json_out = generate_report(results, output_format='json')
|
||||
data = json.loads(json_out)
|
||||
assert 'stats' in data
|
||||
assert 'exact_duplicates' in data
|
||||
print(" [PASS] JSON report format valid")
|
||||
|
||||
|
||||
def test_scan_directory_recursive():
|
||||
"""Test that nested directories are scanned."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
subdir = Path(tmpdir) / 'sub'
|
||||
subdir.mkdir()
|
||||
(subdir / 'nested.py').write_text('def nested(): pass')
|
||||
(Path(tmpdir) / 'root.py').write_text('def root(): pass')
|
||||
functions = scan_directory(tmpdir)
|
||||
names = {f['name'] for f in functions}
|
||||
assert 'nested' in names and 'root' in names
|
||||
print(" [PASS] recursive directory scanning works")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
print("Running code duplication detector smoke tests...")
|
||||
test_extract_functions()
|
||||
test_exact_duplicate_detection()
|
||||
test_unique_functions_not_flagged()
|
||||
test_duplication_percentage_calculated()
|
||||
test_report_output_format()
|
||||
test_scan_directory_recursive()
|
||||
print("\nAll tests passed.")
|
||||
105
scripts/test_graph_visualizer.py
Executable file
105
scripts/test_graph_visualizer.py
Executable file
@@ -0,0 +1,105 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Tests for graph_visualizer.py — smoke test + subgraph logic.
|
||||
Run: python3 scripts/test_graph_visualizer.py
|
||||
"""
|
||||
|
||||
import json, sys, tempfile
|
||||
from pathlib import Path
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent))
|
||||
import graph_visualizer as gv
|
||||
|
||||
|
||||
def make_index(facts, tmp_dir):
|
||||
p = tmp_dir / "index.json"
|
||||
p.write_text(json.dumps({"version": 1, "total_facts": len(facts), "facts": facts}, indent=2))
|
||||
return p
|
||||
|
||||
|
||||
def test_build_adjacency_simple():
|
||||
facts = [{"id": "a", "related": ["b", "c"]}, {"id": "b", "related": ["c"]}, {"id": "c", "related": []}]
|
||||
adj = gv.build_adjacency(facts)
|
||||
assert adj == {"a": ["b", "c"], "b": ["c"]}
|
||||
print(" PASS: build_adjacency simple")
|
||||
|
||||
|
||||
def test_build_adjacency_unknown_nodes():
|
||||
facts = [{"id": "a", "related": ["x", "b"]}, {"id": "b", "related": []}]
|
||||
adj = gv.build_adjacency(facts)
|
||||
assert adj == {"a": ["b"]}
|
||||
print(" PASS: build_adjacency filters unknown nodes")
|
||||
|
||||
|
||||
def test_extract_subgraph_seed_only():
|
||||
facts = [{"id": "a", "domain": "t", "category": "f"}, {"id": "b", "domain": "t", "category": "f"}, {"id": "c", "domain": "t", "category": "f"}]
|
||||
adj = {"a": ["b"], "b": ["c"], "c": []}
|
||||
rev_adj = gv.build_reverse_adjacency(adj)
|
||||
sub = gv.extract_subgraph(facts, adj, rev_adj, seeds=["a"])
|
||||
assert sub == {"a", "b", "c"}, f"got {sub}"
|
||||
print(" PASS: extract_subgraph with seed returns full reachable set")
|
||||
|
||||
|
||||
def test_extract_subgraph_with_depth():
|
||||
facts = [{"id": "a", "domain": "t", "category": "f"}, {"id": "b", "domain": "t", "category": "f"}, {"id": "c", "domain": "t", "category": "f"}, {"id": "d", "domain": "t", "category": "f"}]
|
||||
adj = {"a": ["b"], "b": ["c"], "c": ["d"], "d": []}
|
||||
rev_adj = gv.build_reverse_adjacency(adj)
|
||||
sub = gv.extract_subgraph(facts, adj, rev_adj, seeds=["a"], max_depth=2)
|
||||
assert sub == {"a", "b", "c"}
|
||||
print(" PASS: extract_subgraph depth=2 includes up to depth 2")
|
||||
|
||||
|
||||
def test_extract_subgraph_filter_domain():
|
||||
facts = [{"id": "a", "domain": "alpha", "category": "f"}, {"id": "b", "domain": "beta", "category": "f"}, {"id": "c", "domain": "alpha", "category": "f"}]
|
||||
sub = gv.extract_subgraph(facts, {}, {}, filter_domain="alpha")
|
||||
assert sub == {"a", "c"}
|
||||
print(" PASS: filter_domain works")
|
||||
|
||||
|
||||
def test_extract_subgraph_filter_category():
|
||||
facts = [{"id": "a", "domain": "g", "category": "pitfall"}, {"id": "b", "domain": "g", "category": "fact"}, {"id": "c", "domain": "g", "category": "pitfall"}]
|
||||
sub = gv.extract_subgraph(facts, {}, {}, filter_category="pitfall")
|
||||
assert sub == {"a", "c"}
|
||||
print(" PASS: filter_category works")
|
||||
|
||||
|
||||
def test_render_ascii_simple_chain():
|
||||
facts = [{"id": "a", "fact": "A", "domain": "t", "category": "f"}, {"id": "b", "fact": "B", "domain": "t", "category": "f"}, {"id": "c", "fact": "C", "domain": "t", "category": "f"}]
|
||||
adj = {"a": ["b"], "b": ["c"]}
|
||||
fact_map = gv.build_fact_map(facts)
|
||||
out = gv.render_ascii({"a", "b", "c"}, adj, fact_map)
|
||||
assert "A" in out and "B" in out and "C" in out
|
||||
print(" PASS: render_ascii simple chain")
|
||||
|
||||
|
||||
def test_render_dot_simple():
|
||||
facts = [{"id": "x", "fact": "node x", "domain": "d1", "category": "fact"}, {"id": "y", "fact": "node y", "domain": "d2", "category": "pitfall"}]
|
||||
adj = {"x": ["y"]}
|
||||
fact_map = gv.build_fact_map(facts)
|
||||
out = gv.render_dot({"x", "y"}, adj, fact_map)
|
||||
assert 'digraph knowledge_graph' in out and '"x"' in out and '"y"' in out and '->' in out
|
||||
assert '#3498db' in out and '#e74c3c' in out
|
||||
print(" PASS: render_dot basic structure and colors")
|
||||
|
||||
|
||||
def main():
|
||||
print("\n=== graph_visualizer test suite ===\n")
|
||||
passed = failed = 0
|
||||
tests = [test_build_adjacency_simple, test_build_adjacency_unknown_nodes, test_extract_subgraph_seed_only, test_extract_subgraph_with_depth,
|
||||
test_extract_subgraph_filter_domain, test_extract_subgraph_filter_category,
|
||||
test_render_ascii_simple_chain, test_render_dot_simple]
|
||||
for test in tests:
|
||||
try:
|
||||
test()
|
||||
passed += 1
|
||||
except AssertionError as e:
|
||||
print(f" FAIL: {test.__name__} — {e}")
|
||||
failed += 1
|
||||
except Exception as e:
|
||||
print(f" ERROR: {test.__name__} — {e}")
|
||||
failed += 1
|
||||
print(f"\n=== Results: {passed}/{passed+failed} passed, {failed} failed ===")
|
||||
return failed == 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(0 if main() else 1)
|
||||
@@ -1,170 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Tests for PR Complexity Scorer — unit tests for the scoring logic.
|
||||
"""
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent))
|
||||
|
||||
from pr_complexity_scorer import (
|
||||
score_pr,
|
||||
is_dependency_file,
|
||||
is_test_file,
|
||||
TIME_PER_POINT,
|
||||
SMALL_FILES,
|
||||
MEDIUM_FILES,
|
||||
LARGE_FILES,
|
||||
SMALL_LINES,
|
||||
MEDIUM_LINES,
|
||||
LARGE_LINES,
|
||||
)
|
||||
|
||||
PASS = 0
|
||||
FAIL = 0
|
||||
|
||||
def test(name):
|
||||
def decorator(fn):
|
||||
global PASS, FAIL
|
||||
try:
|
||||
fn()
|
||||
PASS += 1
|
||||
print(f" [PASS] {name}")
|
||||
except AssertionError as e:
|
||||
FAIL += 1
|
||||
print(f" [FAIL] {name}: {e}")
|
||||
except Exception as e:
|
||||
FAIL += 1
|
||||
print(f" [FAIL] {name}: Unexpected error: {e}")
|
||||
return decorator
|
||||
|
||||
def assert_eq(a, b, msg=""):
|
||||
if a != b:
|
||||
raise AssertionError(f"{msg} expected {b!r}, got {a!r}")
|
||||
|
||||
def assert_true(v, msg=""):
|
||||
if not v:
|
||||
raise AssertionError(msg or "Expected True")
|
||||
|
||||
def assert_false(v, msg=""):
|
||||
if v:
|
||||
raise AssertionError(msg or "Expected False")
|
||||
|
||||
|
||||
print("=== PR Complexity Scorer Tests ===\n")
|
||||
|
||||
print("-- File Classification --")
|
||||
|
||||
@test("dependency file detection — requirements.txt")
|
||||
def _():
|
||||
assert_true(is_dependency_file("requirements.txt"))
|
||||
assert_true(is_dependency_file("src/requirements.txt"))
|
||||
assert_false(is_dependency_file("requirements_test.txt"))
|
||||
|
||||
@test("dependency file detection — pyproject.toml")
|
||||
def _():
|
||||
assert_true(is_dependency_file("pyproject.toml"))
|
||||
assert_false(is_dependency_file("myproject.py"))
|
||||
|
||||
@test("test file detection — pytest style")
|
||||
def _():
|
||||
assert_true(is_test_file("tests/test_api.py"))
|
||||
assert_true(is_test_file("test_module.py"))
|
||||
assert_true(is_test_file("src/module_test.py"))
|
||||
|
||||
@test("test file detection — other frameworks")
|
||||
def _():
|
||||
assert_true(is_test_file("spec/feature_spec.rb"))
|
||||
assert_true(is_test_file("__tests__/component.test.js"))
|
||||
assert_false(is_test_file("testfixtures/helper.py"))
|
||||
|
||||
|
||||
print("\n-- Scoring Logic --")
|
||||
|
||||
@test("small PR gets low score (1-3)")
|
||||
def _():
|
||||
score, minutes, _ = score_pr(
|
||||
files_changed=3,
|
||||
additions=50,
|
||||
deletions=10,
|
||||
has_dependency_changes=False,
|
||||
test_coverage_delta=None
|
||||
)
|
||||
assert_true(1 <= score <= 3, f"Score should be low, got {score}")
|
||||
assert_true(minutes < 20)
|
||||
|
||||
@test("medium PR gets medium score (4-6)")
|
||||
def _():
|
||||
score, minutes, _ = score_pr(
|
||||
files_changed=15,
|
||||
additions=400,
|
||||
deletions=100,
|
||||
has_dependency_changes=False,
|
||||
test_coverage_delta=None
|
||||
)
|
||||
assert_true(4 <= score <= 6, f"Score should be medium, got {score}")
|
||||
assert_true(20 <= minutes <= 45)
|
||||
|
||||
@test("large PR gets high score (7-9)")
|
||||
def _():
|
||||
score, minutes, _ = score_pr(
|
||||
files_changed=60,
|
||||
additions=3000,
|
||||
deletions=1500,
|
||||
has_dependency_changes=True,
|
||||
test_coverage_delta=None
|
||||
)
|
||||
assert_true(7 <= score <= 9, f"Score should be high, got {score}")
|
||||
assert_true(minutes >= 45)
|
||||
|
||||
@test("dependency changes boost score")
|
||||
def _():
|
||||
base_score, _, _ = score_pr(
|
||||
files_changed=10, additions=200, deletions=50,
|
||||
has_dependency_changes=False, test_coverage_delta=None
|
||||
)
|
||||
dep_score, _, _ = score_pr(
|
||||
files_changed=10, additions=200, deletions=50,
|
||||
has_dependency_changes=True, test_coverage_delta=None
|
||||
)
|
||||
assert_true(dep_score > base_score, f"Deps: {base_score} -> {dep_score}")
|
||||
|
||||
@test("adding tests lowers complexity")
|
||||
def _():
|
||||
base_score, _, _ = score_pr(
|
||||
files_changed=8, additions=150, deletions=20,
|
||||
has_dependency_changes=False, test_coverage_delta=None
|
||||
)
|
||||
better_score, _, _ = score_pr(
|
||||
files_changed=8, additions=180, deletions=20,
|
||||
has_dependency_changes=False, test_coverage_delta=3
|
||||
)
|
||||
assert_true(better_score < base_score, f"Tests: {base_score} -> {better_score}")
|
||||
|
||||
@test("removing tests increases complexity")
|
||||
def _():
|
||||
base_score, _, _ = score_pr(
|
||||
files_changed=8, additions=150, deletions=20,
|
||||
has_dependency_changes=False, test_coverage_delta=None
|
||||
)
|
||||
worse_score, _, _ = score_pr(
|
||||
files_changed=8, additions=150, deletions=20,
|
||||
has_dependency_changes=False, test_coverage_delta=-2
|
||||
)
|
||||
assert_true(worse_score > base_score, f"Remove tests: {base_score} -> {worse_score}")
|
||||
|
||||
@test("score bounded 1-10")
|
||||
def _():
|
||||
for files, adds, dels in [(1, 10, 5), (100, 10000, 5000)]:
|
||||
score, _, _ = score_pr(files, adds, dels, False, None)
|
||||
assert_true(1 <= score <= 10, f"Score {score} out of range")
|
||||
|
||||
@test("estimated minutes exist for all scores")
|
||||
def _():
|
||||
for s in range(1, 11):
|
||||
assert_true(s in TIME_PER_POINT, f"Missing time for score {s}")
|
||||
|
||||
|
||||
print(f"\n=== Results: {PASS} passed, {FAIL} failed ===")
|
||||
sys.exit(0 if FAIL == 0 else 1)
|
||||
Reference in New Issue
Block a user