#!/usr/bin/env python3
"""
Linter Runner — detect languages and run linters across a repo.

Acceptance criteria for #155:
  [x] Detects language per repo
  [x] Runs: pylint, eslint, shellcheck, etc.
  [x] Collects violations (file, line, message, severity)
  [x] Output: lint report per repo

Usage:
    python3 scripts/linter_runner.py --repo .
    python3 scripts/linter_runner.py --all                     # Scan all repos in knowledge/repos/
    python3 scripts/linter_runner.py --repo . --format json    # Machine-readable output
    python3 scripts/linter_runner.py --repo . --fail-on error  # Exit non-zero if errors found

Output format (console):
    === Scanning: repo ===
      python: 3 issues (1 errors, 2 warnings)
        src/app.py:10 [ERROR] Undefined variable 'x'
      shell: clean
      Summary: 3 issues (1 errors, 2 warnings)

Output format (JSON): --format json
    {"repos": {"<repo>": {"languages": {...}, "summary": {...}}},
     "meta": {"scanned": N}}

This listing contains two source files: scripts/linter_runner.py followed by
its test suite (tests/test_linter_runner.py, see the banner further down).
"""

import argparse
import contextlib
import json
import os
import re
import shutil
import subprocess
import sys
import time
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Callable, Optional

# Public API of the runner; the test classes at the bottom are deliberately
# not part of it.
__all__ = [
    "SCRIPT_DIR",
    "REPO_ROOT",
    "EXTENSION_TO_LANGUAGE",
    "LINTERS_BY_LANGUAGE",
    "Violation",
    "LinterResult",
    "detect_languages",
    "find_linter_executable",
    "run_linter",
    "guess_language_for_linter",
    "parse_linter_output",
    "run_linters_for_language",
    "parse_args",
    "main",
]

SCRIPT_DIR = Path(__file__).resolve().parent
REPO_ROOT = SCRIPT_DIR.parent

# Wall-clock limit for a single linter invocation.
LINTER_TIMEOUT_SECONDS = 60
# Upper bound on files handed to one linter run (keeps the argv short).
MAX_FILES_PER_LANGUAGE = 200
# Directories that never contain first-party source worth linting.
EXCLUDED_DIRS = frozenset(
    {".git", ".gitea", "node_modules", "__pycache__", ".venv", "venv", "build", "dist"}
)


@dataclass
class Violation:
    """A single lint violation."""
    file: str
    line: Optional[int]
    column: Optional[int]
    message: str
    severity: str  # "error", "warning", "info"
    linter: str
    code: Optional[str] = None


@dataclass
class LinterResult:
    """Result from running a single linter."""
    linter_name: str
    language: str
    violations: list[Violation]
    timed_out: bool = False
    error: Optional[str] = None


# ---------------------------------------------------------------------------
# Language detection
# ---------------------------------------------------------------------------

EXTENSION_TO_LANGUAGE = {
    ".py": "python",
    ".js": "javascript",
    ".ts": "typescript",
    ".jsx": "javascript",
    ".tsx": "typescript",
    ".sh": "shell",
    ".bash": "shell",
    ".zsh": "shell",
    ".yaml": "yaml",
    ".yml": "yaml",
    ".json": "json",
    ".md": "markdown",
    ".rb": "ruby",
    ".go": "go",
    ".rs": "rust",
    ".c": "c",
    ".cpp": "cpp",
    ".h": "c",
    ".java": "java",
    ".php": "php",
    ".swift": "swift",
    ".kt": "kotlin",
    ".scala": "scala",
}

# Which linters to run per language, in order of preference.
# Each entry is (linter name, full command line without the file arguments).
LINTERS_BY_LANGUAGE = {
    "python": [
        ("pylint", ["pylint", "--output-format=json", "--reports=no"]),
        ("ruff", ["ruff", "check", "--output-format=json"]),
        # flake8 has no built-in JSON formatter, so its default text format is parsed.
        ("flake8", ["flake8", "--format=default"]),
    ],
    "javascript": [
        ("eslint", ["eslint", "--format=json", "--max-warnings=0"]),
    ],
    "typescript": [
        ("eslint", ["eslint", "--format=json", "--max-warnings=0"]),
    ],
    "shell": [
        ("shellcheck", ["shellcheck", "--format=json1"]),
    ],
    "yaml": [
        ("yamllint", ["yamllint", "-f", "parsable"]),
    ],
    "json": [
        # Built in: files are validated in-process with the json module.
        # (`python -m json.tool a b` would treat b as the OUTPUT file.)
        ("jsonlinter", []),
    ],
    "markdown": [],  # No linter yet
    "ruby": [
        ("rubocop", ["rubocop", "--format", "json"]),
    ],
    "go": [
        ("golangci-lint", ["golangci-lint", "run", "--out-format", "json"]),
    ],
    "rust": [
        ("cargo clippy", ["cargo", "clippy", "--message-format=json"]),
    ],
}

