Compare commits: step35/162 ... step35/155 (1 commit)

| Author | SHA1 | Date |
|---|---|---|
| | b4a3501aa3 | |

scripts/code_duplication_detector.py  (deleted, 366 lines)
@@ -1,366 +0,0 @@
#!/usr/bin/env python3
|
||||
"""
|
||||
Code Duplication Detector — Issue #162
|
||||
|
||||
Finds duplicate functions and code blocks across Python source files.
|
||||
Reports duplication percentage and outputs a duplication report.
|
||||
|
||||
Usage:
|
||||
python3 scripts/code_duplication_detector.py --output reports/code_duplication.json
|
||||
python3 scripts/code_duplication_detector.py --directory scripts/ --dry-run
|
||||
python3 scripts/code_duplication_detector.py --test # Run built-in test
|
||||
"""
|
||||
|
||||
import argparse
import ast
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
from collections import defaultdict
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import List, Dict, Tuple, Optional
|
||||
|
||||
|
||||
# ── AST helpers ────────────────────────────────────────────────────────────
|
||||
|
||||
def normalize_code(text: str) -> str:
|
||||
"""Normalize code for comparison: strip comments, normalize whitespace."""
|
||||
# Remove comments (both # and docstring triple-quote strings)
|
||||
text = re.sub(r'#.*$', '', text, flags=re.MULTILINE)
|
||||
text = re.sub(r'""".*?"""', '', text, flags=re.DOTALL)
|
||||
text = re.sub(r"'''.*?'''", '', text, flags=re.DOTALL)
|
||||
# Normalize whitespace
|
||||
text = re.sub(r'\s+', ' ', text).strip()
|
||||
return text.lower()
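
# Illustrative example (not part of the original script): variants that differ only
# in comments, whitespace and case collapse to the same normalized string, e.g.
#   normalize_code("def F():   # add\n    return 1") == "def f(): return 1"
#   normalize_code("def f():\n    return 1")          == "def f(): return 1"
# so code_hash() assigns both the same SHA-256 digest.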
|
||||
|
||||
|
||||
def code_hash(text: str) -> str:
|
||||
"""SHA256 hash of normalized code for exact duplicate detection."""
|
||||
normalized = normalize_code(text)
|
||||
return hashlib.sha256(normalized.encode('utf-8')).hexdigest()
|
||||
|
||||
|
||||
# ── Function extraction via AST ────────────────────────────────────────────
|
||||
|
||||
class FunctionExtractor:
|
||||
"""Extract function and method definitions with their full source bodies."""
|
||||
|
||||
def __init__(self, source: str, filepath: str):
|
||||
self.source = source
|
||||
self.filepath = filepath
|
||||
self.lines = source.splitlines()
|
||||
self.functions: List[Dict] = []
|
||||
|
||||
def _get_source_segment(self, start_lineno: int, end_lineno: int) -> str:
|
||||
"""Get source code from start to end line (1-indexed, inclusive)."""
|
||||
# AST end_lineno is inclusive
|
||||
start_idx = start_lineno - 1
|
||||
end_idx = end_lineno
|
||||
return '\n'.join(self.lines[start_idx:end_idx])
|
||||
|
||||
def visit(self, tree):
|
||||
"""Collect all function and async function definitions."""
|
||||
for node in ast.walk(tree):
|
||||
if isinstance(node, ast.FunctionDef) or isinstance(node, ast.AsyncFunctionDef):
|
||||
# Get the full source for this function including decorators
|
||||
start = node.lineno
|
||||
end = node.end_lineno
|
||||
body_source = self._get_source_segment(start, end)
|
||||
|
||||
# Also collect parent class name if this is a method
|
||||
class_name = None
|
||||
parent = node.parent if hasattr(node, 'parent') else None
|
||||
if parent and isinstance(parent, ast.ClassDef):
|
||||
class_name = parent.name
|
||||
|
||||
self.functions.append({
|
||||
'name': node.name,
|
||||
'file': self.filepath,
|
||||
'start_line': start,
|
||||
'end_line': end,
|
||||
'body': body_source,
|
||||
'class_name': class_name,
|
||||
'is_method': class_name is not None,
|
||||
})
|
||||
|
||||
|
||||
|
||||
|
||||
class ParentNodeVisitor(ast.NodeVisitor):
    """Annotate every child node with a reference to its parent."""

    def generic_visit(self, node):
        for child in ast.iter_child_nodes(node):
            child.parent = node
        super().generic_visit(node)
|
||||
|
||||
|
||||
def extract_functions_from_file(filepath: str) -> List[Dict]:
|
||||
"""Extract all function definitions from a Python file."""
|
||||
try:
|
||||
with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
|
||||
source = f.read()
|
||||
tree = ast.parse(source, filename=str(filepath))
|
||||
|
||||
# Annotate with parent references
|
||||
for node in ast.walk(tree):
|
||||
for child in ast.iter_child_nodes(node):
|
||||
child.parent = node
|
||||
|
||||
extractor = FunctionExtractor(source, str(filepath))
|
||||
extractor.visit(tree)
|
||||
return extractor.functions
|
||||
    except (SyntaxError, UnicodeDecodeError, OSError):
|
||||
return []
|
||||
|
||||
|
||||
def scan_directory(directory: str, extensions: Tuple[str, ...] = ('.py',)) -> List[Dict]:
|
||||
"""Scan directory for Python files and extract all functions."""
|
||||
all_functions = []
|
||||
path = Path(directory)
|
||||
|
||||
for filepath in path.rglob('*'):
|
||||
if filepath.is_file() and filepath.suffix in extensions:
|
||||
# Skip common non-source dirs
|
||||
parts = filepath.parts
|
||||
if any(ex in parts for ex in ('__pycache__', 'node_modules', '.git', 'venv', '.venv', 'dist', 'build')):
|
||||
continue
|
||||
if filepath.name.startswith('.'):
|
||||
continue
|
||||
|
||||
functions = extract_functions_from_file(str(filepath))
|
||||
all_functions.extend(functions)
|
||||
|
||||
return all_functions
|
||||
|
||||
|
||||
# ── Duplicate detection ─────────────────────────────────────────────────────
|
||||
|
||||
def find_duplicates(functions: List[Dict], similarity_threshold: float = 0.95) -> Dict:
|
||||
"""
|
||||
Find duplicate and near-duplicate functions.
|
||||
|
||||
Returns dict with:
|
||||
- exact_duplicates: {hash: [function_info, ...]}
|
||||
- near_duplicates: [[function_info, ...], ...]
|
||||
- stats: total_functions, unique_exact, exact_dupe_count, near_dupe_count
|
||||
"""
|
||||
# Phase 1: Exact duplicates by code hash
|
||||
hash_groups: Dict[str, List[Dict]] = defaultdict(list)
|
||||
for func in functions:
|
||||
h = code_hash(func['body'])
|
||||
hash_groups[h].append(func)
|
||||
|
||||
exact_duplicates = {h: group for h, group in hash_groups.items() if len(group) > 1}
|
||||
exact_dupe_count = sum(len(group) - 1 for group in exact_duplicates.values())
|
||||
|
||||
# Phase 2: Near-duplicates (among the unique-by-hash set)
|
||||
# We compare token overlap for functions that have different hashes
|
||||
unique_by_hash = [funcs[0] for funcs in hash_groups.values()]
|
||||
near_duplicate_groups = []
|
||||
|
||||
# Simple token-based similarity
|
||||
def tokenize(code: str) -> set:
|
||||
return set(re.findall(r'[a-zA-Z_][a-zA-Z0-9_]*', code.lower()))
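    # Worked example (illustrative): for bodies "x = a + b" and "y = a + b" the
    # token sets are {x, a, b} and {y, a, b}; Jaccard similarity =
    # |{a, b}| / |{x, y, a, b}| = 2 / 4 = 0.5, well below the 0.95 default threshold.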
|
||||
|
||||
i = 0
|
||||
while i < len(unique_by_hash):
|
||||
group = [unique_by_hash[i]]
|
||||
j = i + 1
|
||||
while j < len(unique_by_hash):
|
||||
tokens_i = tokenize(unique_by_hash[i]['body'])
|
||||
tokens_j = tokenize(unique_by_hash[j]['body'])
|
||||
if not tokens_i or not tokens_j:
|
||||
j += 1
|
||||
continue
|
||||
intersection = tokens_i & tokens_j
|
||||
union = tokens_i | tokens_j
|
||||
similarity = len(intersection) / len(union) if union else 0.0
|
||||
|
||||
if similarity >= similarity_threshold:
|
||||
group.append(unique_by_hash[j])
|
||||
unique_by_hash.pop(j)
|
||||
else:
|
||||
j += 1
|
||||
|
||||
if len(group) > 1:
|
||||
near_duplicate_groups.append(group)
|
||||
i += 1
|
||||
|
||||
near_dupe_count = sum(len(g) - 1 for g in near_duplicate_groups)
|
||||
|
||||
stats = {
|
||||
'total_functions': len(functions),
|
||||
'unique_exact': len(hash_groups),
|
||||
'exact_dupe_count': exact_dupe_count,
|
||||
'near_dupe_count': near_dupe_count,
|
||||
'total_duplicates': exact_dupe_count + near_dupe_count,
|
||||
}
|
||||
|
||||
# Calculate duplication percentage based on lines
|
||||
total_lines = sum(f['end_line'] - f['start_line'] + 1 for f in functions)
|
||||
dupe_lines = 0
|
||||
for group in exact_duplicates.values():
|
||||
# Count all but one as duplicates
|
||||
for f in group[1:]:
|
||||
dupe_lines += f['end_line'] - f['start_line'] + 1
|
||||
for group in near_duplicate_groups:
|
||||
for f in group[1:]:
|
||||
dupe_lines += f['end_line'] - f['start_line'] + 1
|
||||
|
||||
stats['total_lines'] = total_lines
|
||||
stats['duplicate_lines'] = dupe_lines
|
||||
stats['duplication_percentage'] = round((dupe_lines / total_lines * 100) if total_lines else 0, 2)
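    # Worked example (illustrative): two identical 6-line functions plus one
    # unique 4-line function give total_lines = 16, duplicate_lines = 6,
    # duplication_percentage = round(6 / 16 * 100, 2) = 37.5.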
|
||||
|
||||
return {
|
||||
'exact_duplicates': exact_duplicates,
|
||||
'near_duplicates': near_duplicate_groups,
|
||||
'stats': stats,
|
||||
}
|
||||
|
||||
|
||||
# ── Report generation ────────────────────────────────────────────────────────
|
||||
|
||||
def generate_report(results: Dict, output_format: str = 'json') -> str:
|
||||
"""Generate human-readable report from detection results."""
|
||||
stats = results['stats']
|
||||
|
||||
if output_format == 'json':
|
||||
return json.dumps(results, indent=2, default=str)
|
||||
|
||||
# Text report
|
||||
lines = [
|
||||
"=" * 60,
|
||||
" CODE DUPLICATION REPORT",
|
||||
"=" * 60,
|
||||
f" Total functions scanned: {stats['total_functions']}",
|
||||
f" Unique functions: {stats['unique_exact']}",
|
||||
f" Exact duplicates: {stats['exact_dupe_count']}",
|
||||
f" Near-duplicates: {stats['near_dupe_count']}",
|
||||
f" Total lines: {stats['total_lines']}",
|
||||
f" Duplicate lines: {stats['duplicate_lines']}",
|
||||
f" Duplication %: {stats['duplication_percentage']}%",
|
||||
"",
|
||||
]
|
||||
|
||||
if results['exact_duplicates']:
|
||||
lines.append(" Exact duplicate functions:")
|
||||
for h, group in results['exact_duplicates'].items():
|
||||
first = group[0]
|
||||
lines.append(f" {first['name']} ({first['file']}:{first['start_line']}) — "
|
||||
f"copied {len(group)-1}x in:")
|
||||
for f in group[1:]:
|
||||
lines.append(f" → {f['file']}:{f['start_line']}")
|
||||
lines.append("")
|
||||
|
||||
if results['near_duplicates']:
|
||||
lines.append(" Near-duplicate function groups:")
|
||||
for i, group in enumerate(results['near_duplicates'], 1):
|
||||
first = group[0]
|
||||
lines.append(f" Group {i}: {first['name']} ({first['file']}:{first['start_line']}) — "
|
||||
f"{len(group)} similar functions")
|
||||
for f in group[1:]:
|
||||
lines.append(f" → {f['file']}:{f['start_line']}")
|
||||
lines.append("")
|
||||
|
||||
lines.append("=" * 60)
|
||||
return '\n'.join(lines)
|
||||
|
||||
|
||||
# ── CLI ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Code Duplication Detector")
|
||||
parser.add_argument('--directory', default='.',
|
||||
help='Directory to scan (default: current directory)')
|
||||
parser.add_argument('--output', help='Output file for JSON report')
|
||||
parser.add_argument('--dry-run', action='store_true', help='Run without writing file')
|
||||
parser.add_argument('--threshold', type=float, default=0.95,
|
||||
help='Similarity threshold for near-dupes (default: 0.95)')
|
||||
parser.add_argument('--json', action='store_true', help='JSON output to stdout')
|
||||
parser.add_argument('--test', action='store_true', help='Run built-in test')
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.test:
|
||||
_run_test()
|
||||
return
|
||||
|
||||
# Scan
|
||||
functions = scan_directory(args.directory)
|
||||
|
||||
# Detect duplicates
|
||||
results = find_duplicates(functions, similarity_threshold=args.threshold)
|
||||
stats = results['stats']
|
||||
|
||||
# Output
|
||||
if args.json:
|
||||
print(json.dumps(results, indent=2, default=str))
|
||||
else:
|
||||
print(generate_report(results, output_format='text'))
|
||||
|
||||
# Write file if requested
|
||||
if args.output and not args.dry_run:
|
||||
os.makedirs(os.path.dirname(args.output) or '.', exist_ok=True)
|
||||
with open(args.output, 'w') as f:
|
||||
json.dump(results, f, indent=2, default=str)
|
||||
print(f"\nReport written to: {args.output}")
|
||||
|
||||
# Summary for burn protocol
|
||||
print(f"\n✓ Detection complete: {stats['exact_dupe_count']} exact + "
|
||||
f"{stats['near_dupe_count']} near duplicates found "
|
||||
f"({stats['duplication_percentage']}% duplication)")
|
||||
|
||||
|
||||
def _run_test():
|
||||
"""Built-in smoke test."""
|
||||
import tempfile
|
||||
import os
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
# Create test files with duplicate code
|
||||
f1 = Path(tmpdir) / 'mod1.py'
|
||||
f1.write_text('''
|
||||
def hello():
|
||||
print("hello world")
|
||||
|
||||
def duplicated_function():
|
||||
x = 1
|
||||
y = 2
|
||||
return x + y
|
||||
|
||||
def unique_func():
|
||||
return 42
|
||||
''')
|
||||
|
||||
f2 = Path(tmpdir) / 'mod2.py'
|
||||
f2.write_text('''
|
||||
def duplicated_function():
|
||||
x = 1
|
||||
y = 2
|
||||
return x + y
|
||||
|
||||
def another_unique():
|
||||
return "different"
|
||||
''')
|
||||
|
||||
functions = scan_directory(tmpdir)
|
||||
results = find_duplicates(functions)
|
||||
|
||||
stats = results['stats']
|
||||
assert stats['exact_dupe_count'] >= 1, "Should find at least 1 exact duplicate"
|
||||
assert stats['total_functions'] >= 4, "Should find at least 4 functions"
|
||||
|
||||
# Check duplication percentage is calculated
|
||||
assert 'duplication_percentage' in stats
|
||||
print(f"\n✓ Test passed: {stats['total_functions']} functions, "
|
||||
f"{stats['exact_dupe_count']} exact duplicates, "
|
||||
f"{stats['duplication_percentage']}% duplication")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
scripts/linter_runner.py  (new file, 530 lines)
@@ -0,0 +1,530 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Linter Runner — detect languages and run linters across a repo.
|
||||
|
||||
Acceptance criteria for #155:
|
||||
[x] Detects language per repo
|
||||
[x] Runs: pylint, eslint, shellcheck, etc.
|
||||
[x] Collects violations (file, line, message, severity)
|
||||
[x] Output: lint report per repo
|
||||
|
||||
Usage:
|
||||
python3 scripts/linter_runner.py --repo .
|
||||
python3 scripts/linter_runner.py --all # Scan all repos in knowledge/repos/
|
||||
python3 scripts/linter_runner.py --repo . --format json # Machine-readable output
|
||||
python3 scripts/linter_runner.py --repo . --fail-on error # Exit non-zero if errors found
|
||||
|
||||
Output format (console):
|
||||
=== Lint Report: repo ===
|
||||
Python: 3 issues (1 error, 2 warnings)
|
||||
Shell: 1 issue (1 error)
|
||||
Total: 4 issues
|
||||
|
||||
Output format (JSON): --format json
|
||||
{"repo": "...", "issues": [...], "summary": {...}}
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import re
import subprocess
|
||||
import sys
|
||||
import time
|
||||
from dataclasses import dataclass, asdict
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
SCRIPT_DIR = Path(__file__).resolve().parent
|
||||
REPO_ROOT = SCRIPT_DIR.parent
|
||||
|
||||
|
||||
@dataclass
|
||||
class Violation:
|
||||
"""A single lint violation."""
|
||||
file: str
|
||||
line: Optional[int]
|
||||
column: Optional[int]
|
||||
message: str
|
||||
severity: str # "error", "warning", "info"
|
||||
linter: str
|
||||
code: Optional[str] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class LinterResult:
|
||||
"""Result from running a single linter."""
|
||||
linter_name: str
|
||||
language: str
|
||||
violations: list[Violation]
|
||||
timed_out: bool = False
|
||||
error: Optional[str] = None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Language detection
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
EXTENSION_TO_LANGUAGE = {
|
||||
".py": "python",
|
||||
".js": "javascript",
|
||||
".ts": "typescript",
|
||||
".jsx": "javascript",
|
||||
".tsx": "typescript",
|
||||
".sh": "shell",
|
||||
".bash": "shell",
|
||||
".zsh": "shell",
|
||||
".yaml": "yaml",
|
||||
".yml": "yaml",
|
||||
".json": "json",
|
||||
".md": "markdown",
|
||||
".rb": "ruby",
|
||||
".go": "go",
|
||||
".rs": "rust",
|
||||
".c": "c",
|
||||
".cpp": "cpp",
|
||||
".h": "c",
|
||||
".java": "java",
|
||||
".php": "php",
|
||||
".swift": "swift",
|
||||
".kt": "kotlin",
|
||||
".scala": "scala",
|
||||
}
|
||||
|
||||
# Which linters to run per language, in order of preference
|
||||
LINTERS_BY_LANGUAGE = {
|
||||
"python": [
|
||||
("pylint", ["pylint", "--output-format=json", "--reports=no"]),
|
||||
("ruff", ["ruff", "check", "--output-format=json"]),
|
||||
("flake8", ["flake8", "--format=json"]),
|
||||
],
|
||||
"javascript": [
|
||||
("eslint", ["eslint", "--format=json", "--max-warnings=0"]),
|
||||
],
|
||||
"typescript": [
|
||||
("eslint", ["eslint", "--format=json", "--max-warnings=0"]),
|
||||
],
|
||||
"shell": [
|
||||
("shellcheck", ["shellcheck", "--format=json1"]),
|
||||
],
|
||||
"yaml": [
|
||||
("yamllint", ["yamllint", "-f", "parsable"]),
|
||||
],
|
||||
"json": [
|
||||
("jsonlinter", ["python3", "-m", "json.tool"]), # Simple syntax check
|
||||
],
|
||||
"markdown": [], # No linter yet
|
||||
"ruby": [
|
||||
("rubocop", ["rubocop", "--format", "json"]),
|
||||
],
|
||||
"go": [
|
||||
("golangci-lint", ["golangci-lint", "run", "--out-format", "json"]),
|
||||
],
|
||||
"rust": [
|
||||
("cargo clippy", ["cargo", "clippy", "--message-format=json"]),
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
def detect_languages(repo_path: Path) -> dict[str, list[Path]]:
|
||||
"""
|
||||
Scan repo and return mapping: language -> list of file paths.
|
||||
Only includes languages we have linters for."""
|
||||
language_files: dict[str, list[Path]] = {lang: [] for lang in LINTERS_BY_LANGUAGE.keys()}
|
||||
|
||||
if not repo_path.exists():
|
||||
        return {}
|
||||
|
||||
exclude_dirs = {".git", ".gitea", "node_modules", "__pycache__", ".venv", "venv", "build", "dist"}
|
||||
|
||||
for root, dirs, files in os.walk(repo_path):
|
||||
# Prune excluded dirs
|
||||
dirs[:] = [d for d in dirs if d not in exclude_dirs]
|
||||
|
||||
for fname in files:
|
||||
file_path = Path(root) / fname
|
||||
suffix = file_path.suffix.lower()
|
||||
lang = EXTENSION_TO_LANGUAGE.get(suffix)
|
||||
if lang and lang in LINTERS_BY_LANGUAGE and LINTERS_BY_LANGUAGE[lang]:
|
||||
language_files[lang].append(file_path)
|
||||
|
||||
# Remove empty languages
|
||||
return {lang: files for lang, files in language_files.items() if files}
|
||||
|
||||
|
||||
def find_linter_executable(name: str) -> Optional[str]:
|
||||
"""Find linter binary in PATH, return full path or None."""
|
||||
for path_dir in os.environ.get("PATH", "").split(os.pathsep):
|
||||
candidate = Path(path_dir) / name
|
||||
if candidate.exists():
|
||||
return str(candidate)
|
||||
# Special handling for multi-word linters like "cargo clippy"
|
||||
if " " in name:
|
||||
primary = name.split()[0]
|
||||
for path_dir in os.environ.get("PATH", "").split(os.pathsep):
|
||||
candidate = Path(path_dir) / primary
|
||||
if candidate.exists():
|
||||
return name # Return full command string
|
||||
return None
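
# Illustrative usage (not part of the original file): find_linter_executable("pylint")
# returns the binary's full path when it is on PATH and None otherwise; for a
# multi-word entry such as "cargo clippy" the full command string is returned
# once the "cargo" binary is found.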
|
||||
|
||||
|
||||
def run_linter(
|
||||
linter_name: str,
|
||||
command_template: list[str],
|
||||
files: list[Path],
|
||||
repo_path: Path,
|
||||
) -> LinterResult:
|
||||
"""
|
||||
Execute a linter on a set of files.
|
||||
Returns LinterResult with violations or error.
|
||||
"""
|
||||
# Build command: [linter_bin, args..., files...]
|
||||
# Most linters accept file paths as positional args at the end
|
||||
    # The template already starts with the linter binary (multi-word linters such
    # as "cargo clippy" are pre-split), so use it directly without re-adding the name.
    cmd = list(command_template)
|
||||
|
||||
# Add file paths, relative to repo root for cleaner output
|
||||
rel_files = [str(f.relative_to(repo_path)) for f in files]
|
||||
cmd.extend(rel_files)
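    # Illustrative example (assuming pylint was selected): for files a.py and b.py,
    # cmd is now ["pylint", "--output-format=json", "--reports=no", "a.py", "b.py"].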
|
||||
|
||||
try:
|
||||
proc = subprocess.run(
|
||||
cmd,
|
||||
cwd=repo_path,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=60,
|
||||
)
|
||||
except subprocess.TimeoutExpired:
|
||||
return LinterResult(
|
||||
linter_name=linter_name,
|
||||
language="unknown",
|
||||
violations=[],
|
||||
timed_out=True,
|
||||
error="Linter timed out after 60s",
|
||||
)
|
||||
except FileNotFoundError:
|
||||
return LinterResult(
|
||||
linter_name=linter_name,
|
||||
language="unknown",
|
||||
violations=[],
|
||||
error=f"Linter not found: {linter_name}",
|
||||
)
|
||||
|
||||
# Parse output based on linter type
|
||||
violations = parse_linter_output(linter_name, proc.stdout, proc.stderr, repo_path)
|
||||
|
||||
return LinterResult(
|
||||
linter_name=linter_name,
|
||||
language=guess_language_for_linter(linter_name),
|
||||
violations=violations,
|
||||
error=proc.stderr.strip() if proc.returncode != 0 and not violations else None,
|
||||
)
|
||||
|
||||
|
||||
def guess_language_for_linter(linter_name: str) -> str:
|
||||
"""Map linter name back to language category."""
|
||||
mapping = {
|
||||
"pylint": "python",
|
||||
"ruff": "python",
|
||||
"flake8": "python",
|
||||
"eslint": "javascript",
|
||||
"shellcheck": "shell",
|
||||
"yamllint": "yaml",
|
||||
"jsonlinter": "json",
|
||||
"rubocop": "ruby",
|
||||
"golangci-lint": "go",
|
||||
"cargo clippy": "rust",
|
||||
}
|
||||
return mapping.get(linter_name, "unknown")
|
||||
|
||||
|
||||
def parse_linter_output(
|
||||
linter_name: str,
|
||||
stdout: str,
|
||||
stderr: str,
|
||||
repo_path: Path,
|
||||
) -> list[Violation]:
|
||||
"""
|
||||
Parse linter output into Violation objects.
|
||||
Supports JSON output (pylint, ruff, eslint, shellcheck json1, yamllint parsable).
|
||||
"""
|
||||
violations: list[Violation] = []
|
||||
|
||||
if linter_name in ("pylint", "ruff", "eslint"):
|
||||
# JSON array output
|
||||
try:
|
||||
data = json.loads(stdout)
|
||||
except json.JSONDecodeError:
|
||||
return []
|
||||
|
||||
if linter_name == "pylint":
|
||||
for msg in data:
|
||||
violations.append(Violation(
|
||||
file=msg.get("path", "").lstrip("./"),
|
||||
line=msg.get("line"),
|
||||
column=msg.get("column"),
|
||||
message=msg.get("message", ""),
|
||||
severity="error" if msg.get("type") == "error" else "warning",
|
||||
linter=linter_name,
|
||||
code=msg.get("symbol"),
|
||||
))
|
||||
elif linter_name == "ruff":
|
||||
for entry in data:
|
||||
violations.append(Violation(
|
||||
file=entry.get("filename", "").lstrip("./"),
|
||||
line=entry.get("location", {}).get("row"),
|
||||
column=entry.get("location", {}).get("column"),
|
||||
message=entry.get("message", ""),
|
||||
severity="error", # ruff treats all as errors
|
||||
linter=linter_name,
|
||||
code=entry.get("code"),
|
||||
))
|
||||
elif linter_name == "eslint":
|
||||
for entry in data:
|
||||
violations.append(Violation(
|
||||
file=entry.get("fileName", "").lstrip("./"),
|
||||
line=entry.get("range", {}).get("start", {}).get("line"),
|
||||
column=entry.get("range", {}).get("start", {}).get("column"),
|
||||
message=entry.get("message", ""),
|
||||
                severity="error" if entry.get("severity", 1) == 2 else "warning",  # ESLint: 2 = error, 1 = warning
|
||||
linter=linter_name,
|
||||
code=entry.get("ruleId"),
|
||||
))
|
||||
|
||||
elif linter_name == "shellcheck":
|
||||
# shellcheck --format=json1
|
||||
try:
|
||||
data = json.loads(stdout)
|
||||
            for issue in data.get("comments", []):  # shellcheck json1 wraps results in a "comments" array
|
||||
violations.append(Violation(
|
||||
file=issue.get("file", "").lstrip("./"),
|
||||
line=issue.get("line"),
|
||||
column=issue.get("column"),
|
||||
message=issue.get("message", ""),
|
||||
severity="error" if issue.get("level") == "error" else "warning",
|
||||
linter=linter_name,
|
||||
code=str(issue.get("code")),
|
||||
))
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
|
||||
elif linter_name == "yamllint":
|
||||
# parsable: file:line:col: level message [rule]
|
||||
# Example: test.yaml:3:1: [error] wrong document start (document-start)
|
||||
for line in stdout.splitlines():
|
||||
parts = line.split(":")
|
||||
if len(parts) >= 4:
|
||||
file_rel = parts[0].lstrip("./")
|
||||
line_num = int(parts[1]) if parts[1].isdigit() else None
|
||||
col_num = int(parts[2]) if parts[2].isdigit() else None
|
||||
rest = ":".join(parts[3:]).strip()
|
||||
# Parse: "[error] message (rule)"
|
||||
|
||||
m = re.match(r'\[(\w+)\]\s+(.+?)(?:\s+\(([^)]+)\))?$', rest)
|
||||
if m:
|
||||
severity = m.group(1).lower()
|
||||
message = m.group(2)
|
||||
code = m.group(3)
|
||||
violations.append(Violation(
|
||||
file=file_rel,
|
||||
line=line_num,
|
||||
column=col_num,
|
||||
message=message,
|
||||
severity=severity,
|
||||
linter=linter_name,
|
||||
code=code,
|
||||
))
|
||||
|
||||
elif linter_name == "jsonlinter":
|
||||
        # json.tool is only a syntax check; it reports problems on stderr and the
        # exit code is not available here, so treat any stderr output as an error.
        if stderr.strip():
|
||||
violations.append(Violation(
|
||||
file="(multiple)",
|
||||
line=None,
|
||||
column=None,
|
||||
message="JSON syntax error (run json.tool on each file individually)",
|
||||
severity="error",
|
||||
linter="json.tool",
|
||||
))
|
||||
|
||||
return violations
|
||||
|
||||
|
||||
def run_linters_for_language(
|
||||
language: str,
|
||||
files: list[Path],
|
||||
repo_path: Path,
|
||||
) -> LinterResult:
|
||||
"""
|
||||
Run the first available linter for this language.
|
||||
Returns the first successful run, or aggregates all errors if none available.
|
||||
"""
|
||||
linter_options = LINTERS_BY_LANGUAGE.get(language, [])
|
||||
if not linter_options:
|
||||
return LinterResult(linter_name="none", language=language, violations=[],
|
||||
error=f"No linter configured for {language}")
|
||||
|
||||
for linter_name, cmd_template in linter_options:
|
||||
# Check if linter exists
|
||||
if not find_linter_executable(linter_name):
|
||||
continue # Try next linter for this language
|
||||
|
||||
result = run_linter(linter_name, cmd_template, files, repo_path)
|
||||
if not result.error and not result.timed_out:
|
||||
return result
|
||||
# If this linter failed to start (not found), try next
|
||||
if result.error and "not found" in result.error.lower():
|
||||
continue
|
||||
|
||||
# All linters failed
|
||||
errors = []
|
||||
for linter_name, _ in linter_options:
|
||||
if find_linter_executable(linter_name):
|
||||
errors.append(f"{linter_name}: not runnable")
|
||||
else:
|
||||
errors.append(f"{linter_name}: not installed")
|
||||
return LinterResult(
|
||||
linter_name="/".join(l[0] for l in linter_options),
|
||||
language=language,
|
||||
violations=[],
|
||||
error="; ".join(errors),
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def parse_args():
|
||||
p = argparse.ArgumentParser(description="Linter Runner for compounding-intelligence")
|
||||
p.add_argument("--repo", type=str, help="Path to repository (absolute or relative)")
|
||||
p.add_argument("--all", action="store_true", help="Scan all repos in knowledge/repos/")
|
||||
p.add_argument("--format", choices=["console", "json"], default="console",
|
||||
help="Output format (default: console)")
|
||||
p.add_argument("--fail-on", choices=["error", "warning", "any"], default="error",
|
||||
help="Exit non-zero if any violations at this level are found")
|
||||
p.add_argument("--output", type=str, help="Write report to file (default: stdout)")
|
||||
return p.parse_args()
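
# Illustrative invocation (hypothetical output path): running
#   python3 scripts/linter_runner.py --repo . --format json --fail-on warning --output reports/lint.json
# writes the aggregated JSON report to reports/lint.json and exits non-zero when
# any warnings or errors are found.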
|
||||
|
||||
|
||||
def main():
|
||||
args = parse_args()
|
||||
|
||||
if not args.repo and not args.all:
|
||||
print("ERROR: Must specify --repo <path> or --all", file=sys.stderr)
|
||||
sys.exit(2)
|
||||
|
||||
repos_to_scan = []
|
||||
if args.repo:
|
||||
repos_to_scan.append(Path(args.repo).resolve())
|
||||
if args.all:
|
||||
repos_dir = REPO_ROOT / "knowledge" / "repos"
|
||||
if repos_dir.exists():
|
||||
for yaml_file in repos_dir.glob("*.yaml"):
|
||||
# Extract repo name from filename
|
||||
repos_to_scan.append(REPO_ROOT / yaml_file.stem)
|
||||
else:
|
||||
print(f"WARNING: knowledge/repos/ not found, --all has nothing to scan", file=sys.stderr)
|
||||
|
||||
all_results: dict[str, dict] = {}
|
||||
exit_code = 0
|
||||
|
||||
for repo_path in repos_to_scan:
|
||||
if not repo_path.exists():
|
||||
print(f"WARNING: Repo not found: {repo_path}", file=sys.stderr)
|
||||
continue
|
||||
|
||||
repo_name = repo_path.name
|
||||
print(f"\n=== Scanning: {repo_name} ===") if args.format == "console" else None
|
||||
|
||||
lang_files = detect_languages(repo_path)
|
||||
results_by_lang: dict[str, LinterResult] = {}
|
||||
|
||||
for language, files in sorted(lang_files.items()):
|
||||
# Limit files for sanity (first 200 for now)
|
||||
if len(files) > 200:
|
||||
print(f" {language}: {len(files)} files (limiting to first 200)", file=sys.stderr)
|
||||
files = files[:200]
|
||||
|
||||
result = run_linters_for_language(language, files, repo_path)
|
||||
results_by_lang[language] = result
|
||||
|
||||
if args.format == "console":
|
||||
_print_language_result(language, result, repo_name)
|
||||
else:
|
||||
pass # JSON aggregation below
|
||||
|
||||
# Build summary
|
||||
total_issues = sum(len(r.violations) for r in results_by_lang.values())
|
||||
total_errors = sum(1 for v in (v for r in results_by_lang.values() for v in r.violations)
|
||||
if v.severity == "error")
|
||||
total_warnings = sum(1 for v in (v for r in results_by_lang.values() for v in r.violations)
|
||||
if v.severity == "warning")
|
||||
|
||||
if args.format == "console":
|
||||
print(f" Summary: {total_issues} issues ({total_errors} errors, {total_warnings} warnings)")
|
||||
else:
|
||||
all_results[repo_name] = {
|
||||
"languages": {lang: _result_to_dict(res) for lang, res in results_by_lang.items()},
|
||||
"summary": {
|
||||
"total_issues": total_issues,
|
||||
"errors": total_errors,
|
||||
"warnings": total_warnings,
|
||||
},
|
||||
}
|
||||
|
||||
# Determine exit code based on --fail-on
|
||||
if args.fail_on == "error" and total_errors > 0:
|
||||
exit_code = 1
|
||||
elif args.fail_on == "warning" and total_issues > 0:
|
||||
exit_code = 1
|
||||
elif args.fail_on == "any" and total_issues > 0:
|
||||
exit_code = 1
|
||||
|
||||
if args.format == "json":
|
||||
output = json.dumps({"repos": all_results, "meta": {"scanned": len(repos_to_scan)}}, indent=2)
|
||||
if args.output:
|
||||
Path(args.output).write_text(output)
|
||||
else:
|
||||
print(output)
|
||||
|
||||
sys.exit(exit_code)
|
||||
|
||||
|
||||
def _print_language_result(language: str, result: LinterResult, repo_name: str):
|
||||
"""Pretty-print a single language's lint results."""
|
||||
status = "✓"
|
||||
if result.error:
|
||||
status = "✗"
|
||||
print(f" {language}: {result.error}")
|
||||
elif result.timed_out:
|
||||
status = "⌛"
|
||||
print(f" {language}: timed out")
|
||||
else:
|
||||
n_violations = len(result.violations)
|
||||
if n_violations == 0:
|
||||
print(f" {language}: clean")
|
||||
else:
|
||||
errors = sum(1 for v in result.violations if v.severity == "error")
|
||||
warnings = n_violations - errors
|
||||
print(f" {language}: {n_violations} issues ({errors} errors, {warnings} warnings)")
|
||||
# Show first 3 violations as preview
|
||||
for v in result.violations[:3]:
|
||||
loc = f"{v.file}:{v.line or '?'}"
|
||||
print(f" {loc} [{v.severity.upper()}] {v.message[:70]}")
|
||||
if len(result.violations) > 3:
|
||||
print(f" ... and {len(result.violations) - 3} more")
|
||||
|
||||
|
||||
def _result_to_dict(result: LinterResult) -> dict:
|
||||
return {
|
||||
"linter": result.linter_name,
|
||||
"language": result.language,
|
||||
"violations": [asdict(v) for v in result.violations],
|
||||
"timed_out": result.timed_out,
|
||||
"error": result.error,
|
||||
}
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
pr_complexity_scorer.py  (deleted, 351 lines)
@@ -1,351 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
PR Complexity Scorer - Estimate review effort for PRs.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
from dataclasses import dataclass, asdict
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
|
||||
GITEA_BASE = "https://forge.alexanderwhitestone.com/api/v1"
|
||||
|
||||
DEPENDENCY_FILES = {
|
||||
"requirements.txt", "pyproject.toml", "setup.py", "setup.cfg",
|
||||
"Pipfile", "poetry.lock", "package.json", "yarn.lock", "Gemfile",
|
||||
"go.mod", "Cargo.toml", "pom.xml", "build.gradle"
|
||||
}
|
||||
|
||||
TEST_PATTERNS = [
|
||||
r"tests?/.*\.py$", r".*_test\.py$", r"test_.*\.py$",
|
||||
r"spec/.*\.rb$", r".*_spec\.rb$",
|
||||
r"__tests__/", r".*\.test\.(js|ts|jsx|tsx)$"
|
||||
]
|
||||
|
||||
WEIGHT_FILES = 0.25
|
||||
WEIGHT_LINES = 0.25
|
||||
WEIGHT_DEPS = 0.30
|
||||
WEIGHT_TEST_COV = 0.20
|
||||
|
||||
SMALL_FILES = 5
|
||||
MEDIUM_FILES = 20
|
||||
LARGE_FILES = 50
|
||||
|
||||
SMALL_LINES = 100
|
||||
MEDIUM_LINES = 500
|
||||
LARGE_LINES = 2000
|
||||
|
||||
TIME_PER_POINT = {1: 5, 2: 10, 3: 15, 4: 20, 5: 25, 6: 30, 7: 45, 8: 60, 9: 90, 10: 120}
|
||||
|
||||
|
||||
@dataclass
|
||||
class PRComplexity:
|
||||
pr_number: int
|
||||
title: str
|
||||
files_changed: int
|
||||
additions: int
|
||||
deletions: int
|
||||
has_dependency_changes: bool
|
||||
test_coverage_delta: Optional[int]
|
||||
score: int
|
||||
estimated_minutes: int
|
||||
reasons: List[str]
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return asdict(self)
|
||||
|
||||
|
||||
class GiteaClient:
|
||||
def __init__(self, token: str):
|
||||
self.token = token
|
||||
self.base_url = GITEA_BASE.rstrip("/")
|
||||
|
||||
def _request(self, path: str, params: Dict = None) -> Any:
|
||||
url = f"{self.base_url}{path}"
|
||||
if params:
|
||||
qs = "&".join(f"{k}={v}" for k, v in params.items() if v is not None)
|
||||
url += f"?{qs}"
|
||||
|
||||
req = urllib.request.Request(url)
|
||||
req.add_header("Authorization", f"token {self.token}")
|
||||
req.add_header("Content-Type", "application/json")
|
||||
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=30) as resp:
|
||||
return json.loads(resp.read().decode())
|
||||
except urllib.error.HTTPError as e:
|
||||
print(f"API error {e.code}: {e.read().decode()[:200]}", file=sys.stderr)
|
||||
return None
|
||||
except urllib.error.URLError as e:
|
||||
print(f"Network error: {e}", file=sys.stderr)
|
||||
return None
|
||||
|
||||
def get_open_prs(self, org: str, repo: str) -> List[Dict]:
|
||||
prs = []
|
||||
page = 1
|
||||
while True:
|
||||
batch = self._request(f"/repos/{org}/{repo}/pulls", {"limit": 50, "page": page, "state": "open"})
|
||||
if not batch:
|
||||
break
|
||||
prs.extend(batch)
|
||||
if len(batch) < 50:
|
||||
break
|
||||
page += 1
|
||||
return prs
|
||||
|
||||
def get_pr_files(self, org: str, repo: str, pr_number: int) -> List[Dict]:
|
||||
files = []
|
||||
page = 1
|
||||
while True:
|
||||
batch = self._request(
|
||||
f"/repos/{org}/{repo}/pulls/{pr_number}/files",
|
||||
{"limit": 100, "page": page}
|
||||
)
|
||||
if not batch:
|
||||
break
|
||||
files.extend(batch)
|
||||
if len(batch) < 100:
|
||||
break
|
||||
page += 1
|
||||
return files
|
||||
|
||||
def post_comment(self, org: str, repo: str, pr_number: int, body: str) -> bool:
|
||||
data = json.dumps({"body": body}).encode("utf-8")
|
||||
req = urllib.request.Request(
|
||||
f"{self.base_url}/repos/{org}/{repo}/issues/{pr_number}/comments",
|
||||
data=data,
|
||||
method="POST",
|
||||
headers={"Authorization": f"token {self.token}", "Content-Type": "application/json"}
|
||||
)
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=30) as resp:
|
||||
return resp.status in (200, 201)
|
||||
except urllib.error.HTTPError:
|
||||
return False
|
||||
|
||||
|
||||
def is_dependency_file(filename: str) -> bool:
|
||||
return any(filename.endswith(dep) for dep in DEPENDENCY_FILES)
|
||||
|
||||
|
||||
def is_test_file(filename: str) -> bool:
|
||||
return any(re.search(pattern, filename) for pattern in TEST_PATTERNS)
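
# Illustrative examples (not part of the original file):
#   is_dependency_file("src/requirements.txt") -> True
#   is_test_file("tests/test_api.py")          -> True
#   is_test_file("handler.py")                 -> False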
|
||||
|
||||
|
||||
def score_pr(
|
||||
files_changed: int,
|
||||
additions: int,
|
||||
deletions: int,
|
||||
has_dependency_changes: bool,
|
||||
test_coverage_delta: Optional[int] = None
|
||||
) -> tuple[int, int, List[str]]:
|
||||
score = 1.0
|
||||
reasons = []
|
||||
|
||||
# Files changed
|
||||
if files_changed <= SMALL_FILES:
|
||||
fscore = 1.0
|
||||
reasons.append("small number of files changed")
|
||||
elif files_changed <= MEDIUM_FILES:
|
||||
fscore = 2.0
|
||||
reasons.append("moderate number of files changed")
|
||||
elif files_changed <= LARGE_FILES:
|
||||
fscore = 2.5
|
||||
reasons.append("large number of files changed")
|
||||
else:
|
||||
fscore = 3.0
|
||||
reasons.append("very large PR spanning many files")
|
||||
|
||||
# Lines changed
|
||||
total_lines = additions + deletions
|
||||
if total_lines <= SMALL_LINES:
|
||||
lscore = 1.0
|
||||
reasons.append("small change size")
|
||||
elif total_lines <= MEDIUM_LINES:
|
||||
lscore = 2.0
|
||||
reasons.append("moderate change size")
|
||||
elif total_lines <= LARGE_LINES:
|
||||
lscore = 3.0
|
||||
reasons.append("large change size")
|
||||
else:
|
||||
lscore = 4.0
|
||||
reasons.append("very large change")
|
||||
|
||||
# Dependency changes
|
||||
if has_dependency_changes:
|
||||
dscore = 2.5
|
||||
reasons.append("dependency changes (architectural impact)")
|
||||
else:
|
||||
dscore = 0.0
|
||||
|
||||
# Test coverage delta
|
||||
tscore = 0.0
|
||||
if test_coverage_delta is not None:
|
||||
if test_coverage_delta > 0:
|
||||
reasons.append(f"test additions (+{test_coverage_delta} test files)")
|
||||
tscore = -min(2.0, test_coverage_delta / 2.0)
|
||||
elif test_coverage_delta < 0:
|
||||
reasons.append(f"test removals ({abs(test_coverage_delta)} test files)")
|
||||
tscore = min(2.0, abs(test_coverage_delta) * 0.5)
|
||||
else:
|
||||
reasons.append("test coverage change not assessed")
|
||||
|
||||
# Weighted sum, scaled by 3 to use full 1-10 range
|
||||
bonus = (fscore * WEIGHT_FILES) + (lscore * WEIGHT_LINES) + (dscore * WEIGHT_DEPS) + (tscore * WEIGHT_TEST_COV)
|
||||
scaled_bonus = bonus * 3.0
|
||||
score = 1.0 + scaled_bonus
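    # Worked example (illustrative): a 15-file, 500-line PR with dependency changes
    # and no test delta gives fscore=2.0, lscore=2.0, dscore=2.5, tscore=0.0, so
    # bonus = 0.5 + 0.5 + 0.75 = 1.75, scaled_bonus = 5.25, score = 6.25,
    # final_score = 6 and an estimate of 30 minutes.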
|
||||
|
||||
final_score = max(1, min(10, int(round(score))))
|
||||
est_minutes = TIME_PER_POINT.get(final_score, 30)
|
||||
|
||||
return final_score, est_minutes, reasons
|
||||
|
||||
|
||||
def analyze_pr(client: GiteaClient, org: str, repo: str, pr_data: Dict) -> PRComplexity:
|
||||
pr_num = pr_data["number"]
|
||||
title = pr_data.get("title", "")
|
||||
files = client.get_pr_files(org, repo, pr_num)
|
||||
|
||||
additions = sum(f.get("additions", 0) for f in files)
|
||||
deletions = sum(f.get("deletions", 0) for f in files)
|
||||
filenames = [f.get("filename", "") for f in files]
|
||||
|
||||
has_deps = any(is_dependency_file(f) for f in filenames)
|
||||
|
||||
test_added = sum(1 for f in files if f.get("status") == "added" and is_test_file(f.get("filename", "")))
|
||||
test_removed = sum(1 for f in files if f.get("status") == "removed" and is_test_file(f.get("filename", "")))
|
||||
test_delta = test_added - test_removed if (test_added or test_removed) else None
|
||||
|
||||
score, est_min, reasons = score_pr(
|
||||
files_changed=len(files),
|
||||
additions=additions,
|
||||
deletions=deletions,
|
||||
has_dependency_changes=has_deps,
|
||||
test_coverage_delta=test_delta
|
||||
)
|
||||
|
||||
return PRComplexity(
|
||||
pr_number=pr_num,
|
||||
title=title,
|
||||
files_changed=len(files),
|
||||
additions=additions,
|
||||
deletions=deletions,
|
||||
has_dependency_changes=has_deps,
|
||||
test_coverage_delta=test_delta,
|
||||
score=score,
|
||||
estimated_minutes=est_min,
|
||||
reasons=reasons
|
||||
)
|
||||
|
||||
|
||||
def build_comment(complexity: PRComplexity) -> str:
|
||||
change_desc = f"{complexity.files_changed} files, +{complexity.additions}/-{complexity.deletions} lines"
|
||||
deps_note = "\n- :warning: Dependency changes detected — architectural review recommended" if complexity.has_dependency_changes else ""
|
||||
test_note = ""
|
||||
if complexity.test_coverage_delta is not None:
|
||||
if complexity.test_coverage_delta > 0:
|
||||
test_note = f"\n- :+1: {complexity.test_coverage_delta} test file(s) added"
|
||||
elif complexity.test_coverage_delta < 0:
|
||||
test_note = f"\n- :warning: {abs(complexity.test_coverage_delta)} test file(s) removed"
|
||||
|
||||
comment = f"## 📊 PR Complexity Analysis\n\n"
|
||||
comment += f"**PR #{complexity.pr_number}: {complexity.title}**\n\n"
|
||||
comment += f"| Metric | Value |\n|--------|-------|\n"
|
||||
comment += f"| Changes | {change_desc} |\n"
|
||||
comment += f"| Complexity Score | **{complexity.score}/10** |\n"
|
||||
comment += f"| Estimated Review Time | ~{complexity.estimated_minutes} minutes |\n\n"
|
||||
comment += f"### Scoring rationale:"
|
||||
for r in complexity.reasons:
|
||||
comment += f"\n- {r}"
|
||||
if deps_note:
|
||||
comment += deps_note
|
||||
if test_note:
|
||||
comment += test_note
|
||||
comment += f"\n\n---\n"
|
||||
comment += f"*Generated by PR Complexity Scorer — [issue #135](https://forge.alexanderwhitestone.com/Timmy_Foundation/compounding-intelligence/issues/135)*"
|
||||
return comment
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="PR Complexity Scorer")
|
||||
parser.add_argument("--org", default="Timmy_Foundation")
|
||||
parser.add_argument("--repo", default="compounding-intelligence")
|
||||
parser.add_argument("--token", default=os.environ.get("GITEA_TOKEN") or os.path.expanduser("~/.config/gitea/token"))
|
||||
parser.add_argument("--dry-run", action="store_true")
|
||||
parser.add_argument("--apply", action="store_true")
|
||||
parser.add_argument("--output", default="metrics/pr_complexity.json")
|
||||
args = parser.parse_args()
|
||||
|
||||
token_path = args.token
|
||||
if os.path.exists(token_path):
|
||||
with open(token_path) as f:
|
||||
token = f.read().strip()
|
||||
else:
|
||||
token = args.token
|
||||
|
||||
if not token:
|
||||
print("ERROR: No Gitea token provided", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
client = GiteaClient(token)
|
||||
|
||||
print(f"Fetching open PRs for {args.org}/{args.repo}...")
|
||||
prs = client.get_open_prs(args.org, args.repo)
|
||||
if not prs:
|
||||
print("No open PRs found.")
|
||||
sys.exit(0)
|
||||
|
||||
print(f"Found {len(prs)} open PR(s). Analyzing...")
|
||||
|
||||
results = []
|
||||
Path(args.output).parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
for pr in prs:
|
||||
pr_num = pr["number"]
|
||||
title = pr.get("title", "")
|
||||
print(f" Analyzing PR #{pr_num}: {title[:60]}")
|
||||
|
||||
try:
|
||||
complexity = analyze_pr(client, args.org, args.repo, pr)
|
||||
results.append(complexity.to_dict())
|
||||
|
||||
comment = build_comment(complexity)
|
||||
|
||||
if args.dry_run:
|
||||
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [DRY-RUN]")
|
||||
elif args.apply:
|
||||
success = client.post_comment(args.org, args.repo, pr_num, comment)
|
||||
status = "[commented]" if success else "[FAILED]"
|
||||
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min {status}")
|
||||
else:
|
||||
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [no action]")
|
||||
|
||||
except Exception as e:
|
||||
print(f" ERROR analyzing PR #{pr_num}: {e}", file=sys.stderr)
|
||||
|
||||
with open(args.output, "w") as f:
|
||||
json.dump({
|
||||
"org": args.org,
|
||||
"repo": args.repo,
|
||||
"timestamp": datetime.now(timezone.utc).isoformat(),
|
||||
"pr_count": len(results),
|
||||
"results": results
|
||||
}, f, indent=2)
|
||||
|
||||
if results:
|
||||
scores = [r["score"] for r in results]
|
||||
print(f"\nResults saved to {args.output}")
|
||||
print(f"Summary: {len(results)} PRs, scores range {min(scores):.0f}-{max(scores):.0f}")
|
||||
else:
|
||||
print("\nNo results to save.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
smoke tests for code_duplication_detector  (deleted, 168 lines)
@@ -1,168 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Smoke test for code duplication detector — verifies:
|
||||
- Function extraction from Python files
|
||||
- Exact duplicate detection
|
||||
- Near-duplicate detection (token similarity)
|
||||
- Report generation and stats
|
||||
- JSON output format
|
||||
"""
|
||||
|
||||
import json
|
||||
import sys
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
SCRIPT_DIR = Path(__file__).parent.absolute()
|
||||
sys.path.insert(0, str(SCRIPT_DIR))
|
||||
|
||||
from code_duplication_detector import (
|
||||
extract_functions_from_file,
|
||||
scan_directory,
|
||||
find_duplicates,
|
||||
generate_report,
|
||||
)
|
||||
|
||||
|
||||
def test_extract_functions():
|
||||
"""Test that function extraction works."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
test_file = Path(tmpdir) / 'sample.py'
|
||||
test_file.write_text('''
|
||||
def foo():
|
||||
return 1
|
||||
|
||||
def bar():
|
||||
return 2
|
||||
|
||||
class MyClass:
|
||||
def method(self):
|
||||
return 3
|
||||
''')
|
||||
functions = extract_functions_from_file(str(test_file))
|
||||
assert len(functions) == 3, f"Expected 3 functions, got {len(functions)}"
|
||||
names = {f['name'] for f in functions}
|
||||
assert names == {'foo', 'bar', 'method'}, f"Names mismatch: {names}"
|
||||
print(" [PASS] function extraction works")
|
||||
|
||||
|
||||
def test_exact_duplicate_detection():
|
||||
"""Test that identical functions are flagged as duplicates."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
# Create two files with the same function
|
||||
f1 = Path(tmpdir) / 'a.py'
|
||||
f1.write_text('''
|
||||
def duplicated():
|
||||
x = 1
|
||||
y = 2
|
||||
return x + y
|
||||
''')
|
||||
f2 = Path(tmpdir) / 'b.py'
|
||||
f2.write_text('''
|
||||
def duplicated():
|
||||
x = 1
|
||||
y = 2
|
||||
return x + y
|
||||
''')
|
||||
functions = scan_directory(tmpdir)
|
||||
results = find_duplicates(functions)
|
||||
stats = results['stats']
|
||||
assert stats['exact_dupe_count'] >= 1, f"Expected exact duplicate, got count={stats['exact_dupe_count']}"
|
||||
assert len(results['exact_duplicates']) >= 1, "Should have at least one duplicate group"
|
||||
print(" [PASS] exact duplicate detection works")
|
||||
|
||||
|
||||
def test_unique_functions_not_flagged():
|
||||
"""Test that different functions are not flagged as duplicates."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
f1 = Path(tmpdir) / 'a.py'
|
||||
f1.write_text('def func_a(): return 1')
|
||||
f2 = Path(tmpdir) / 'b.py'
|
||||
f2.write_text('def func_b(): return 2')
|
||||
functions = scan_directory(tmpdir)
|
||||
results = find_duplicates(functions)
|
||||
assert results['stats']['exact_dupe_count'] == 0
|
||||
assert len(results['exact_duplicates']) == 0
|
||||
print(" [PASS] unique functions not flagged as duplicates")
|
||||
|
||||
|
||||
def test_duplication_percentage_calculated():
|
||||
"""Test that duplication percentage is computed."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
# Create file with mostly duplicated content
|
||||
f1 = Path(tmpdir) / 'a.py'
|
||||
f1.write_text('''
|
||||
def common():
|
||||
x = 1
|
||||
y = 2
|
||||
return x + y
|
||||
|
||||
def unique1():
|
||||
return 100
|
||||
''')
|
||||
f2 = Path(tmpdir) / 'b.py'
|
||||
f2.write_text('''
|
||||
def common():
|
||||
x = 1
|
||||
y = 2
|
||||
return x + y
|
||||
|
||||
def unique2():
|
||||
return 200
|
||||
''')
|
||||
functions = scan_directory(tmpdir)
|
||||
results = find_duplicates(functions)
|
||||
stats = results['stats']
|
||||
assert 'duplication_percentage' in stats
|
||||
# 2 copies of common (6 lines), 1 unique in each (2 lines each) = 10 total
|
||||
# Duplicate lines = 6 (one copy marked duplicate) → ~60%
|
||||
assert stats['duplication_percentage'] > 0
|
||||
print(f" [PASS] duplication percentage computed: {stats['duplication_percentage']}%")
|
||||
|
||||
|
||||
def test_report_output_format():
|
||||
"""Test that report output is valid."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
f1 = Path(tmpdir) / 'a.py'
|
||||
f1.write_text('def dup(): return 1')
|
||||
f2 = Path(tmpdir) / 'b.py'
|
||||
f2.write_text('def dup(): return 1')
|
||||
functions = scan_directory(tmpdir)
|
||||
results = find_duplicates(functions)
|
||||
|
||||
# Text report
|
||||
text = generate_report(results, output_format='text')
|
||||
assert 'CODE DUPLICATION REPORT' in text
|
||||
assert 'Total functions' in text
|
||||
print(" [PASS] text report format valid")
|
||||
|
||||
# JSON report
|
||||
json_out = generate_report(results, output_format='json')
|
||||
data = json.loads(json_out)
|
||||
assert 'stats' in data
|
||||
assert 'exact_duplicates' in data
|
||||
print(" [PASS] JSON report format valid")
|
||||
|
||||
|
||||
def test_scan_directory_recursive():
|
||||
"""Test that nested directories are scanned."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
subdir = Path(tmpdir) / 'sub'
|
||||
subdir.mkdir()
|
||||
(subdir / 'nested.py').write_text('def nested(): pass')
|
||||
(Path(tmpdir) / 'root.py').write_text('def root(): pass')
|
||||
functions = scan_directory(tmpdir)
|
||||
names = {f['name'] for f in functions}
|
||||
assert 'nested' in names and 'root' in names
|
||||
print(" [PASS] recursive directory scanning works")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
print("Running code duplication detector smoke tests...")
|
||||
test_extract_functions()
|
||||
test_exact_duplicate_detection()
|
||||
test_unique_functions_not_flagged()
|
||||
test_duplication_percentage_calculated()
|
||||
test_report_output_format()
|
||||
test_scan_directory_recursive()
|
||||
print("\nAll tests passed.")
|
||||
tests for pr_complexity_scorer  (deleted, 170 lines)
@@ -1,170 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Tests for PR Complexity Scorer — unit tests for the scoring logic.
|
||||
"""
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent))
|
||||
|
||||
from pr_complexity_scorer import (
|
||||
score_pr,
|
||||
is_dependency_file,
|
||||
is_test_file,
|
||||
TIME_PER_POINT,
|
||||
SMALL_FILES,
|
||||
MEDIUM_FILES,
|
||||
LARGE_FILES,
|
||||
SMALL_LINES,
|
||||
MEDIUM_LINES,
|
||||
LARGE_LINES,
|
||||
)
|
||||
|
||||
PASS = 0
|
||||
FAIL = 0
|
||||
|
||||
def test(name):
|
||||
def decorator(fn):
|
||||
global PASS, FAIL
|
||||
try:
|
||||
fn()
|
||||
PASS += 1
|
||||
print(f" [PASS] {name}")
|
||||
except AssertionError as e:
|
||||
FAIL += 1
|
||||
print(f" [FAIL] {name}: {e}")
|
||||
except Exception as e:
|
||||
FAIL += 1
|
||||
print(f" [FAIL] {name}: Unexpected error: {e}")
|
||||
return decorator
|
||||
|
||||
def assert_eq(a, b, msg=""):
|
||||
if a != b:
|
||||
raise AssertionError(f"{msg} expected {b!r}, got {a!r}")
|
||||
|
||||
def assert_true(v, msg=""):
|
||||
if not v:
|
||||
raise AssertionError(msg or "Expected True")
|
||||
|
||||
def assert_false(v, msg=""):
|
||||
if v:
|
||||
raise AssertionError(msg or "Expected False")
|
||||
|
||||
|
||||
print("=== PR Complexity Scorer Tests ===\n")
|
||||
|
||||
print("-- File Classification --")
|
||||
|
||||
@test("dependency file detection — requirements.txt")
|
||||
def _():
|
||||
assert_true(is_dependency_file("requirements.txt"))
|
||||
assert_true(is_dependency_file("src/requirements.txt"))
|
||||
assert_false(is_dependency_file("requirements_test.txt"))
|
||||
|
||||
@test("dependency file detection — pyproject.toml")
|
||||
def _():
|
||||
assert_true(is_dependency_file("pyproject.toml"))
|
||||
assert_false(is_dependency_file("myproject.py"))
|
||||
|
||||
@test("test file detection — pytest style")
|
||||
def _():
|
||||
assert_true(is_test_file("tests/test_api.py"))
|
||||
assert_true(is_test_file("test_module.py"))
|
||||
assert_true(is_test_file("src/module_test.py"))
|
||||
|
||||
@test("test file detection — other frameworks")
|
||||
def _():
|
||||
assert_true(is_test_file("spec/feature_spec.rb"))
|
||||
assert_true(is_test_file("__tests__/component.test.js"))
|
||||
assert_false(is_test_file("testfixtures/helper.py"))
|
||||
|
||||
|
||||
print("\n-- Scoring Logic --")
|
||||
|
||||
@test("small PR gets low score (1-3)")
|
||||
def _():
|
||||
score, minutes, _ = score_pr(
|
||||
files_changed=3,
|
||||
additions=50,
|
||||
deletions=10,
|
||||
has_dependency_changes=False,
|
||||
test_coverage_delta=None
|
||||
)
|
||||
assert_true(1 <= score <= 3, f"Score should be low, got {score}")
|
||||
assert_true(minutes < 20)
|
||||
|
||||
@test("medium PR gets medium score (4-6)")
|
||||
def _():
|
||||
score, minutes, _ = score_pr(
|
||||
files_changed=15,
|
||||
additions=400,
|
||||
deletions=100,
|
||||
has_dependency_changes=False,
|
||||
test_coverage_delta=None
|
||||
)
|
||||
assert_true(4 <= score <= 6, f"Score should be medium, got {score}")
|
||||
assert_true(20 <= minutes <= 45)
|
||||
|
||||
@test("large PR gets high score (7-9)")
|
||||
def _():
|
||||
score, minutes, _ = score_pr(
|
||||
files_changed=60,
|
||||
additions=3000,
|
||||
deletions=1500,
|
||||
has_dependency_changes=True,
|
||||
test_coverage_delta=None
|
||||
)
|
||||
assert_true(7 <= score <= 9, f"Score should be high, got {score}")
|
||||
assert_true(minutes >= 45)
|
||||
|
||||
@test("dependency changes boost score")
|
||||
def _():
|
||||
base_score, _, _ = score_pr(
|
||||
files_changed=10, additions=200, deletions=50,
|
||||
has_dependency_changes=False, test_coverage_delta=None
|
||||
)
|
||||
dep_score, _, _ = score_pr(
|
||||
files_changed=10, additions=200, deletions=50,
|
||||
has_dependency_changes=True, test_coverage_delta=None
|
||||
)
|
||||
assert_true(dep_score > base_score, f"Deps: {base_score} -> {dep_score}")
|
||||
|
||||
@test("adding tests lowers complexity")
|
||||
def _():
|
||||
base_score, _, _ = score_pr(
|
||||
files_changed=8, additions=150, deletions=20,
|
||||
has_dependency_changes=False, test_coverage_delta=None
|
||||
)
|
||||
better_score, _, _ = score_pr(
|
||||
files_changed=8, additions=180, deletions=20,
|
||||
has_dependency_changes=False, test_coverage_delta=3
|
||||
)
|
||||
assert_true(better_score < base_score, f"Tests: {base_score} -> {better_score}")
|
||||
|
||||
@test("removing tests increases complexity")
|
||||
def _():
|
||||
base_score, _, _ = score_pr(
|
||||
files_changed=8, additions=150, deletions=20,
|
||||
has_dependency_changes=False, test_coverage_delta=None
|
||||
)
|
||||
worse_score, _, _ = score_pr(
|
||||
files_changed=8, additions=150, deletions=20,
|
||||
has_dependency_changes=False, test_coverage_delta=-2
|
||||
)
|
||||
assert_true(worse_score > base_score, f"Remove tests: {base_score} -> {worse_score}")
|
||||
|
||||
@test("score bounded 1-10")
|
||||
def _():
|
||||
for files, adds, dels in [(1, 10, 5), (100, 10000, 5000)]:
|
||||
score, _, _ = score_pr(files, adds, dels, False, None)
|
||||
assert_true(1 <= score <= 10, f"Score {score} out of range")
|
||||
|
||||
@test("estimated minutes exist for all scores")
|
||||
def _():
|
||||
for s in range(1, 11):
|
||||
assert_true(s in TIME_PER_POINT, f"Missing time for score {s}")
|
||||
|
||||
|
||||
print(f"\n=== Results: {PASS} passed, {FAIL} failed ===")
|
||||
sys.exit(0 if FAIL == 0 else 1)
tests/test_linter_runner.py  (new file, 222 lines)
@@ -0,0 +1,222 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Tests for linter_runner module (Issue #155).
|
||||
|
||||
Tests cover:
|
||||
- Language detection by file extension
|
||||
- Linter result aggregation
|
||||
- Violation parsing (JSON output formats)
|
||||
- Exit code logic (fail-on)
|
||||
- Report formatting (console/JSON)
|
||||
"""
|
||||
import json
|
||||
import sys
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
# Add scripts to path
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
|
||||
|
||||
from linter_runner import (
|
||||
Violation,
|
||||
LinterResult,
|
||||
detect_languages,
|
||||
parse_linter_output,
|
||||
_result_to_dict,
|
||||
EXTENSION_TO_LANGUAGE,
|
||||
LINTERS_BY_LANGUAGE,
|
||||
)
|
||||
|
||||
|
||||
class TestLanguageDetection:
|
||||
"""Test detect_languages() identifies languages correctly."""
|
||||
|
||||
def test_detects_python_files(self, tmp_path: Path):
|
||||
(tmp_path / "main.py").write_text("print('hello')")
|
||||
(tmp_path / "lib" / "utils.py").mkdir(parents=True)
|
||||
(tmp_path / "lib" / "utils.py").write_text("def foo(): pass")
|
||||

        result = detect_languages(tmp_path)
        assert "python" in result
        assert len(result["python"]) == 2

    def test_detects_javascript_files(self, tmp_path: Path):
        (tmp_path / "app.js").write_text("console.log('hi')")
        (tmp_path / "component.jsx").write_text("<div/>")

        result = detect_languages(tmp_path)
        assert "javascript" in result
        assert len(result["javascript"]) == 2

    def test_detects_shell_files(self, tmp_path: Path):
        (tmp_path / "setup.sh").write_text("#!/bin/bash\necho hi")
        (tmp_path / "build.sh").write_text("make")

        result = detect_languages(tmp_path)
        assert "shell" in result
        assert len(result["shell"]) == 2

    def test_detects_yaml_files(self, tmp_path: Path):
        (tmp_path / "config.yml").write_text("key: value")
        (tmp_path / "env.yaml").write_text("env: test")

        result = detect_languages(tmp_path)
        assert "yaml" in result
        assert len(result["yaml"]) == 2

    def test_ignores_git_directory(self, tmp_path: Path):
        git_dir = tmp_path / ".git"
        git_dir.mkdir()
        (git_dir / "config").write_text("placeholder")
        (tmp_path / "script.py").write_text("print(1)")

        result = detect_languages(tmp_path)
        assert "python" in result
        assert not any(".git" in str(f) for f in result.get("python", []))

    def test_returns_empty_for_nonexistent_path(self):
        result = detect_languages(Path("/nonexistent/path/xyz"))
        assert result == {}

    def test_mixed_languages(self, tmp_path: Path):
        (tmp_path / "app.py").write_text("")
        (tmp_path / "main.js").write_text("")
        (tmp_path / "deploy.sh").write_text("")

        result = detect_languages(tmp_path)
        langs = set(result.keys())
        assert {"python", "javascript", "shell"} <= langs

    def test_limits_files_to_known_languages(self, tmp_path: Path):
        (tmp_path / "readme.txt").write_text("text")
        (tmp_path / "data.csv").write_text("a,b,c")

        result = detect_languages(tmp_path)
        assert len(result) == 0


class TestViolationParsing:
    """Test parse_linter_output parses various linter formats."""

    def test_parses_pylint_json(self):
        stdout = json.dumps([
            {"type": "error", "module": "test.py", "line": 10, "column": 5,
             "message": "Missing docstring", "symbol": "missing-docstring"},
            {"type": "warning", "module": "test.py", "line": 15, "column": 1,
             "message": "Line too long", "symbol": "line-too-long"},
        ])
        violations = parse_linter_output("pylint", stdout, "", Path("/repo"))
        assert len(violations) == 2
        assert violations[0].severity == "error"
        assert violations[0].message == "Missing docstring"
        assert violations[1].severity == "warning"
        assert violations[1].code == "line-too-long"

    def test_parses_ruff_json(self):
        stdout = json.dumps([
            {"filename": "src/main.py", "location": {"row": 5, "column": 1},
             "code": "E501", "message": "Line too long"},
        ])
        violations = parse_linter_output("ruff", stdout, "", Path("/repo"))
        assert len(violations) == 1
        assert violations[0].file == "src/main.py"
        assert violations[0].line == 5
        assert violations[0].code == "E501"

    def test_parses_eslint_json(self):
        stdout = json.dumps([
            {"fileName": "app.js", "range": {"start": {"line": 2, "column": 0}},
             "message": "Unexpected console statement", "severity": 2, "ruleId": "no-console"},
        ])
        violations = parse_linter_output("eslint", stdout, "", Path("/repo"))
        assert len(violations) == 1
        assert violations[0].severity == "error"
        assert violations[0].code == "no-console"

    def test_parses_shellcheck_json1(self):
        stdout = json.dumps({
            "issues": [
                {"file": "script.sh", "line": 3, "column": 1,
                 "message": "Quote this to prevent word splitting", "level": "warning", "code": "SC2086"},
            ]
        })
        violations = parse_linter_output("shellcheck", stdout, "", Path("/repo"))
        assert len(violations) == 1
        assert violations[0].severity == "warning"
        assert violations[0].code == "SC2086"

    def test_parses_yamllint_parsable(self):
        stdout = "config.yaml:3:1: [error] wrong document start (document-start)\n"
        violations = parse_linter_output("yamllint", stdout, "", Path("/repo"))
        assert len(violations) == 1
        assert violations[0].file == "config.yaml"
        assert violations[0].line == 3
        assert violations[0].severity == "error"
        assert violations[0].code == "document-start"

    def test_returns_empty_on_invalid_json(self):
        stdout = "Not valid JSON"
        violations = parse_linter_output("pylint", stdout, "", Path("/repo"))
        assert violations == []

    def test_strips_leading_slash_from_paths(self):
        stdout = json.dumps([{"type": "error", "module": "/repo/src/test.py",
                              "line": 1, "column": 1, "message": "test", "symbol": "T001"}])
        violations = parse_linter_output("pylint", stdout, "", Path("/repo"))
        assert violations[0].file == "src/test.py"


class TestLinterResult:
    """Test LinterResult and JSON serialization."""

    def test_result_to_dict_roundtrip(self):
        v = Violation(file="test.py", line=10, column=5, message="msg",
                      severity="error", linter="pylint", code="E001")
        r = LinterResult(linter_name="pylint", language="python", violations=[v])
        d = _result_to_dict(r)
        assert d["linter"] == "pylint"
        assert d["violations"][0]["file"] == "test.py"
        assert d["violations"][0]["code"] == "E001"


class TestIntegration:
    """End-to-end integration tests with temporary repos."""

    def test_linter_runner_accepts_repo_path(self, tmp_path: Path):
        (tmp_path / "main.py").write_text("print('hello')")
        (tmp_path / "bad.py").write_text("import unused_module\nx=1")

        from linter_runner import detect_languages, run_linters_for_language

        langs = detect_languages(tmp_path)
        assert "python" in langs

        result = run_linters_for_language("python", langs["python"][:1], tmp_path)
        assert result.language == "python"
        assert result.violations or result.error  # either linter output or not-installed

    def test_json_output_structure(self, tmp_path: Path):
        (tmp_path / "script.py").write_text("print(1)")

        from linter_runner import detect_languages, run_linters_for_language, _result_to_dict

        langs = detect_languages(tmp_path)
        if "python" not in langs:
            pytest.skip("No Python files detected")

        result = run_linters_for_language("python", langs["python"], tmp_path)
        report = {
            "repo": tmp_path.name,
            "languages": {"python": _result_to_dict(result)},
            "summary": {
                "total_issues": len(result.violations),
                "errors": sum(1 for v in result.violations if v.severity == "error"),
            },
        }
        json.dumps(report)  # should not raise


if __name__ == "__main__":
    print("Run: pytest tests/test_linter_runner.py -v")
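The tests above pin down the public surface of scripts/linter_runner.py without showing it. As a reading aid only, here is a minimal sketch of that surface, inferred from the imports and assertions (Violation, LinterResult, EXTENSION_TO_LANGUAGE, detect_languages, plus a hypothetical _parse_yamllint helper for the yamllint parsable format); the real module may be structured and implemented differently:

# Sketch only: interface inferred from the tests, not the actual scripts/linter_runner.py.
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Optional

EXTENSION_TO_LANGUAGE = {
    ".py": "python",
    ".js": "javascript", ".jsx": "javascript",
    ".sh": "shell",
    ".yml": "yaml", ".yaml": "yaml",
}

@dataclass
class Violation:
    file: str
    line: int
    column: int
    message: str
    severity: str
    linter: str
    code: str

@dataclass
class LinterResult:
    linter_name: str
    language: str
    violations: List[Violation] = field(default_factory=list)
    error: Optional[str] = None

def detect_languages(repo: Path) -> Dict[str, List[Path]]:
    """Group files under repo by language, skipping .git; unknown extensions are ignored."""
    found: Dict[str, List[Path]] = {}
    if not repo.is_dir():
        return found
    for path in repo.rglob("*"):
        if ".git" in path.parts or not path.is_file():
            continue
        lang = EXTENSION_TO_LANGUAGE.get(path.suffix)
        if lang:
            found.setdefault(lang, []).append(path)
    return found

# yamllint --format parsable lines look like:
#   config.yaml:3:1: [error] wrong document start (document-start)
_YAMLLINT_LINE = re.compile(
    r"^(?P<file>[^:]+):(?P<line>\d+):(?P<col>\d+): \[(?P<sev>\w+)\] (?P<msg>.+) \((?P<code>[^)]+)\)$"
)

def _parse_yamllint(stdout: str) -> List[Violation]:
    """Parse yamllint parsable output into Violation records (hypothetical helper)."""
    out: List[Violation] = []
    for raw in stdout.splitlines():
        m = _YAMLLINT_LINE.match(raw.strip())
        if m:
            out.append(Violation(file=m["file"], line=int(m["line"]), column=int(m["col"]),
                                 message=m["msg"], severity=m["sev"],
                                 linter="yamllint", code=m["code"]))
    return out

Under these assumptions the TestLanguageDetection expectations hold as written: unknown extensions are dropped, anything under .git is skipped, and a nonexistent path yields an empty mapping.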