Compare commits
1 Commits
step35/172
...
step35/155
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b4a3501aa3 |
@@ -75,7 +75,7 @@ class GapReport:
|
||||
return {
|
||||
"repo_path": self.repo_path,
|
||||
"total_gaps": len(self.gaps),
|
||||
"stats": {k.value: len(v) for k, v in
|
||||
"stats": {k: len(v) for k, v in
|
||||
{gt: [g for g in self.gaps if g.gap_type == gt]
|
||||
for gt in GapType}.items() if v},
|
||||
"gaps": [
|
||||
@@ -273,44 +273,3 @@ class KnowledgeGapIdentifier:
|
||||
))
|
||||
|
||||
return report
|
||||
|
||||
def main() -> None:
|
||||
import argparse
|
||||
import json
|
||||
import sys
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Knowledge Gap Identifier — cross-reference code, docs, and tests to find gaps"
|
||||
)
|
||||
parser.add_argument(
|
||||
"repo_path",
|
||||
nargs="?",
|
||||
default=".",
|
||||
help="Path to repository root (default: current directory)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--json",
|
||||
action="store_true",
|
||||
help="Output report as JSON instead of human-readable summary"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-o", "--output",
|
||||
help="Write report to file instead of stdout"
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
report = KnowledgeGapIdentifier().analyze(args.repo_path)
|
||||
|
||||
if args.json:
|
||||
output = json.dumps(report.to_dict(), indent=2, default=str)
|
||||
else:
|
||||
output = report.summary()
|
||||
|
||||
if args.output:
|
||||
with open(args.output, "w") as fh:
|
||||
print(output, file=fh)
|
||||
else:
|
||||
print(output)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
530
scripts/linter_runner.py
Normal file
530
scripts/linter_runner.py
Normal file
@@ -0,0 +1,530 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Linter Runner — detect languages and run linters across a repo.
|
||||
|
||||
Acceptance criteria for #155:
|
||||
[x] Detects language per repo
|
||||
[x] Runs: pylint, eslint, shellcheck, etc.
|
||||
[x] Collects violations (file, line, message, severity)
|
||||
[x] Output: lint report per repo
|
||||
|
||||
Usage:
|
||||
python3 scripts/linter_runner.py --repo .
|
||||
python3 scripts/linter_runner.py --all # Scan all repos in knowledge/repos/
|
||||
python3 scripts/linter_runner.py --repo . --format json # Machine-readable output
|
||||
python3 scripts/linter_runner.py --repo . --fail-on error # Exit non-zero if errors found
|
||||
|
||||
Output format (console):
|
||||
=== Lint Report: repo ===
|
||||
Python: 3 issues (1 error, 2 warnings)
|
||||
Shell: 1 issue (1 error)
|
||||
Total: 4 issues
|
||||
|
||||
Output format (JSON): --format json
|
||||
{"repo": "...", "issues": [...], "summary": {...}}
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
from dataclasses import dataclass, asdict
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
SCRIPT_DIR = Path(__file__).resolve().parent
|
||||
REPO_ROOT = SCRIPT_DIR.parent
|
||||
|
||||
|
||||
@dataclass
|
||||
class Violation:
|
||||
"""A single lint violation."""
|
||||
file: str
|
||||
line: Optional[int]
|
||||
column: Optional[int]
|
||||
message: str
|
||||
severity: str # "error", "warning", "info"
|
||||
linter: str
|
||||
code: Optional[str] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class LinterResult:
|
||||
"""Result from running a single linter."""
|
||||
linter_name: str
|
||||
language: str
|
||||
violations: list[Violation]
|
||||
timed_out: bool = False
|
||||
error: Optional[str] = None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Language detection
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
EXTENSION_TO_LANGUAGE = {
|
||||
".py": "python",
|
||||
".js": "javascript",
|
||||
".ts": "typescript",
|
||||
".jsx": "javascript",
|
||||
".tsx": "typescript",
|
||||
".sh": "shell",
|
||||
".bash": "shell",
|
||||
".zsh": "shell",
|
||||
".yaml": "yaml",
|
||||
".yml": "yaml",
|
||||
".json": "json",
|
||||
".md": "markdown",
|
||||
".rb": "ruby",
|
||||
".go": "go",
|
||||
".rs": "rust",
|
||||
".c": "c",
|
||||
".cpp": "cpp",
|
||||
".h": "c",
|
||||
".java": "java",
|
||||
".php": "php",
|
||||
".swift": "swift",
|
||||
".kt": "kotlin",
|
||||
".scala": "scala",
|
||||
}
|
||||
|
||||
# Which linters to run per language, in order of preference
|
||||
LINTERS_BY_LANGUAGE = {
|
||||
"python": [
|
||||
("pylint", ["pylint", "--output-format=json", "--reports=no"]),
|
||||
("ruff", ["ruff", "check", "--output-format=json"]),
|
||||
("flake8", ["flake8", "--format=json"]),
|
||||
],
|
||||
"javascript": [
|
||||
("eslint", ["eslint", "--format=json", "--max-warnings=0"]),
|
||||
],
|
||||
"typescript": [
|
||||
("eslint", ["eslint", "--format=json", "--max-warnings=0"]),
|
||||
],
|
||||
"shell": [
|
||||
("shellcheck", ["shellcheck", "--format=json1"]),
|
||||
],
|
||||
"yaml": [
|
||||
("yamllint", ["yamllint", "-f", "parsable"]),
|
||||
],
|
||||
"json": [
|
||||
("jsonlinter", ["python3", "-m", "json.tool"]), # Simple syntax check
|
||||
],
|
||||
"markdown": [], # No linter yet
|
||||
"ruby": [
|
||||
("rubocop", ["rubocop", "--format", "json"]),
|
||||
],
|
||||
"go": [
|
||||
("golangci-lint", ["golangci-lint", "run", "--out-format", "json"]),
|
||||
],
|
||||
"rust": [
|
||||
("cargo clippy", ["cargo", "clippy", "--message-format=json"]),
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
def detect_languages(repo_path: Path) -> dict[str, list[Path]]:
|
||||
"""
|
||||
Scan repo and return mapping: language -> list of file paths.
|
||||
Only includes languages we have linters for."""
|
||||
language_files: dict[str, list[Path]] = {lang: [] for lang in LINTERS_BY_LANGUAGE.keys()}
|
||||
|
||||
if not repo_path.exists():
|
||||
return language_files
|
||||
|
||||
exclude_dirs = {".git", ".gitea", "node_modules", "__pycache__", ".venv", "venv", "build", "dist"}
|
||||
|
||||
for root, dirs, files in os.walk(repo_path):
|
||||
# Prune excluded dirs
|
||||
dirs[:] = [d for d in dirs if d not in exclude_dirs]
|
||||
|
||||
for fname in files:
|
||||
file_path = Path(root) / fname
|
||||
suffix = file_path.suffix.lower()
|
||||
lang = EXTENSION_TO_LANGUAGE.get(suffix)
|
||||
if lang and lang in LINTERS_BY_LANGUAGE and LINTERS_BY_LANGUAGE[lang]:
|
||||
language_files[lang].append(file_path)
|
||||
|
||||
# Remove empty languages
|
||||
return {lang: files for lang, files in language_files.items() if files}
|
||||
|
||||
|
||||
def find_linter_executable(name: str) -> Optional[str]:
|
||||
"""Find linter binary in PATH, return full path or None."""
|
||||
for path_dir in os.environ.get("PATH", "").split(os.pathsep):
|
||||
candidate = Path(path_dir) / name
|
||||
if candidate.exists():
|
||||
return str(candidate)
|
||||
# Special handling for multi-word linters like "cargo clippy"
|
||||
if " " in name:
|
||||
primary = name.split()[0]
|
||||
for path_dir in os.environ.get("PATH", "").split(os.pathsep):
|
||||
candidate = Path(path_dir) / primary
|
||||
if candidate.exists():
|
||||
return name # Return full command string
|
||||
return None
|
||||
|
||||
|
||||
def run_linter(
|
||||
linter_name: str,
|
||||
command_template: list[str],
|
||||
files: list[Path],
|
||||
repo_path: Path,
|
||||
) -> LinterResult:
|
||||
"""
|
||||
Execute a linter on a set of files.
|
||||
Returns LinterResult with violations or error.
|
||||
"""
|
||||
# Build command: [linter_bin, args..., files...]
|
||||
# Most linters accept file paths as positional args at the end
|
||||
cmd = [linter_name] if " " not in linter_name else linter_name.split()
|
||||
cmd.extend(command_template[1:]) # Skip the duplicated linter name from template
|
||||
|
||||
# Add file paths, relative to repo root for cleaner output
|
||||
rel_files = [str(f.relative_to(repo_path)) for f in files]
|
||||
cmd.extend(rel_files)
|
||||
|
||||
try:
|
||||
proc = subprocess.run(
|
||||
cmd,
|
||||
cwd=repo_path,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=60,
|
||||
)
|
||||
except subprocess.TimeoutExpired:
|
||||
return LinterResult(
|
||||
linter_name=linter_name,
|
||||
language="unknown",
|
||||
violations=[],
|
||||
timed_out=True,
|
||||
error="Linter timed out after 60s",
|
||||
)
|
||||
except FileNotFoundError:
|
||||
return LinterResult(
|
||||
linter_name=linter_name,
|
||||
language="unknown",
|
||||
violations=[],
|
||||
error=f"Linter not found: {linter_name}",
|
||||
)
|
||||
|
||||
# Parse output based on linter type
|
||||
violations = parse_linter_output(linter_name, proc.stdout, proc.stderr, repo_path)
|
||||
|
||||
return LinterResult(
|
||||
linter_name=linter_name,
|
||||
language=guess_language_for_linter(linter_name),
|
||||
violations=violations,
|
||||
error=proc.stderr.strip() if proc.returncode != 0 and not violations else None,
|
||||
)
|
||||
|
||||
|
||||
def guess_language_for_linter(linter_name: str) -> str:
|
||||
"""Map linter name back to language category."""
|
||||
mapping = {
|
||||
"pylint": "python",
|
||||
"ruff": "python",
|
||||
"flake8": "python",
|
||||
"eslint": "javascript",
|
||||
"shellcheck": "shell",
|
||||
"yamllint": "yaml",
|
||||
"jsonlinter": "json",
|
||||
"rubocop": "ruby",
|
||||
"golangci-lint": "go",
|
||||
"cargo clippy": "rust",
|
||||
}
|
||||
return mapping.get(linter_name, "unknown")
|
||||
|
||||
|
||||
def parse_linter_output(
|
||||
linter_name: str,
|
||||
stdout: str,
|
||||
stderr: str,
|
||||
repo_path: Path,
|
||||
) -> list[Violation]:
|
||||
"""
|
||||
Parse linter output into Violation objects.
|
||||
Supports JSON output (pylint, ruff, eslint, shellcheck json1, yamllint parsable).
|
||||
"""
|
||||
violations: list[Violation] = []
|
||||
|
||||
if linter_name in ("pylint", "ruff", "eslint"):
|
||||
# JSON array output
|
||||
try:
|
||||
data = json.loads(stdout)
|
||||
except json.JSONDecodeError:
|
||||
return []
|
||||
|
||||
if linter_name == "pylint":
|
||||
for msg in data:
|
||||
violations.append(Violation(
|
||||
file=msg.get("path", "").lstrip("./"),
|
||||
line=msg.get("line"),
|
||||
column=msg.get("column"),
|
||||
message=msg.get("message", ""),
|
||||
severity="error" if msg.get("type") == "error" else "warning",
|
||||
linter=linter_name,
|
||||
code=msg.get("symbol"),
|
||||
))
|
||||
elif linter_name == "ruff":
|
||||
for entry in data:
|
||||
violations.append(Violation(
|
||||
file=entry.get("filename", "").lstrip("./"),
|
||||
line=entry.get("location", {}).get("row"),
|
||||
column=entry.get("location", {}).get("column"),
|
||||
message=entry.get("message", ""),
|
||||
severity="error", # ruff treats all as errors
|
||||
linter=linter_name,
|
||||
code=entry.get("code"),
|
||||
))
|
||||
elif linter_name == "eslint":
|
||||
for entry in data:
|
||||
violations.append(Violation(
|
||||
file=entry.get("fileName", "").lstrip("./"),
|
||||
line=entry.get("range", {}).get("start", {}).get("line"),
|
||||
column=entry.get("range", {}).get("start", {}).get("column"),
|
||||
message=entry.get("message", ""),
|
||||
severity=entry.get("severity", 1) and "error" or "warning",
|
||||
linter=linter_name,
|
||||
code=entry.get("ruleId"),
|
||||
))
|
||||
|
||||
elif linter_name == "shellcheck":
|
||||
# shellcheck --format=json1
|
||||
try:
|
||||
data = json.loads(stdout)
|
||||
for issue in data.get("issues", []):
|
||||
violations.append(Violation(
|
||||
file=issue.get("file", "").lstrip("./"),
|
||||
line=issue.get("line"),
|
||||
column=issue.get("column"),
|
||||
message=issue.get("message", ""),
|
||||
severity="error" if issue.get("level") == "error" else "warning",
|
||||
linter=linter_name,
|
||||
code=str(issue.get("code")),
|
||||
))
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
|
||||
elif linter_name == "yamllint":
|
||||
# parsable: file:line:col: level message [rule]
|
||||
# Example: test.yaml:3:1: [error] wrong document start (document-start)
|
||||
for line in stdout.splitlines():
|
||||
parts = line.split(":")
|
||||
if len(parts) >= 4:
|
||||
file_rel = parts[0].lstrip("./")
|
||||
line_num = int(parts[1]) if parts[1].isdigit() else None
|
||||
col_num = int(parts[2]) if parts[2].isdigit() else None
|
||||
rest = ":".join(parts[3:]).strip()
|
||||
# Parse: "[error] message (rule)"
|
||||
import re
|
||||
m = re.match(r'\[(\w+)\]\s+(.+?)(?:\s+\(([^)]+)\))?$', rest)
|
||||
if m:
|
||||
severity = m.group(1).lower()
|
||||
message = m.group(2)
|
||||
code = m.group(3)
|
||||
violations.append(Violation(
|
||||
file=file_rel,
|
||||
line=line_num,
|
||||
column=col_num,
|
||||
message=message,
|
||||
severity=severity,
|
||||
linter=linter_name,
|
||||
code=code,
|
||||
))
|
||||
|
||||
elif linter_name == "jsonlinter":
|
||||
# json.tool syntax check — no violations, just exit code
|
||||
if proc.returncode != 0:
|
||||
violations.append(Violation(
|
||||
file="(multiple)",
|
||||
line=None,
|
||||
column=None,
|
||||
message="JSON syntax error (run json.tool on each file individually)",
|
||||
severity="error",
|
||||
linter="json.tool",
|
||||
))
|
||||
|
||||
return violations
|
||||
|
||||
|
||||
def run_linters_for_language(
|
||||
language: str,
|
||||
files: list[Path],
|
||||
repo_path: Path,
|
||||
) -> LinterResult:
|
||||
"""
|
||||
Run the first available linter for this language.
|
||||
Returns the first successful run, or aggregates all errors if none available.
|
||||
"""
|
||||
linter_options = LINTERS_BY_LANGUAGE.get(language, [])
|
||||
if not linter_options:
|
||||
return LinterResult(linter_name="none", language=language, violations=[],
|
||||
error=f"No linter configured for {language}")
|
||||
|
||||
for linter_name, cmd_template in linter_options:
|
||||
# Check if linter exists
|
||||
if not find_linter_executable(linter_name):
|
||||
continue # Try next linter for this language
|
||||
|
||||
result = run_linter(linter_name, cmd_template, files, repo_path)
|
||||
if not result.error and not result.timed_out:
|
||||
return result
|
||||
# If this linter failed to start (not found), try next
|
||||
if result.error and "not found" in result.error.lower():
|
||||
continue
|
||||
|
||||
# All linters failed
|
||||
errors = []
|
||||
for linter_name, _ in linter_options:
|
||||
if find_linter_executable(linter_name):
|
||||
errors.append(f"{linter_name}: not runnable")
|
||||
else:
|
||||
errors.append(f"{linter_name}: not installed")
|
||||
return LinterResult(
|
||||
linter_name="/".join(l[0] for l in linter_options),
|
||||
language=language,
|
||||
violations=[],
|
||||
error="; ".join(errors),
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def parse_args():
|
||||
p = argparse.ArgumentParser(description="Linter Runner for compounding-intelligence")
|
||||
p.add_argument("--repo", type=str, help="Path to repository (absolute or relative)")
|
||||
p.add_argument("--all", action="store_true", help="Scan all repos in knowledge/repos/")
|
||||
p.add_argument("--format", choices=["console", "json"], default="console",
|
||||
help="Output format (default: console)")
|
||||
p.add_argument("--fail-on", choices=["error", "warning", "any"], default="error",
|
||||
help="Exit non-zero if any violations at this level are found")
|
||||
p.add_argument("--output", type=str, help="Write report to file (default: stdout)")
|
||||
return p.parse_args()
|
||||
|
||||
|
||||
def main():
|
||||
args = parse_args()
|
||||
|
||||
if not args.repo and not args.all:
|
||||
print("ERROR: Must specify --repo <path> or --all", file=sys.stderr)
|
||||
sys.exit(2)
|
||||
|
||||
repos_to_scan = []
|
||||
if args.repo:
|
||||
repos_to_scan.append(Path(args.repo).resolve())
|
||||
if args.all:
|
||||
repos_dir = REPO_ROOT / "knowledge" / "repos"
|
||||
if repos_dir.exists():
|
||||
for yaml_file in repos_dir.glob("*.yaml"):
|
||||
# Extract repo name from filename
|
||||
repos_to_scan.append(REPO_ROOT / yaml_file.stem)
|
||||
else:
|
||||
print(f"WARNING: knowledge/repos/ not found, --all has nothing to scan", file=sys.stderr)
|
||||
|
||||
all_results: dict[str, dict] = {}
|
||||
exit_code = 0
|
||||
|
||||
for repo_path in repos_to_scan:
|
||||
if not repo_path.exists():
|
||||
print(f"WARNING: Repo not found: {repo_path}", file=sys.stderr)
|
||||
continue
|
||||
|
||||
repo_name = repo_path.name
|
||||
print(f"\n=== Scanning: {repo_name} ===") if args.format == "console" else None
|
||||
|
||||
lang_files = detect_languages(repo_path)
|
||||
results_by_lang: dict[str, LinterResult] = {}
|
||||
|
||||
for language, files in sorted(lang_files.items()):
|
||||
# Limit files for sanity (first 200 for now)
|
||||
if len(files) > 200:
|
||||
print(f" {language}: {len(files)} files (limiting to first 200)", file=sys.stderr)
|
||||
files = files[:200]
|
||||
|
||||
result = run_linters_for_language(language, files, repo_path)
|
||||
results_by_lang[language] = result
|
||||
|
||||
if args.format == "console":
|
||||
_print_language_result(language, result, repo_name)
|
||||
else:
|
||||
pass # JSON aggregation below
|
||||
|
||||
# Build summary
|
||||
total_issues = sum(len(r.violations) for r in results_by_lang.values())
|
||||
total_errors = sum(1 for v in (v for r in results_by_lang.values() for v in r.violations)
|
||||
if v.severity == "error")
|
||||
total_warnings = sum(1 for v in (v for r in results_by_lang.values() for v in r.violations)
|
||||
if v.severity == "warning")
|
||||
|
||||
if args.format == "console":
|
||||
print(f" Summary: {total_issues} issues ({total_errors} errors, {total_warnings} warnings)")
|
||||
else:
|
||||
all_results[repo_name] = {
|
||||
"languages": {lang: _result_to_dict(res) for lang, res in results_by_lang.items()},
|
||||
"summary": {
|
||||
"total_issues": total_issues,
|
||||
"errors": total_errors,
|
||||
"warnings": total_warnings,
|
||||
},
|
||||
}
|
||||
|
||||
# Determine exit code based on --fail-on
|
||||
if args.fail_on == "error" and total_errors > 0:
|
||||
exit_code = 1
|
||||
elif args.fail_on == "warning" and total_issues > 0:
|
||||
exit_code = 1
|
||||
elif args.fail_on == "any" and total_issues > 0:
|
||||
exit_code = 1
|
||||
|
||||
if args.format == "json":
|
||||
output = json.dumps({"repos": all_results, "meta": {"scanned": len(repos_to_scan)}}, indent=2)
|
||||
if args.output:
|
||||
Path(args.output).write_text(output)
|
||||
else:
|
||||
print(output)
|
||||
|
||||
sys.exit(exit_code)
|
||||
|
||||
|
||||
def _print_language_result(language: str, result: LinterResult, repo_name: str):
|
||||
"""Pretty-print a single language's lint results."""
|
||||
status = "✓"
|
||||
if result.error:
|
||||
status = "✗"
|
||||
print(f" {language}: {result.error}")
|
||||
elif result.timed_out:
|
||||
status = "⌛"
|
||||
print(f" {language}: timed out")
|
||||
else:
|
||||
n_violations = len(result.violations)
|
||||
if n_violations == 0:
|
||||
print(f" {language}: clean")
|
||||
else:
|
||||
errors = sum(1 for v in result.violations if v.severity == "error")
|
||||
warnings = n_violations - errors
|
||||
print(f" {language}: {n_violations} issues ({errors} errors, {warnings} warnings)")
|
||||
# Show first 3 violations as preview
|
||||
for v in result.violations[:3]:
|
||||
loc = f"{v.file}:{v.line or '?'}"
|
||||
print(f" {loc} [{v.severity.upper()}] {v.message[:70]}")
|
||||
if len(result.violations) > 3:
|
||||
print(f" ... and {len(result.violations) - 3} more")
|
||||
|
||||
|
||||
def _result_to_dict(result: LinterResult) -> dict:
|
||||
return {
|
||||
"linter": result.linter_name,
|
||||
"language": result.language,
|
||||
"violations": [asdict(v) for v in result.violations],
|
||||
"timed_out": result.timed_out,
|
||||
"error": result.error,
|
||||
}
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,351 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
PR Complexity Scorer - Estimate review effort for PRs.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
from dataclasses import dataclass, asdict
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
|
||||
GITEA_BASE = "https://forge.alexanderwhitestone.com/api/v1"
|
||||
|
||||
DEPENDENCY_FILES = {
|
||||
"requirements.txt", "pyproject.toml", "setup.py", "setup.cfg",
|
||||
"Pipfile", "poetry.lock", "package.json", "yarn.lock", "Gemfile",
|
||||
"go.mod", "Cargo.toml", "pom.xml", "build.gradle"
|
||||
}
|
||||
|
||||
TEST_PATTERNS = [
|
||||
r"tests?/.*\.py$", r".*_test\.py$", r"test_.*\.py$",
|
||||
r"spec/.*\.rb$", r".*_spec\.rb$",
|
||||
r"__tests__/", r".*\.test\.(js|ts|jsx|tsx)$"
|
||||
]
|
||||
|
||||
WEIGHT_FILES = 0.25
|
||||
WEIGHT_LINES = 0.25
|
||||
WEIGHT_DEPS = 0.30
|
||||
WEIGHT_TEST_COV = 0.20
|
||||
|
||||
SMALL_FILES = 5
|
||||
MEDIUM_FILES = 20
|
||||
LARGE_FILES = 50
|
||||
|
||||
SMALL_LINES = 100
|
||||
MEDIUM_LINES = 500
|
||||
LARGE_LINES = 2000
|
||||
|
||||
TIME_PER_POINT = {1: 5, 2: 10, 3: 15, 4: 20, 5: 25, 6: 30, 7: 45, 8: 60, 9: 90, 10: 120}
|
||||
|
||||
|
||||
@dataclass
|
||||
class PRComplexity:
|
||||
pr_number: int
|
||||
title: str
|
||||
files_changed: int
|
||||
additions: int
|
||||
deletions: int
|
||||
has_dependency_changes: bool
|
||||
test_coverage_delta: Optional[int]
|
||||
score: int
|
||||
estimated_minutes: int
|
||||
reasons: List[str]
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return asdict(self)
|
||||
|
||||
|
||||
class GiteaClient:
|
||||
def __init__(self, token: str):
|
||||
self.token = token
|
||||
self.base_url = GITEA_BASE.rstrip("/")
|
||||
|
||||
def _request(self, path: str, params: Dict = None) -> Any:
|
||||
url = f"{self.base_url}{path}"
|
||||
if params:
|
||||
qs = "&".join(f"{k}={v}" for k, v in params.items() if v is not None)
|
||||
url += f"?{qs}"
|
||||
|
||||
req = urllib.request.Request(url)
|
||||
req.add_header("Authorization", f"token {self.token}")
|
||||
req.add_header("Content-Type", "application/json")
|
||||
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=30) as resp:
|
||||
return json.loads(resp.read().decode())
|
||||
except urllib.error.HTTPError as e:
|
||||
print(f"API error {e.code}: {e.read().decode()[:200]}", file=sys.stderr)
|
||||
return None
|
||||
except urllib.error.URLError as e:
|
||||
print(f"Network error: {e}", file=sys.stderr)
|
||||
return None
|
||||
|
||||
def get_open_prs(self, org: str, repo: str) -> List[Dict]:
|
||||
prs = []
|
||||
page = 1
|
||||
while True:
|
||||
batch = self._request(f"/repos/{org}/{repo}/pulls", {"limit": 50, "page": page, "state": "open"})
|
||||
if not batch:
|
||||
break
|
||||
prs.extend(batch)
|
||||
if len(batch) < 50:
|
||||
break
|
||||
page += 1
|
||||
return prs
|
||||
|
||||
def get_pr_files(self, org: str, repo: str, pr_number: int) -> List[Dict]:
|
||||
files = []
|
||||
page = 1
|
||||
while True:
|
||||
batch = self._request(
|
||||
f"/repos/{org}/{repo}/pulls/{pr_number}/files",
|
||||
{"limit": 100, "page": page}
|
||||
)
|
||||
if not batch:
|
||||
break
|
||||
files.extend(batch)
|
||||
if len(batch) < 100:
|
||||
break
|
||||
page += 1
|
||||
return files
|
||||
|
||||
def post_comment(self, org: str, repo: str, pr_number: int, body: str) -> bool:
|
||||
data = json.dumps({"body": body}).encode("utf-8")
|
||||
req = urllib.request.Request(
|
||||
f"{self.base_url}/repos/{org}/{repo}/issues/{pr_number}/comments",
|
||||
data=data,
|
||||
method="POST",
|
||||
headers={"Authorization": f"token {self.token}", "Content-Type": "application/json"}
|
||||
)
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=30) as resp:
|
||||
return resp.status in (200, 201)
|
||||
except urllib.error.HTTPError:
|
||||
return False
|
||||
|
||||
|
||||
def is_dependency_file(filename: str) -> bool:
|
||||
return any(filename.endswith(dep) for dep in DEPENDENCY_FILES)
|
||||
|
||||
|
||||
def is_test_file(filename: str) -> bool:
|
||||
return any(re.search(pattern, filename) for pattern in TEST_PATTERNS)
|
||||
|
||||
|
||||
def score_pr(
|
||||
files_changed: int,
|
||||
additions: int,
|
||||
deletions: int,
|
||||
has_dependency_changes: bool,
|
||||
test_coverage_delta: Optional[int] = None
|
||||
) -> tuple[int, int, List[str]]:
|
||||
score = 1.0
|
||||
reasons = []
|
||||
|
||||
# Files changed
|
||||
if files_changed <= SMALL_FILES:
|
||||
fscore = 1.0
|
||||
reasons.append("small number of files changed")
|
||||
elif files_changed <= MEDIUM_FILES:
|
||||
fscore = 2.0
|
||||
reasons.append("moderate number of files changed")
|
||||
elif files_changed <= LARGE_FILES:
|
||||
fscore = 2.5
|
||||
reasons.append("large number of files changed")
|
||||
else:
|
||||
fscore = 3.0
|
||||
reasons.append("very large PR spanning many files")
|
||||
|
||||
# Lines changed
|
||||
total_lines = additions + deletions
|
||||
if total_lines <= SMALL_LINES:
|
||||
lscore = 1.0
|
||||
reasons.append("small change size")
|
||||
elif total_lines <= MEDIUM_LINES:
|
||||
lscore = 2.0
|
||||
reasons.append("moderate change size")
|
||||
elif total_lines <= LARGE_LINES:
|
||||
lscore = 3.0
|
||||
reasons.append("large change size")
|
||||
else:
|
||||
lscore = 4.0
|
||||
reasons.append("very large change")
|
||||
|
||||
# Dependency changes
|
||||
if has_dependency_changes:
|
||||
dscore = 2.5
|
||||
reasons.append("dependency changes (architectural impact)")
|
||||
else:
|
||||
dscore = 0.0
|
||||
|
||||
# Test coverage delta
|
||||
tscore = 0.0
|
||||
if test_coverage_delta is not None:
|
||||
if test_coverage_delta > 0:
|
||||
reasons.append(f"test additions (+{test_coverage_delta} test files)")
|
||||
tscore = -min(2.0, test_coverage_delta / 2.0)
|
||||
elif test_coverage_delta < 0:
|
||||
reasons.append(f"test removals ({abs(test_coverage_delta)} test files)")
|
||||
tscore = min(2.0, abs(test_coverage_delta) * 0.5)
|
||||
else:
|
||||
reasons.append("test coverage change not assessed")
|
||||
|
||||
# Weighted sum, scaled by 3 to use full 1-10 range
|
||||
bonus = (fscore * WEIGHT_FILES) + (lscore * WEIGHT_LINES) + (dscore * WEIGHT_DEPS) + (tscore * WEIGHT_TEST_COV)
|
||||
scaled_bonus = bonus * 3.0
|
||||
score = 1.0 + scaled_bonus
|
||||
|
||||
final_score = max(1, min(10, int(round(score))))
|
||||
est_minutes = TIME_PER_POINT.get(final_score, 30)
|
||||
|
||||
return final_score, est_minutes, reasons
|
||||
|
||||
|
||||
def analyze_pr(client: GiteaClient, org: str, repo: str, pr_data: Dict) -> PRComplexity:
|
||||
pr_num = pr_data["number"]
|
||||
title = pr_data.get("title", "")
|
||||
files = client.get_pr_files(org, repo, pr_num)
|
||||
|
||||
additions = sum(f.get("additions", 0) for f in files)
|
||||
deletions = sum(f.get("deletions", 0) for f in files)
|
||||
filenames = [f.get("filename", "") for f in files]
|
||||
|
||||
has_deps = any(is_dependency_file(f) for f in filenames)
|
||||
|
||||
test_added = sum(1 for f in files if f.get("status") == "added" and is_test_file(f.get("filename", "")))
|
||||
test_removed = sum(1 for f in files if f.get("status") == "removed" and is_test_file(f.get("filename", "")))
|
||||
test_delta = test_added - test_removed if (test_added or test_removed) else None
|
||||
|
||||
score, est_min, reasons = score_pr(
|
||||
files_changed=len(files),
|
||||
additions=additions,
|
||||
deletions=deletions,
|
||||
has_dependency_changes=has_deps,
|
||||
test_coverage_delta=test_delta
|
||||
)
|
||||
|
||||
return PRComplexity(
|
||||
pr_number=pr_num,
|
||||
title=title,
|
||||
files_changed=len(files),
|
||||
additions=additions,
|
||||
deletions=deletions,
|
||||
has_dependency_changes=has_deps,
|
||||
test_coverage_delta=test_delta,
|
||||
score=score,
|
||||
estimated_minutes=est_min,
|
||||
reasons=reasons
|
||||
)
|
||||
|
||||
|
||||
def build_comment(complexity: PRComplexity) -> str:
|
||||
change_desc = f"{complexity.files_changed} files, +{complexity.additions}/-{complexity.deletions} lines"
|
||||
deps_note = "\n- :warning: Dependency changes detected — architectural review recommended" if complexity.has_dependency_changes else ""
|
||||
test_note = ""
|
||||
if complexity.test_coverage_delta is not None:
|
||||
if complexity.test_coverage_delta > 0:
|
||||
test_note = f"\n- :+1: {complexity.test_coverage_delta} test file(s) added"
|
||||
elif complexity.test_coverage_delta < 0:
|
||||
test_note = f"\n- :warning: {abs(complexity.test_coverage_delta)} test file(s) removed"
|
||||
|
||||
comment = f"## 📊 PR Complexity Analysis\n\n"
|
||||
comment += f"**PR #{complexity.pr_number}: {complexity.title}**\n\n"
|
||||
comment += f"| Metric | Value |\n|--------|-------|\n"
|
||||
comment += f"| Changes | {change_desc} |\n"
|
||||
comment += f"| Complexity Score | **{complexity.score}/10** |\n"
|
||||
comment += f"| Estimated Review Time | ~{complexity.estimated_minutes} minutes |\n\n"
|
||||
comment += f"### Scoring rationale:"
|
||||
for r in complexity.reasons:
|
||||
comment += f"\n- {r}"
|
||||
if deps_note:
|
||||
comment += deps_note
|
||||
if test_note:
|
||||
comment += test_note
|
||||
comment += f"\n\n---\n"
|
||||
comment += f"*Generated by PR Complexity Scorer — [issue #135](https://forge.alexanderwhitestone.com/Timmy_Foundation/compounding-intelligence/issues/135)*"
|
||||
return comment
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="PR Complexity Scorer")
|
||||
parser.add_argument("--org", default="Timmy_Foundation")
|
||||
parser.add_argument("--repo", default="compounding-intelligence")
|
||||
parser.add_argument("--token", default=os.environ.get("GITEA_TOKEN") or os.path.expanduser("~/.config/gitea/token"))
|
||||
parser.add_argument("--dry-run", action="store_true")
|
||||
parser.add_argument("--apply", action="store_true")
|
||||
parser.add_argument("--output", default="metrics/pr_complexity.json")
|
||||
args = parser.parse_args()
|
||||
|
||||
token_path = args.token
|
||||
if os.path.exists(token_path):
|
||||
with open(token_path) as f:
|
||||
token = f.read().strip()
|
||||
else:
|
||||
token = args.token
|
||||
|
||||
if not token:
|
||||
print("ERROR: No Gitea token provided", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
client = GiteaClient(token)
|
||||
|
||||
print(f"Fetching open PRs for {args.org}/{args.repo}...")
|
||||
prs = client.get_open_prs(args.org, args.repo)
|
||||
if not prs:
|
||||
print("No open PRs found.")
|
||||
sys.exit(0)
|
||||
|
||||
print(f"Found {len(prs)} open PR(s). Analyzing...")
|
||||
|
||||
results = []
|
||||
Path(args.output).parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
for pr in prs:
|
||||
pr_num = pr["number"]
|
||||
title = pr.get("title", "")
|
||||
print(f" Analyzing PR #{pr_num}: {title[:60]}")
|
||||
|
||||
try:
|
||||
complexity = analyze_pr(client, args.org, args.repo, pr)
|
||||
results.append(complexity.to_dict())
|
||||
|
||||
comment = build_comment(complexity)
|
||||
|
||||
if args.dry_run:
|
||||
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [DRY-RUN]")
|
||||
elif args.apply:
|
||||
success = client.post_comment(args.org, args.repo, pr_num, comment)
|
||||
status = "[commented]" if success else "[FAILED]"
|
||||
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min {status}")
|
||||
else:
|
||||
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [no action]")
|
||||
|
||||
except Exception as e:
|
||||
print(f" ERROR analyzing PR #{pr_num}: {e}", file=sys.stderr)
|
||||
|
||||
with open(args.output, "w") as f:
|
||||
json.dump({
|
||||
"org": args.org,
|
||||
"repo": args.repo,
|
||||
"timestamp": datetime.now(timezone.utc).isoformat(),
|
||||
"pr_count": len(results),
|
||||
"results": results
|
||||
}, f, indent=2)
|
||||
|
||||
if results:
|
||||
scores = [r["score"] for r in results]
|
||||
print(f"\nResults saved to {args.output}")
|
||||
print(f"Summary: {len(results)} PRs, scores range {min(scores):.0f}-{max(scores):.0f}")
|
||||
else:
|
||||
print("\nNo results to save.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,170 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Tests for PR Complexity Scorer — unit tests for the scoring logic.
|
||||
"""
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent))
|
||||
|
||||
from pr_complexity_scorer import (
|
||||
score_pr,
|
||||
is_dependency_file,
|
||||
is_test_file,
|
||||
TIME_PER_POINT,
|
||||
SMALL_FILES,
|
||||
MEDIUM_FILES,
|
||||
LARGE_FILES,
|
||||
SMALL_LINES,
|
||||
MEDIUM_LINES,
|
||||
LARGE_LINES,
|
||||
)
|
||||
|
||||
PASS = 0
|
||||
FAIL = 0
|
||||
|
||||
def test(name):
|
||||
def decorator(fn):
|
||||
global PASS, FAIL
|
||||
try:
|
||||
fn()
|
||||
PASS += 1
|
||||
print(f" [PASS] {name}")
|
||||
except AssertionError as e:
|
||||
FAIL += 1
|
||||
print(f" [FAIL] {name}: {e}")
|
||||
except Exception as e:
|
||||
FAIL += 1
|
||||
print(f" [FAIL] {name}: Unexpected error: {e}")
|
||||
return decorator
|
||||
|
||||
def assert_eq(a, b, msg=""):
|
||||
if a != b:
|
||||
raise AssertionError(f"{msg} expected {b!r}, got {a!r}")
|
||||
|
||||
def assert_true(v, msg=""):
|
||||
if not v:
|
||||
raise AssertionError(msg or "Expected True")
|
||||
|
||||
def assert_false(v, msg=""):
|
||||
if v:
|
||||
raise AssertionError(msg or "Expected False")
|
||||
|
||||
|
||||
print("=== PR Complexity Scorer Tests ===\n")
|
||||
|
||||
print("-- File Classification --")
|
||||
|
||||
@test("dependency file detection — requirements.txt")
|
||||
def _():
|
||||
assert_true(is_dependency_file("requirements.txt"))
|
||||
assert_true(is_dependency_file("src/requirements.txt"))
|
||||
assert_false(is_dependency_file("requirements_test.txt"))
|
||||
|
||||
@test("dependency file detection — pyproject.toml")
|
||||
def _():
|
||||
assert_true(is_dependency_file("pyproject.toml"))
|
||||
assert_false(is_dependency_file("myproject.py"))
|
||||
|
||||
@test("test file detection — pytest style")
|
||||
def _():
|
||||
assert_true(is_test_file("tests/test_api.py"))
|
||||
assert_true(is_test_file("test_module.py"))
|
||||
assert_true(is_test_file("src/module_test.py"))
|
||||
|
||||
@test("test file detection — other frameworks")
|
||||
def _():
|
||||
assert_true(is_test_file("spec/feature_spec.rb"))
|
||||
assert_true(is_test_file("__tests__/component.test.js"))
|
||||
assert_false(is_test_file("testfixtures/helper.py"))
|
||||
|
||||
|
||||
print("\n-- Scoring Logic --")
|
||||
|
||||
@test("small PR gets low score (1-3)")
|
||||
def _():
|
||||
score, minutes, _ = score_pr(
|
||||
files_changed=3,
|
||||
additions=50,
|
||||
deletions=10,
|
||||
has_dependency_changes=False,
|
||||
test_coverage_delta=None
|
||||
)
|
||||
assert_true(1 <= score <= 3, f"Score should be low, got {score}")
|
||||
assert_true(minutes < 20)
|
||||
|
||||
@test("medium PR gets medium score (4-6)")
|
||||
def _():
|
||||
score, minutes, _ = score_pr(
|
||||
files_changed=15,
|
||||
additions=400,
|
||||
deletions=100,
|
||||
has_dependency_changes=False,
|
||||
test_coverage_delta=None
|
||||
)
|
||||
assert_true(4 <= score <= 6, f"Score should be medium, got {score}")
|
||||
assert_true(20 <= minutes <= 45)
|
||||
|
||||
@test("large PR gets high score (7-9)")
|
||||
def _():
|
||||
score, minutes, _ = score_pr(
|
||||
files_changed=60,
|
||||
additions=3000,
|
||||
deletions=1500,
|
||||
has_dependency_changes=True,
|
||||
test_coverage_delta=None
|
||||
)
|
||||
assert_true(7 <= score <= 9, f"Score should be high, got {score}")
|
||||
assert_true(minutes >= 45)
|
||||
|
||||
@test("dependency changes boost score")
|
||||
def _():
|
||||
base_score, _, _ = score_pr(
|
||||
files_changed=10, additions=200, deletions=50,
|
||||
has_dependency_changes=False, test_coverage_delta=None
|
||||
)
|
||||
dep_score, _, _ = score_pr(
|
||||
files_changed=10, additions=200, deletions=50,
|
||||
has_dependency_changes=True, test_coverage_delta=None
|
||||
)
|
||||
assert_true(dep_score > base_score, f"Deps: {base_score} -> {dep_score}")
|
||||
|
||||
@test("adding tests lowers complexity")
|
||||
def _():
|
||||
base_score, _, _ = score_pr(
|
||||
files_changed=8, additions=150, deletions=20,
|
||||
has_dependency_changes=False, test_coverage_delta=None
|
||||
)
|
||||
better_score, _, _ = score_pr(
|
||||
files_changed=8, additions=180, deletions=20,
|
||||
has_dependency_changes=False, test_coverage_delta=3
|
||||
)
|
||||
assert_true(better_score < base_score, f"Tests: {base_score} -> {better_score}")
|
||||
|
||||
@test("removing tests increases complexity")
|
||||
def _():
|
||||
base_score, _, _ = score_pr(
|
||||
files_changed=8, additions=150, deletions=20,
|
||||
has_dependency_changes=False, test_coverage_delta=None
|
||||
)
|
||||
worse_score, _, _ = score_pr(
|
||||
files_changed=8, additions=150, deletions=20,
|
||||
has_dependency_changes=False, test_coverage_delta=-2
|
||||
)
|
||||
assert_true(worse_score > base_score, f"Remove tests: {base_score} -> {worse_score}")
|
||||
|
||||
@test("score bounded 1-10")
|
||||
def _():
|
||||
for files, adds, dels in [(1, 10, 5), (100, 10000, 5000)]:
|
||||
score, _, _ = score_pr(files, adds, dels, False, None)
|
||||
assert_true(1 <= score <= 10, f"Score {score} out of range")
|
||||
|
||||
@test("estimated minutes exist for all scores")
|
||||
def _():
|
||||
for s in range(1, 11):
|
||||
assert_true(s in TIME_PER_POINT, f"Missing time for score {s}")
|
||||
|
||||
|
||||
print(f"\n=== Results: {PASS} passed, {FAIL} failed ===")
|
||||
sys.exit(0 if FAIL == 0 else 1)
|
||||
222
tests/test_linter_runner.py
Normal file
222
tests/test_linter_runner.py
Normal file
@@ -0,0 +1,222 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Tests for linter_runner module (Issue #155).
|
||||
|
||||
Tests cover:
|
||||
- Language detection by file extension
|
||||
- Linter result aggregation
|
||||
- Violation parsing (JSON output formats)
|
||||
- Exit code logic (fail-on)
|
||||
- Report formatting (console/JSON)
|
||||
"""
|
||||
import json
|
||||
import sys
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
# Add scripts to path
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
|
||||
|
||||
from linter_runner import (
|
||||
Violation,
|
||||
LinterResult,
|
||||
detect_languages,
|
||||
parse_linter_output,
|
||||
_result_to_dict,
|
||||
EXTENSION_TO_LANGUAGE,
|
||||
LINTERS_BY_LANGUAGE,
|
||||
)
|
||||
|
||||
|
||||
class TestLanguageDetection:
|
||||
"""Test detect_languages() identifies languages correctly."""
|
||||
|
||||
def test_detects_python_files(self, tmp_path: Path):
|
||||
(tmp_path / "main.py").write_text("print('hello')")
|
||||
(tmp_path / "lib" / "utils.py").mkdir(parents=True)
|
||||
(tmp_path / "lib" / "utils.py").write_text("def foo(): pass")
|
||||
|
||||
result = detect_languages(tmp_path)
|
||||
assert "python" in result
|
||||
assert len(result["python"]) == 2
|
||||
|
||||
def test_detects_javascript_files(self, tmp_path: Path):
|
||||
(tmp_path / "app.js").write_text("console.log('hi')")
|
||||
(tmp_path / "component.jsx").write_text("<div/>")
|
||||
|
||||
result = detect_languages(tmp_path)
|
||||
assert "javascript" in result
|
||||
assert len(result["javascript"]) == 2
|
||||
|
||||
def test_detects_shell_files(self, tmp_path: Path):
|
||||
(tmp_path / "setup.sh").write_text("#!/bin/bash\necho hi")
|
||||
(tmp_path / "build.sh").write_text("make")
|
||||
|
||||
result = detect_languages(tmp_path)
|
||||
assert "shell" in result
|
||||
assert len(result["shell"]) == 2
|
||||
|
||||
def test_detects_yaml_files(self, tmp_path: Path):
|
||||
(tmp_path / "config.yml").write_text("key: value")
|
||||
(tmp_path / "env.yaml").write_text("env: test")
|
||||
|
||||
result = detect_languages(tmp_path)
|
||||
assert "yaml" in result
|
||||
assert len(result["yaml"]) == 2
|
||||
|
||||
def test_ignores_git_directory(self, tmp_path: Path):
|
||||
git_dir = tmp_path / ".git"
|
||||
git_dir.mkdir()
|
||||
(git_dir / "config").write_text("placeholder")
|
||||
(tmp_path / "script.py").write_text("print(1)")
|
||||
|
||||
result = detect_languages(tmp_path)
|
||||
assert "python" in result
|
||||
assert not any(".git" in str(f) for f in result.get("python", []))
|
||||
|
||||
def test_returns_empty_for_nonexistent_path(self):
|
||||
result = detect_languages(Path("/nonexistent/path/xyz"))
|
||||
assert result == {}
|
||||
|
||||
def test_mixed_languages(self, tmp_path: Path):
|
||||
(tmp_path / "app.py").write_text("")
|
||||
(tmp_path / "main.js").write_text("")
|
||||
(tmp_path / "deploy.sh").write_text("")
|
||||
|
||||
result = detect_languages(tmp_path)
|
||||
langs = set(result.keys())
|
||||
assert {"python", "javascript", "shell"} <= langs
|
||||
|
||||
def test_limits_files_to_known_languages(self, tmp_path: Path):
|
||||
(tmp_path / "readme.txt").write_text("text")
|
||||
(tmp_path / "data.csv").write_text("a,b,c")
|
||||
|
||||
result = detect_languages(tmp_path)
|
||||
assert len(result) == 0
|
||||
|
||||
|
||||
class TestViolationParsing:
|
||||
"""Test parse_linter_output parses various linter formats."""
|
||||
|
||||
def test_parses_pylint_json(self):
|
||||
stdout = json.dumps([
|
||||
{"type": "error", "module": "test.py", "line": 10, "column": 5,
|
||||
"message": "Missing docstring", "symbol": "missing-docstring"},
|
||||
{"type": "warning", "module": "test.py", "line": 15, "column": 1,
|
||||
"message": "Line too long", "symbol": "line-too-long"},
|
||||
])
|
||||
violations = parse_linter_output("pylint", stdout, "", Path("/repo"))
|
||||
assert len(violations) == 2
|
||||
assert violations[0].severity == "error"
|
||||
assert violations[0].message == "Missing docstring"
|
||||
assert violations[1].severity == "warning"
|
||||
assert violations[1].code == "line-too-long"
|
||||
|
||||
def test_parses_ruff_json(self):
|
||||
stdout = json.dumps([
|
||||
{"filename": "src/main.py", "location": {"row": 5, "column": 1},
|
||||
"code": "E501", "message": "Line too long"},
|
||||
])
|
||||
violations = parse_linter_output("ruff", stdout, "", Path("/repo"))
|
||||
assert len(violations) == 1
|
||||
assert violations[0].file == "src/main.py"
|
||||
assert violations[0].line == 5
|
||||
assert violations[0].code == "E501"
|
||||
|
||||
def test_parses_eslint_json(self):
|
||||
stdout = json.dumps([
|
||||
{"fileName": "app.js", "range": {"start": {"line": 2, "column": 0}},
|
||||
"message": "Unexpected console statement", "severity": 2, "ruleId": "no-console"},
|
||||
])
|
||||
violations = parse_linter_output("eslint", stdout, "", Path("/repo"))
|
||||
assert len(violations) == 1
|
||||
assert violations[0].severity == "error"
|
||||
assert violations[0].code == "no-console"
|
||||
|
||||
def test_parses_shellcheck_json1(self):
|
||||
stdout = json.dumps({
|
||||
"issues": [
|
||||
{"file": "script.sh", "line": 3, "column": 1,
|
||||
"message": "Quote this to prevent word splitting", "level": "warning", "code": "SC2086"},
|
||||
]
|
||||
})
|
||||
violations = parse_linter_output("shellcheck", stdout, "", Path("/repo"))
|
||||
assert len(violations) == 1
|
||||
assert violations[0].severity == "warning"
|
||||
assert violations[0].code == "SC2086"
|
||||
|
||||
def test_parses_yamllint_parsable(self):
|
||||
stdout = "config.yaml:3:1: [error] wrong document start (document-start)\n"
|
||||
violations = parse_linter_output("yamllint", stdout, "", Path("/repo"))
|
||||
assert len(violations) == 1
|
||||
assert violations[0].file == "config.yaml"
|
||||
assert violations[0].line == 3
|
||||
assert violations[0].severity == "error"
|
||||
assert violations[0].code == "document-start"
|
||||
|
||||
def test_returns_empty_on_invalid_json(self):
|
||||
stdout = "Not valid JSON"
|
||||
violations = parse_linter_output("pylint", stdout, "", Path("/repo"))
|
||||
assert violations == []
|
||||
|
||||
def test_strips_leading_slash_from_paths(self):
|
||||
stdout = json.dumps([{"type": "error", "module": "/repo/src/test.py",
|
||||
"line": 1, "column": 1, "message": "test", "symbol": "T001"}])
|
||||
violations = parse_linter_output("pylint", stdout, "", Path("/repo"))
|
||||
assert violations[0].file == "src/test.py"
|
||||
|
||||
|
||||
class TestLinterResult:
|
||||
"""Test LinterResult and JSON serialization."""
|
||||
|
||||
def test_result_to_dict_roundtrip(self):
|
||||
v = Violation(file="test.py", line=10, column=5, message="msg",
|
||||
severity="error", linter="pylint", code="E001")
|
||||
r = LinterResult(linter_name="pylint", language="python", violations=[v])
|
||||
d = _result_to_dict(r)
|
||||
assert d["linter"] == "pylint"
|
||||
assert d["violations"][0]["file"] == "test.py"
|
||||
assert d["violations"][0]["code"] == "E001"
|
||||
|
||||
|
||||
class TestIntegration:
|
||||
"""End-to-end integration tests with temporary repos."""
|
||||
|
||||
def test_linter_runner_accepts_repo_path(self, tmp_path: Path):
|
||||
(tmp_path / "main.py").write_text("print('hello')")
|
||||
(tmp_path / "bad.py").write_text("import unused_module\nx=1")
|
||||
|
||||
from linter_runner import detect_languages, run_linters_for_language
|
||||
|
||||
langs = detect_languages(tmp_path)
|
||||
assert "python" in langs
|
||||
|
||||
result = run_linters_for_language("python", langs["python"][:1], tmp_path)
|
||||
assert result.language == "python"
|
||||
assert result.violations or result.error # either linter output or not-installed
|
||||
|
||||
def test_json_output_structure(self, tmp_path: Path):
|
||||
(tmp_path / "script.py").write_text("print(1)")
|
||||
|
||||
from linter_runner import detect_languages, run_linters_for_language, _result_to_dict
|
||||
|
||||
langs = detect_languages(tmp_path)
|
||||
if "python" not in langs:
|
||||
pytest.skip("No Python files detected")
|
||||
|
||||
result = run_linters_for_language("python", langs["python"], tmp_path)
|
||||
report = {
|
||||
"repo": tmp_path.name,
|
||||
"languages": {"python": _result_to_dict(result)},
|
||||
"summary": {
|
||||
"total_issues": len(result.violations),
|
||||
"errors": sum(1 for v in result.violations if v.severity == "error"),
|
||||
},
|
||||
}
|
||||
json.dumps(report) # should not raise
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("Run: pytest tests/test_linter_runner.py -v")
|
||||
|
||||
Reference in New Issue
Block a user