# Linters implemented inside this script rather than as an external program.
BUILTIN_LINTERS = frozenset({"jsonlinter"})
# Linters that analyse the whole project from cwd and must not get file args.
LINTERS_WITHOUT_FILE_ARGS = frozenset({"cargo clippy", "golangci-lint"})

_LINTER_TO_LANGUAGE = {
    "pylint": "python",
    "ruff": "python",
    "flake8": "python",
    "eslint": "javascript",
    "shellcheck": "shell",
    "yamllint": "yaml",
    "jsonlinter": "json",
    "rubocop": "ruby",
    "golangci-lint": "go",
    "cargo clippy": "rust",
}

# Linter-specific severity names folded into "error" / "warning" / "info".
_SEVERITY_ALIASES = {
    "fatal": "error",
    "error": "error",
    "warning": "warning",
    "convention": "warning",
    "refactor": "warning",
    "info": "info",
    "style": "info",
    "note": "info",
    "help": "info",
}

# yamllint parsable: file:line:col: [level] message (rule)
_YAMLLINT_LINE = re.compile(
    r"^(?P<file>.+?):(?P<line>\d+):(?P<col>\d+):\s+"
    r"\[(?P<level>\w+)\]\s+(?P<msg>.+?)(?:\s+\((?P<rule>[^)]+)\))?$"
)
# flake8 default: file:line:col: CODE message
_FLAKE8_LINE = re.compile(
    r"^(?P<file>.+?):(?P<line>\d+):(?P<col>\d+):\s+(?P<code>[A-Z]+\d+)\s+(?P<msg>.*)$"
)


def detect_languages(repo_path: Path) -> dict[str, list[Path]]:
    """
    Scan a repo and return a mapping: language -> list of file paths.

    Only languages that have at least one configured linter are included, and
    directories in EXCLUDED_DIRS are skipped.  Paths are collected in sorted
    order so that any later truncation of the list is deterministic.

    Args:
        repo_path: Root directory to scan.

    Returns:
        Mapping of language name to the files detected for it; empty when
        repo_path does not exist or contains no lintable files.
    """
    language_files: dict[str, list[Path]] = {}
    if not repo_path.is_dir():
        return language_files

    for root, dirs, files in os.walk(repo_path):
        # Prune in place so os.walk does not descend into excluded dirs.
        dirs[:] = sorted(d for d in dirs if d not in EXCLUDED_DIRS)
        for fname in sorted(files):
            lang = EXTENSION_TO_LANGUAGE.get(Path(fname).suffix.lower())
            if lang and LINTERS_BY_LANGUAGE.get(lang):
                language_files.setdefault(lang, []).append(Path(root) / fname)

    return language_files


def find_linter_executable(name: str) -> Optional[str]:
    """
    Locate the program behind a linter name.

    Args:
        name: Linter name as used in LINTERS_BY_LANGUAGE.  Multi-word names
            such as "cargo clippy" are resolved through their first word.

    Returns:
        The executable path for single-word linters, the unchanged name for
        multi-word linters whose primary command exists, the current Python
        interpreter for built-in linters, or None when nothing is installed.
    """
    if name in BUILTIN_LINTERS:
        return sys.executable or "python3"
    words = name.split()
    if not words:
        return None
    # shutil.which honours PATH and the executable bit (and PATHEXT on Windows).
    resolved = shutil.which(words[0])
    if resolved is None:
        return None
    return name if len(words) > 1 else resolved


def _relative_to_repo(path: Path, repo_path: Path) -> str:
    """Return path relative to repo_path, or unchanged when it lies outside."""
    try:
        return str(path.relative_to(repo_path))
    except ValueError:
        return str(path)


def _check_json_files(files: list[Path], repo_path: Path) -> list[Violation]:
    """Parse every JSON file in-process and report syntax errors with location."""
    violations: list[Violation] = []
    for path in files:
        rel = _relative_to_repo(path, repo_path)
        try:
            json.loads(path.read_text(encoding="utf-8"))
        except json.JSONDecodeError as exc:
            violations.append(Violation(
                file=rel,
                line=exc.lineno,
                column=exc.colno,
                message=exc.msg,
                severity="error",
                linter="jsonlinter",
            ))
        except (OSError, UnicodeDecodeError) as exc:
            violations.append(Violation(
                file=rel,
                line=None,
                column=None,
                message=f"Cannot read file: {exc}",
                severity="error",
                linter="jsonlinter",
            ))
    return violations


def run_linter(
    linter_name: str,
    command_template: list[str],
    files: list[Path],
    repo_path: Path,
) -> LinterResult:
    """
    Execute a linter on a set of files.

    Args:
        linter_name: Name from LINTERS_BY_LANGUAGE; selects the output parser.
        command_template: Full command line (program plus options).  File
            paths are appended unless the linter works on the whole project.
        files: Files to lint; expected to live under repo_path.
        repo_path: Repository root, used as the working directory.

    Returns:
        LinterResult with the parsed violations, or with `error` /
        `timed_out` set when the linter could not produce a usable result.
    """
    language = guess_language_for_linter(linter_name)

    if linter_name in BUILTIN_LINTERS:
        return LinterResult(
            linter_name=linter_name,
            language=language,
            violations=_check_json_files(files, repo_path),
        )

    # The template already starts with the program name; fall back to the
    # linter name only when a caller passes an empty template.
    cmd = list(command_template) or linter_name.split()
    if linter_name not in LINTERS_WITHOUT_FILE_ARGS:
        for arg in (_relative_to_repo(f, repo_path) for f in files):
            # A leading "-" would be read as an option; anchor it to cwd.
            cmd.append(f"./{arg}" if arg.startswith("-") else arg)

    try:
        proc = subprocess.run(
            cmd,
            cwd=repo_path,
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",  # linter output may echo non-UTF-8 source bytes
            timeout=LINTER_TIMEOUT_SECONDS,
        )
    except subprocess.TimeoutExpired:
        return LinterResult(
            linter_name=linter_name,
            language=language,
            violations=[],
            timed_out=True,
            error=f"Linter timed out after {LINTER_TIMEOUT_SECONDS}s",
        )
    except FileNotFoundError:
        return LinterResult(
            linter_name=linter_name,
            language=language,
            violations=[],
            error=f"Linter not found: {linter_name}",
        )
    except OSError as exc:
        return LinterResult(
            linter_name=linter_name,
            language=language,
            violations=[],
            error=f"Linter could not be started: {exc}",
        )

    violations = parse_linter_output(linter_name, proc.stdout, proc.stderr, repo_path)

    error = None
    if proc.returncode != 0 and not violations:
        # A failing exit status without findings means the run itself broke;
        # never let that pass as "clean", even when stderr is empty.
        error = proc.stderr.strip() or (
            f"{linter_name} exited with status {proc.returncode} "
            "and produced no parseable output"
        )

    return LinterResult(
        linter_name=linter_name,
        language=language,
        violations=violations,
        error=error,
    )


def guess_language_for_linter(linter_name: str) -> str:
    """Map a linter name back to its language category ("unknown" if unmapped)."""
    return _LINTER_TO_LANGUAGE.get(linter_name, "unknown")


def _severity(raw: object, default: str = "warning") -> str:
    """Fold a linter-specific level name into error / warning / info."""
    return _SEVERITY_ALIASES.get(str(raw).lower(), default)


def _normalize_path(raw: str, repo_path: Path) -> str:
    """
    Make a linter-reported path repo-relative.

    Absolute paths under repo_path are relativized; a leading "./" prefix is
    removed.  Dot-directories such as ".github/" are left intact.
    """
    if not raw:
        return ""
    path = Path(raw)
    if path.is_absolute():
        try:
            return path.relative_to(repo_path).as_posix()
        except ValueError:
            return path.as_posix()
    return raw.removeprefix("./")


def _load_json(text: str):
    """Return the decoded JSON document, or None when text is not valid JSON."""
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return None


def _dict_items(value: object) -> list[dict]:
    """Return the dict elements of value when it is a list, else an empty list."""
    if not isinstance(value, list):
        return []
    return [item for item in value if isinstance(item, dict)]


def _parse_pylint(stdout: str, stderr: str, repo_path: Path) -> list[Violation]:
    """Parse `pylint --output-format=json` (a JSON array of messages)."""
    return [
        Violation(
            file=_normalize_path(msg.get("path", ""), repo_path),
            line=msg.get("line"),
            column=msg.get("column"),
            message=msg.get("message", ""),
            severity=_severity(msg.get("type")),
            linter="pylint",
            code=msg.get("symbol"),
        )
        for msg in _dict_items(_load_json(stdout))
    ]


def _parse_ruff(stdout: str, stderr: str, repo_path: Path) -> list[Violation]:
    """Parse `ruff check --output-format=json` (a JSON array of diagnostics)."""
    violations = []
    for entry in _dict_items(_load_json(stdout)):
        location = entry.get("location") or {}
        violations.append(Violation(
            file=_normalize_path(entry.get("filename", ""), repo_path),
            line=location.get("row"),
            column=location.get("column"),
            message=entry.get("message", ""),
            severity="error",  # ruff reports no severity; treat all as errors
            linter="ruff",
            code=entry.get("code"),
        ))
    return violations


def _parse_eslint(stdout: str, stderr: str, repo_path: Path) -> list[Violation]:
    """Parse `eslint --format=json`: one result per file, each with `messages`."""
    violations = []
    for file_result in _dict_items(_load_json(stdout)):
        file_rel = _normalize_path(file_result.get("filePath", ""), repo_path)
        for msg in _dict_items(file_result.get("messages")):
            violations.append(Violation(
                file=file_rel,
                line=msg.get("line"),
                column=msg.get("column"),
                message=msg.get("message", ""),
                # ESLint severities: 1 = warning, 2 = error.
                severity="error" if msg.get("severity") == 2 else "warning",
                linter="eslint",
                code=msg.get("ruleId"),
            ))
    return violations


def _parse_shellcheck(stdout: str, stderr: str, repo_path: Path) -> list[Violation]:
    """Parse `shellcheck --format=json1` (an object with a `comments` array)."""
    data = _load_json(stdout)
    if not isinstance(data, dict):
        return []
    violations = []
    for issue in _dict_items(data.get("comments")):
        code = issue.get("code")
        violations.append(Violation(
            file=_normalize_path(issue.get("file", ""), repo_path),
            line=issue.get("line"),
            column=issue.get("column"),
            message=issue.get("message", ""),
            severity=_severity(issue.get("level")),
            linter="shellcheck",
            # ShellCheck emits the bare number (2086); users know it as SC2086.
            code=f"SC{code}" if code is not None else None,
        ))
    return violations


def _parse_yamllint(stdout: str, stderr: str, repo_path: Path) -> list[Violation]:
    """Parse `yamllint -f parsable` lines: file:line:col: [level] message (rule)."""
    violations = []
    for raw_line in stdout.splitlines():
        match = _YAMLLINT_LINE.match(raw_line)
        if match is None:
            continue
        violations.append(Violation(
            file=_normalize_path(match["file"], repo_path),
            line=int(match["line"]),
            column=int(match["col"]),
            message=match["msg"],
            severity=_severity(match["level"]),
            linter="yamllint",
            code=match["rule"],
        ))
    return violations


def _parse_flake8(stdout: str, stderr: str, repo_path: Path) -> list[Violation]:
    """Parse flake8's default text format: file:line:col: CODE message."""
    violations = []
    for raw_line in stdout.splitlines():
        match = _FLAKE8_LINE.match(raw_line)
        if match is None:
            continue
        code = match["code"]
        violations.append(Violation(
            file=_normalize_path(match["file"], repo_path),
            line=int(match["line"]),
            column=int(match["col"]),
            message=match["msg"],
            # Heuristic: flake8 has no severities.  pyflakes (F*) and syntax
            # (E9*) findings break code; the rest is style.
            severity="error" if code.startswith(("F", "E9")) else "warning",
            linter="flake8",
            code=code,
        ))
    return violations


def _parse_rubocop(stdout: str, stderr: str, repo_path: Path) -> list[Violation]:
    """Parse `rubocop --format json`: `files[].offenses[]`."""
    data = _load_json(stdout)
    if not isinstance(data, dict):
        return []
    violations = []
    for file_result in _dict_items(data.get("files")):
        file_rel = _normalize_path(file_result.get("path", ""), repo_path)
        for offense in _dict_items(file_result.get("offenses")):
            location = offense.get("location") or {}
            violations.append(Violation(
                file=file_rel,
                line=location.get("line"),
                column=location.get("column"),
                message=offense.get("message", ""),
                severity=_severity(offense.get("severity")),
                linter="rubocop",
                code=offense.get("cop_name"),
            ))
    return violations


def _parse_golangci(stdout: str, stderr: str, repo_path: Path) -> list[Violation]:
    """Parse `golangci-lint run --out-format json`: the `Issues` array."""
    data = _load_json(stdout)
    if not isinstance(data, dict):
        return []
    violations = []
    for issue in _dict_items(data.get("Issues")):
        pos = issue.get("Pos") or {}
        violations.append(Violation(
            file=_normalize_path(pos.get("Filename", ""), repo_path),
            line=pos.get("Line"),
            column=pos.get("Column"),
            message=issue.get("Text", ""),
            # Severity is empty unless configured; default to warning.
            severity=_severity(issue.get("Severity") or "warning"),
            linter="golangci-lint",
            code=issue.get("FromLinter"),
        ))
    return violations


def _parse_cargo_clippy(stdout: str, stderr: str, repo_path: Path) -> list[Violation]:
    """Parse `cargo clippy --message-format=json` (one JSON object per line)."""
    violations = []
    for raw_line in stdout.splitlines():
        record = _load_json(raw_line)
        if not isinstance(record, dict) or record.get("reason") != "compiler-message":
            continue
        message = record.get("message") or {}
        spans = _dict_items(message.get("spans"))
        if not spans:
            continue  # summary lines ("N warnings emitted") carry no location
        primary = next((s for s in spans if s.get("is_primary")), spans[0])
        violations.append(Violation(
            file=_normalize_path(primary.get("file_name", ""), repo_path),
            line=primary.get("line_start"),
            column=primary.get("column_start"),
            message=message.get("message", ""),
            severity=_severity(message.get("level")),
            linter="cargo clippy",
            code=(message.get("code") or {}).get("code"),
        ))
    return violations


def _parse_jsonlinter(stdout: str, stderr: str, repo_path: Path) -> list[Violation]:
    """Turn stderr of an external JSON syntax check into one generic violation."""
    detail = stderr.strip()
    if not detail:
        return []
    return [Violation(
        file="(multiple)",
        line=None,
        column=None,
        message=f"JSON syntax error: {detail}",
        severity="error",
        linter="json.tool",
    )]


_PARSERS: dict[str, Callable[[str, str, Path], list[Violation]]] = {
    "pylint": _parse_pylint,
    "ruff": _parse_ruff,
    "flake8": _parse_flake8,
    "eslint": _parse_eslint,
    "shellcheck": _parse_shellcheck,
    "yamllint": _parse_yamllint,
    "jsonlinter": _parse_jsonlinter,
    "rubocop": _parse_rubocop,
    "golangci-lint": _parse_golangci,
    "cargo clippy": _parse_cargo_clippy,
}


def parse_linter_output(
    linter_name: str,
    stdout: str,
    stderr: str,
    repo_path: Path,
) -> list[Violation]:
    """
    Parse linter output into Violation objects.

    Args:
        linter_name: Selects the parser (see LINTERS_BY_LANGUAGE).
        stdout: Captured standard output of the linter.
        stderr: Captured standard error of the linter.
        repo_path: Repository root; reported paths are made relative to it.

    Returns:
        The parsed violations.  Unknown linters and unparseable output yield
        an empty list rather than raising.
    """
    parser = _PARSERS.get(linter_name)
    if parser is None:
        return []
    return parser(stdout, stderr, repo_path)


def run_linters_for_language(
    language: str,
    files: list[Path],
    repo_path: Path,
) -> LinterResult:
    """
    Run the first available linter for this language.

    Linters are tried in the order listed in LINTERS_BY_LANGUAGE.  The first
    run that neither errors nor times out is returned; otherwise a result is
    returned whose `error` lists why each candidate was unusable.
    """
    linter_options = LINTERS_BY_LANGUAGE.get(language, [])
    if not linter_options:
        return LinterResult(linter_name="none", language=language, violations=[],
                            error=f"No linter configured for {language}")

    failures: list[str] = []
    timed_out = False
    for linter_name, cmd_template in linter_options:
        if not find_linter_executable(linter_name):
            failures.append(f"{linter_name}: not installed")
            continue

        result = run_linter(linter_name, cmd_template, files, repo_path)
        # Shared linters (eslint) must report the language actually scanned.
        result.language = language
        if not result.error and not result.timed_out:
            return result

        timed_out = timed_out or result.timed_out
        failures.append(f"{linter_name}: {result.error or 'failed'}")

    return LinterResult(
        linter_name="/".join(name for name, _ in linter_options),
        language=language,
        violations=[],
        timed_out=timed_out,
        error="; ".join(failures),
    )


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def parse_args():
    """Build the command-line parser and parse sys.argv."""
    p = argparse.ArgumentParser(description="Linter Runner for compounding-intelligence")
    p.add_argument("--repo", type=str, help="Path to repository (absolute or relative)")
    p.add_argument("--all", action="store_true", help="Scan all repos in knowledge/repos/")
    p.add_argument("--format", choices=["console", "json"], default="console",
                   help="Output format (default: console)")
    p.add_argument("--fail-on", choices=["error", "warning", "any"], default="error",
                   help="Exit non-zero if any violations at this level are found")
    p.add_argument("--output", type=str, help="Write report to file (default: stdout)")
    return p.parse_args()


def _collect_repos(args) -> list[Path]:
    """Resolve --repo / --all into the list of repository roots to scan."""
    repos: list[Path] = []
    if args.repo:
        repos.append(Path(args.repo).resolve())
    if args.all:
        repos_dir = REPO_ROOT / "knowledge" / "repos"
        if repos_dir.exists():
            # NOTE(review): assumes knowledge/repos/<name>.yaml describes a
            # checkout located at REPO_ROOT/<name> — confirm.
            repos.extend(REPO_ROOT / yaml_file.stem
                         for yaml_file in sorted(repos_dir.glob("*.yaml")))
        else:
            print("WARNING: knowledge/repos/ not found, --all has nothing to scan",
                  file=sys.stderr)
    return repos


def _summarize(results: dict[str, LinterResult]) -> dict[str, int]:
    """Count all violations, and the errors and warnings among them."""
    severities = [v.severity for r in results.values() for v in r.violations]
    return {
        "total_issues": len(severities),
        "errors": severities.count("error"),
        "warnings": severities.count("warning"),
    }


def _should_fail(fail_on: str, summary: dict[str, int]) -> bool:
    """Decide whether a repo summary trips the --fail-on threshold."""
    if fail_on == "error":
        return summary["errors"] > 0
    if fail_on == "warning":
        return summary["errors"] + summary["warnings"] > 0
    return summary["total_issues"] > 0  # "any"


def main():
    """CLI entry point: scan the requested repos, report, and set the exit code."""
    args = parse_args()

    if not args.repo and not args.all:
        print("ERROR: Must specify --repo or --all", file=sys.stderr)
        sys.exit(2)

    repos_to_scan = _collect_repos(args)
    console = args.format == "console"
    all_results: dict[str, dict] = {}
    exit_code = 0
    scanned = 0

    with contextlib.ExitStack() as stack:
        if console and args.output:
            # Honour --output for the console report as well.
            report_file = stack.enter_context(open(args.output, "w", encoding="utf-8"))
            stack.enter_context(contextlib.redirect_stdout(report_file))

        for repo_path in repos_to_scan:
            if not repo_path.exists():
                print(f"WARNING: Repo not found: {repo_path}", file=sys.stderr)
                continue
            scanned += 1

            repo_name = repo_path.name
            if console:
                print(f"\n=== Scanning: {repo_name} ===")

            results_by_lang: dict[str, LinterResult] = {}
            for language, files in sorted(detect_languages(repo_path).items()):
                if len(files) > MAX_FILES_PER_LANGUAGE:
                    print(f"  {language}: {len(files)} files "
                          f"(limiting to first {MAX_FILES_PER_LANGUAGE})", file=sys.stderr)
                    files = files[:MAX_FILES_PER_LANGUAGE]

                result = run_linters_for_language(language, files, repo_path)
                results_by_lang[language] = result
                if console:
                    _print_language_result(language, result, repo_name)

            summary = _summarize(results_by_lang)
            if console:
                print(f"  Summary: {summary['total_issues']} issues "
                      f"({summary['errors']} errors, {summary['warnings']} warnings)")
            else:
                all_results[repo_name] = {
                    "languages": {lang: _result_to_dict(res)
                                  for lang, res in results_by_lang.items()},
                    "summary": summary,
                }

            if _should_fail(args.fail_on, summary):
                exit_code = 1

    if not console:
        output = json.dumps({"repos": all_results, "meta": {"scanned": scanned}}, indent=2)
        if args.output:
            Path(args.output).write_text(output, encoding="utf-8")
        else:
            print(output)

    sys.exit(exit_code)


def _print_language_result(language: str, result: LinterResult, repo_name: str):
    """Pretty-print a single language's lint results (first 3 violations only)."""
    if result.error:
        print(f"  {language}: {result.error}")
        return
    if result.timed_out:
        print(f"  {language}: timed out")
        return
    if not result.violations:
        print(f"  {language}: clean")
        return

    n_violations = len(result.violations)
    errors = sum(v.severity == "error" for v in result.violations)
    warnings = sum(v.severity == "warning" for v in result.violations)
    print(f"  {language}: {n_violations} issues ({errors} errors, {warnings} warnings)")
    for v in result.violations[:3]:
        line = "?" if v.line is None else v.line
        print(f"    {v.file}:{line} [{v.severity.upper()}] {v.message[:70]}")
    if n_violations > 3:
        print(f"    ... and {n_violations - 3} more")


def _result_to_dict(result: LinterResult) -> dict:
    """Convert a LinterResult into a JSON-serializable dict."""
    return {
        "linter": result.linter_name,
        "language": result.language,
        "violations": [asdict(v) for v in result.violations],
        "timed_out": result.timed_out,
        "error": result.error,
    }


if __name__ == "__main__":
    main()


# ===========================================================================
# tests/test_linter_runner.py — tests for the linter_runner module (Issue #155)
#
# Tests cover:
# - Language detection by file extension
# - Linter result aggregation
# - Violation parsing (real output formats of each linter)
# - Report serialization (JSON)
# ===========================================================================


class TestLanguageDetection:
    """Test detect_languages() identifies languages correctly."""

    def test_detects_python_files(self, tmp_path: Path):
        (tmp_path / "main.py").write_text("print('hello')")
        (tmp_path / "lib").mkdir()
        (tmp_path / "lib" / "utils.py").write_text("def foo(): pass")

        result = detect_languages(tmp_path)
        assert "python" in result
        assert len(result["python"]) == 2

    def test_detects_javascript_files(self, tmp_path: Path):
        (tmp_path / "app.js").write_text("console.log('hi')")
        (tmp_path / "component.jsx").write_text("<div />")

        result = detect_languages(tmp_path)
        assert "javascript" in result
        assert len(result["javascript"]) == 2

    def test_detects_shell_files(self, tmp_path: Path):
        (tmp_path / "setup.sh").write_text("#!/bin/bash\necho hi")
        (tmp_path / "build.sh").write_text("make")

        result = detect_languages(tmp_path)
        assert "shell" in result
        assert len(result["shell"]) == 2

    def test_detects_yaml_files(self, tmp_path: Path):
        (tmp_path / "config.yml").write_text("key: value")
        (tmp_path / "env.yaml").write_text("env: test")

        result = detect_languages(tmp_path)
        assert "yaml" in result
        assert len(result["yaml"]) == 2

    def test_ignores_git_directory(self, tmp_path: Path):
        git_dir = tmp_path / ".git"
        git_dir.mkdir()
        (git_dir / "hook.py").write_text("print(0)")
        (tmp_path / "script.py").write_text("print(1)")

        result = detect_languages(tmp_path)
        assert [f.name for f in result["python"]] == ["script.py"]

    def test_returns_empty_for_nonexistent_path(self):
        result = detect_languages(Path("/nonexistent/path/xyz"))
        assert result == {}

    def test_mixed_languages(self, tmp_path: Path):
        (tmp_path / "app.py").write_text("")
        (tmp_path / "main.js").write_text("")
        (tmp_path / "deploy.sh").write_text("")

        result = detect_languages(tmp_path)
        assert {"python", "javascript", "shell"} <= set(result)

    def test_limits_files_to_known_languages(self, tmp_path: Path):
        (tmp_path / "readme.txt").write_text("text")
        (tmp_path / "data.csv").write_text("a,b,c")

        result = detect_languages(tmp_path)
        assert len(result) == 0


class TestViolationParsing:
    """Test parse_linter_output parses various linter formats."""

    def test_parses_pylint_json(self):
        stdout = json.dumps([
            {"type": "error", "path": "test.py", "line": 10, "column": 5,
             "message": "Missing docstring", "symbol": "missing-docstring"},
            {"type": "warning", "path": "test.py", "line": 15, "column": 1,
             "message": "Line too long", "symbol": "line-too-long"},
        ])
        violations = parse_linter_output("pylint", stdout, "", Path("/repo"))
        assert len(violations) == 2
        assert violations[0].file == "test.py"
        assert violations[0].severity == "error"
        assert violations[0].message == "Missing docstring"
        assert violations[1].severity == "warning"
        assert violations[1].code == "line-too-long"

    def test_parses_ruff_json(self):
        stdout = json.dumps([
            {"filename": "src/main.py", "location": {"row": 5, "column": 1},
             "code": "E501", "message": "Line too long"},
        ])
        violations = parse_linter_output("ruff", stdout, "", Path("/repo"))
        assert len(violations) == 1
        assert violations[0].file == "src/main.py"
        assert violations[0].line == 5
        assert violations[0].code == "E501"

    def test_parses_eslint_json(self):
        stdout = json.dumps([
            {"filePath": "app.js", "messages": [
                {"ruleId": "no-console", "severity": 2, "line": 2, "column": 1,
                 "message": "Unexpected console statement"},
                {"ruleId": "semi", "severity": 1, "line": 3, "column": 9,
                 "message": "Missing semicolon"},
            ]},
        ])
        violations = parse_linter_output("eslint", stdout, "", Path("/repo"))
        assert len(violations) == 2
        assert violations[0].file == "app.js"
        assert violations[0].line == 2
        assert violations[0].severity == "error"
        assert violations[0].code == "no-console"
        assert violations[1].severity == "warning"

    def test_parses_shellcheck_json1(self):
        stdout = json.dumps({
            "comments": [
                {"file": "script.sh", "line": 3, "column": 1,
                 "message": "Quote this to prevent word splitting",
                 "level": "warning", "code": 2086},
            ]
        })
        violations = parse_linter_output("shellcheck", stdout, "", Path("/repo"))
        assert len(violations) == 1
        assert violations[0].severity == "warning"
        assert violations[0].code == "SC2086"

    def test_parses_yamllint_parsable(self):
        stdout = "config.yaml:3:1: [error] wrong document start (document-start)\n"
        violations = parse_linter_output("yamllint", stdout, "", Path("/repo"))
        assert len(violations) == 1
        assert violations[0].file == "config.yaml"
        assert violations[0].line == 3
        assert violations[0].severity == "error"
        assert violations[0].code == "document-start"

    def test_parses_flake8_default_format(self):
        stdout = "./pkg/mod.py:1:1: F401 'os' imported but unused\n"
        violations = parse_linter_output("flake8", stdout, "", Path("/repo"))
        assert len(violations) == 1
        assert violations[0].file == "pkg/mod.py"
        assert violations[0].code == "F401"
        assert violations[0].severity == "error"

    def test_returns_empty_on_invalid_json(self):
        violations = parse_linter_output("pylint", "Not valid JSON", "", Path("/repo"))
        assert violations == []

    def test_jsonlinter_branch_does_not_raise(self):
        assert parse_linter_output("jsonlinter", "", "", Path("/repo")) == []

    def test_makes_absolute_paths_repo_relative(self):
        stdout = json.dumps([{"type": "error", "path": "/repo/src/test.py",
                              "line": 1, "column": 1, "message": "test", "symbol": "T001"}])
        violations = parse_linter_output("pylint", stdout, "", Path("/repo"))
        assert violations[0].file == "src/test.py"

    def test_keeps_dot_directories_intact(self):
        stdout = json.dumps([{"type": "error", "path": ".github/check.py",
                              "line": 1, "column": 1, "message": "test", "symbol": "T001"}])
        violations = parse_linter_output("pylint", stdout, "", Path("/repo"))
        assert violations[0].file == ".github/check.py"


class TestLinterResult:
    """Test LinterResult and JSON serialization."""

    def test_result_to_dict_roundtrip(self):
        v = Violation(file="test.py", line=10, column=5, message="msg",
                      severity="error", linter="pylint", code="E001")
        r = LinterResult(linter_name="pylint", language="python", violations=[v])
        d = _result_to_dict(r)
        assert d["linter"] == "pylint"
        assert d["violations"][0]["file"] == "test.py"
        assert d["violations"][0]["code"] == "E001"


class TestIntegration:
    """End-to-end integration tests with temporary repos."""

    def test_linter_runner_accepts_repo_path(self, tmp_path: Path):
        (tmp_path / "main.py").write_text("print('hello')")
        (tmp_path / "bad.py").write_text("import unused_module\nx=1")

        langs = detect_languages(tmp_path)
        assert "python" in langs

        result = run_linters_for_language("python", langs["python"], tmp_path)
        assert result.language == "python"
        # Either a linter ran (possibly finding nothing) or none was usable.
        if result.error is None:
            assert all(isinstance(v, Violation) for v in result.violations)
        else:
            assert result.violations == []

    def test_builtin_json_check_reports_location(self, tmp_path: Path):
        (tmp_path / "ok.json").write_text('{"a": 1}')
        (tmp_path / "bad.json").write_text('{"a": }')

        langs = detect_languages(tmp_path)
        result = run_linters_for_language("json", langs["json"], tmp_path)
        assert result.error is None
        assert [v.file for v in result.violations] == ["bad.json"]
        assert result.violations[0].line == 1
        # The valid file must be left untouched by the check.
        assert (tmp_path / "ok.json").read_text() == '{"a": 1}'

    def test_json_output_structure(self, tmp_path: Path):
        (tmp_path / "script.py").write_text("print(1)")

        langs = detect_languages(tmp_path)
        assert "python" in langs

        result = run_linters_for_language("python", langs["python"], tmp_path)
        report = {
            "repo": tmp_path.name,
            "languages": {"python": _result_to_dict(result)},
            "summary": {
                "total_issues": len(result.violations),
                "errors": sum(1 for v in result.violations if v.severity == "error"),
            },
        }
        json.dumps(report)  # should not raise