Compare commits

..

1 Commits

Author SHA1 Message Date
STEP35 Claude Code
44607f8484 feat: add dependency freshness checker — issue #161
Some checks failed
Test / pytest (pull_request) Failing after 8s
Implements scripts/dependency_freshness.py which compares installed
dependencies against latest PyPI versions and flags packages that are
more than 2 major versions behind. Includes comprehensive tests in
scripts/test_dependency_freshness.py.

Closes #161
2026-04-26 09:58:30 -04:00
5 changed files with 450 additions and 424 deletions

View File

@@ -0,0 +1,271 @@
#!/usr/bin/env python3
"""dependency_freshness.py - Compare installed dependencies against latest PyPI versions.
Identify packages that are more than 2 major versions behind.
Outputs a human-readable report by default or JSON with --json flag.
"""
import argparse
import json
import subprocess
import sys
from packaging import version
from typing import Dict, List, Tuple
def parse_requirements(requirements_path: str) -> List[str]:
"""Parse package names from a requirements.txt file."""
packages = []
try:
with open(requirements_path, 'r') as f:
for line in f:
line = line.strip()
if not line or line.startswith('#'):
continue
pkg_name = line
for delim in ['[', '>', '<', '=', '!', ';', '@']:
if delim in pkg_name:
pkg_name = pkg_name.split(delim)[0]
pkg_name = pkg_name.strip()
if pkg_name:
packages.append(pkg_name.lower())
except FileNotFoundError:
print(f"Warning: requirements file not found: {requirements_path}", file=sys.stderr)
return packages
def get_installed_packages() -> Dict[str, str]:
"""Get all installed packages via pip list --format=json."""
try:
result = subprocess.run(
[sys.executable, '-m', 'pip', 'list', '--format=json'],
capture_output=True, text=True, check=True
)
packages = json.loads(result.stdout)
return {pkg['name'].lower(): pkg['version'] for pkg in packages}
except subprocess.CalledProcessError as e:
print(f"Error running pip list: {e}", file=sys.stderr)
sys.exit(1)
except json.JSONDecodeError as e:
print(f"Error parsing pip output: {e}", file=sys.stderr)
sys.exit(1)
def get_outdated_packages() -> Dict[str, dict]:
"""Get outdated packages via pip list --outdated --format=json."""
try:
result = subprocess.run(
[sys.executable, '-m', 'pip', 'list', '--outdated', '--format=json'],
capture_output=True, text=True, check=True
)
outdated_list = json.loads(result.stdout)
outdated = {}
for pkg in outdated_list:
name = pkg['name'].lower()
outdated[name] = {
'installed': pkg.get('version', ''),
'latest': pkg.get('latest_version', ''),
'latest_filetype': pkg.get('latest_filetype', '')
}
return outdated
except subprocess.CalledProcessError as e:
print(f"Error running pip list --outdated: {e}", file=sys.stderr)
sys.exit(1)
except json.JSONDecodeError as e:
print(f"Error parsing pip outdated output: {e}", file=sys.stderr)
sys.exit(1)
def get_major_version(v: str) -> int:
"""Extract major version number from a version string."""
try:
parsed = version.parse(v)
if hasattr(parsed, 'major'):
return int(parsed.major)
parts = str(v).split('.')
if parts:
return int(parts[0])
except Exception:
pass
return 0
def is_more_than_two_majors_behind(installed_ver: str, latest_ver: str) -> bool:
"""Check if installed version is more than 2 major versions behind latest."""
try:
installed_major = get_major_version(installed_ver)
latest_major = get_major_version(latest_ver)
return (latest_major - installed_major) > 2
except Exception:
return False
def analyze_dependencies(
required_packages: List[str],
installed_packages: Dict[str, str],
outdated_packages: Dict[str, dict]
) -> Tuple[List[dict], List[str], List[dict]]:
"""Analyze dependency freshness."""
very_outdated = []
missing = []
outdated_but_not_critical = []
for pkg in required_packages:
if pkg not in installed_packages:
missing.append(pkg)
continue
installed_ver = installed_packages[pkg]
if pkg not in outdated_packages:
continue
latest_ver = outdated_packages[pkg]['latest']
if is_more_than_two_majors_behind(installed_ver, latest_ver):
very_outdated.append({
'package': pkg,
'installed': installed_ver,
'latest': latest_ver,
'major_diff': get_major_version(latest_ver) - get_major_version(installed_ver)
})
else:
outdated_but_not_critical.append({
'package': pkg,
'installed': installed_ver,
'latest': latest_ver,
'major_diff': get_major_version(latest_ver) - get_major_version(installed_ver)
})
return very_outdated, missing, outdated_but_not_critical
def generate_human_report(
very_outdated: List[dict],
missing: List[str],
outdated_but_not_critical: List[dict],
requirements_path: str
) -> str:
"""Generate a human-readable staleness report."""
lines = []
lines.append("=" * 60)
lines.append("DEPENDENCY FRESHNESS REPORT")
lines.append("=" * 60)
lines.append(f"Requirements file: {requirements_path}")
total = len(very_outdated) + len(missing) + len(outdated_but_not_critical)
lines.append(f"Total dependencies checked: {total}")
lines.append(f"Very outdated (>2 major versions behind): {len(very_outdated)}")
lines.append(f"Outdated but within 2 major versions: {len(outdated_but_not_critical)}")
lines.append(f"Missing (not installed): {len(missing)}")
lines.append("")
if very_outdated:
lines.append("!!! VERY OUTDATED PACKAGES (consider updating):")
lines.append("-" * 60)
for pkg_info in very_outdated:
lines.append(f" {pkg_info['package']}")
lines.append(f" Installed: {pkg_info['installed']}")
lines.append(f" Latest: {pkg_info['latest']}")
lines.append(f" Major diff: {pkg_info['major_diff']}")
lines.append("")
else:
lines.append("✓ No packages more than 2 major versions behind.")
lines.append("")
if outdated_but_not_critical:
lines.append(f"Outdated packages (within 2 major versions):")
lines.append("-" * 60)
for pkg_info in outdated_but_not_critical:
lines.append(f" {pkg_info['package']}: {pkg_info['installed']} -> {pkg_info['latest']} (major diff: {pkg_info['major_diff']})")
lines.append("")
if missing:
lines.append(f"Missing packages (not installed):")
lines.append("-" * 60)
for pkg in missing:
lines.append(f" {pkg}")
lines.append("")
lines.append("=" * 60)
lines.append("For full details, run: python3 -m pip list --outdated")
lines.append("=" * 60)
return "\n".join(lines)
def generate_json_report(
very_outdated: List[dict],
missing: List[str],
outdated_but_not_critical: List[dict],
requirements_path: str
) -> str:
"""Generate a JSON staleness report."""
report = {
'requirements_file': requirements_path,
'summary': {
'total_dependencies': len(very_outdated) + len(missing) + len(outdated_but_not_critical),
'very_outdated_count': len(very_outdated),
'outdated_within_threshold_count': len(outdated_but_not_critical),
'missing_count': len(missing)
},
'very_outdated': very_outdated,
'outdated_within_threshold': outdated_but_not_critical,
'missing': missing
}
return json.dumps(report, indent=2)
def main():
parser = argparse.ArgumentParser(
description='Check dependency freshness against PyPI latest versions.'
)
parser.add_argument(
'--requirements', '-r',
default='requirements.txt',
help='Path to requirements.txt file (default: requirements.txt)'
)
parser.add_argument(
'--json',
action='store_true',
help='Output report as JSON instead of human-readable text'
)
parser.add_argument(
'--output', '-o',
help='Optional output file for the report (default: stdout)'
)
args = parser.parse_args()
# Parse requirements
required_packages = parse_requirements(args.requirements)
if not required_packages:
print("No packages found in requirements file.", file=sys.stderr)
sys.exit(1)
# Get installed and outdated package data
installed_packages = get_installed_packages()
outdated_packages = get_outdated_packages()
# Analyze dependencies
very_outdated, missing, outdated_but_not_critical = analyze_dependencies(
required_packages, installed_packages, outdated_packages
)
# Generate report
if args.json:
report = generate_json_report(very_outdated, missing, outdated_but_not_critical, args.requirements)
else:
report = generate_human_report(very_outdated, missing, outdated_but_not_critical, args.requirements)
# Output report
if args.output:
with open(args.output, 'w') as f:
f.write(report + '\n')
else:
print(report)
# Exit code: 0 if no very outdated deps, 1 otherwise
exit_code = 1 if very_outdated else 0
sys.exit(exit_code)
if __name__ == '__main__':
main()

View File

@@ -1,185 +0,0 @@
#!/usr/bin/env python3
"""
Review Comment Generator — Issue #126
Reads JSONL findings, deduplicates, posts as Gitea PR comments.
"""
from __future__ import annotations
import argparse
import hashlib
import json
import os
import sys
import urllib.request
import urllib.error
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional
SCRIPT_DIR = Path(__file__).resolve().parent
REPO_ROOT = SCRIPT_DIR.parent
DEFAULT_API_BASE = os.environ.get(
"GITEA_API_BASE",
"https://forge.alexanderwhitestone.com"
)
TOKEN_PATHS = [
os.path.expanduser("~/.config/gitea/token"),
os.path.expanduser("~/.hermes/gitea.token"),
os.environ.get("GITEA_TOKEN", ""),
]
def load_token() -> Optional[str]:
token = os.environ.get("GITEA_TOKEN", "")
if token:
return token
for path in TOKEN_PATHS:
if path and os.path.exists(path):
with open(path) as f:
t = f.read().strip()
if t:
return t
return None
class GiteaClient:
def __init__(self, base_url: str, token: str, org: str, repo: str):
self.base_url = base_url.rstrip("/")
self.token = token
self.org = org
self.repo = repo
def _post(self, path: str, data: Dict) -> Optional[Dict]:
url = f"{self.base_url}/api/v1{path}"
body = json.dumps(data).encode("utf-8")
req = urllib.request.Request(url, data=body, method="POST")
req.add_header("Authorization", f"token {self.token}")
req.add_header("Content-Type", "application/json")
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
except urllib.error.HTTPError as e:
err = e.read().decode() if e.read() else str(e)
print(f"[ERROR] HTTP {e.code}: {err}", file=sys.stderr)
return None
except Exception as e:
print(f"[ERROR] {e}", file=sys.stderr)
return None
def post_issue_comment(self, issue_num: int, body: str) -> Optional[Dict]:
return self._post(
f"/repos/{self.org}/{self.repo}/issues/{issue_num}/comments",
{"body": body}
)
def content_hash(finding: Dict) -> str:
key = f"{finding['file']}:{finding['line']}:{finding['text']}"
return hashlib.sha256(key.encode("utf-8")).hexdigest()
def format_comment(finding: Dict) -> str:
emoji = {
"error": "🛑",
"warning": "⚠️",
"info": "",
}.get(finding.get("severity", ""), "📝")
f = finding["file"]
ln = finding["line"]
txt = finding["text"]
return f"{emoji} **Review Comment**\n\nFile: `{f}`\nLine: {ln}\n\n> {txt}\n"
def load_findings(path: Optional[Path], from_stdin: bool) -> List[Dict]:
import fileinput
findings = []
sources = ["-"] if from_stdin else [str(path)]
for line in fileinput.input(files=sources):
line = line.strip()
if not line or line.startswith("#"):
continue
try:
f = json.loads(line)
for key in ("file", "line", "text"):
if key not in f:
raise ValueError(f"Missing key: {key}")
findings.append(f)
except json.JSONDecodeError as e:
print(f"WARNING: Skipping invalid JSON: {e}", file=sys.stderr)
return findings
def main() -> int:
parser = argparse.ArgumentParser(
description="Post review findings as comments to a Gitea PR/issue"
)
parser.add_argument("--pr", type=int, required=True, help="PR/issue number")
parser.add_argument("--org", default="Timmy_Foundation", help="Gitea org")
parser.add_argument("--repo", default="compounding-intelligence", help="Repo name")
parser.add_argument("--api-base", default=DEFAULT_API_BASE, help="Gitea API base")
parser.add_argument("--token", default=None, help="API token (or env/file)")
parser.add_argument("--input", type=Path, default=None, help="JSONL input file")
parser.add_argument("--stdin", action="store_true", help="Read from stdin")
parser.add_argument("--dry-run", action="store_true", help="Show without posting")
parser.add_argument("--json", action="store_true", help="Emit JSON report")
args = parser.parse_args()
if not args.stdin and args.input is None:
print("ERROR: --input or --stdin required", file=sys.stderr)
return 1
if args.stdin and args.input:
print("ERROR: --stdin and --input exclusive", file=sys.stderr)
return 1
token = args.token or load_token()
if not token:
print("ERROR: Token not found. Set GITEA_TOKEN or ~/.config/gitea/token", file=sys.stderr)
return 1
findings = load_findings(args.input, args.stdin)
if not findings:
print("ERROR: No findings loaded", file=sys.stderr)
return 1
if not args.json: print(f"Loaded {len(findings)} finding(s)")
seen: Dict[str, Dict] = {}
for f in findings:
h = content_hash(f)
if h not in seen:
seen[h] = f
unique = list(seen.values())
if not args.json: print(f"After dedup: {len(unique)} unique")
if args.json:
report = {
"total": len(findings),
"unique": len(unique),
"findings": unique,
"generated_at": datetime.now(timezone.utc).isoformat(),
}
print(json.dumps(report, indent=2))
return 0
if args.dry_run:
print("\n=== DRY RUN — would post ===")
for i, f in enumerate(unique, 1):
print(f"\n--- Comment {i}/{len(unique)} ---")
print(format_comment(f))
return 0
client = GiteaClient(args.api_base, token, args.org, args.repo)
posted = 0
for f in unique:
body = format_comment(f)
result = client.post_issue_comment(args.pr, body)
if result:
print(f"✅ Posted: {f['file']}:{f['line']} (id={result.get('id')})")
posted += 1
else:
print(f"❌ Failed: {f['file']}:{f['line']}")
print(f"\nPosted {posted}/{len(unique)} to PR #{args.pr}")
return 0 if posted == len(unique) else 1
if __name__ == "__main__":
sys.exit(main())

View File

@@ -1,5 +0,0 @@
{"file": "scripts/harvester.py", "line": 47, "text": "Consider adding type hints to improve readability", "severity": "info"}
{"file": "scripts/dedup.py", "line": 89, "text": "Add null check before accessing fact['confidence'] to avoid KeyError", "severity": "warning"}
{"file": "scripts/bootstrapper.py", "line": 102, "text": "This loop is O(n^2) — could be optimized with a dict lookup", "severity": "info"}
{"file": "scripts/harvester.py", "line": 47, "text": "Consider adding type hints to improve readability", "severity": "info"}
{"file": "scripts/harvester.py", "line": 120, "text": "File handle not closed in error path — use context manager", "severity": "error"}

View File

@@ -0,0 +1,179 @@
#!/usr/bin/env python3
"""Tests for scripts/dependency_freshness.py — 9.7 Dependency Freshness."""
import json
import os
import sys
from unittest.mock import patch, MagicMock
# Import target module
sys.path.insert(0, os.path.dirname(__file__) or ".")
import importlib.util
spec = importlib.util.spec_from_file_location(
"dependency_freshness",
os.path.join(os.path.dirname(__file__) or ".", "dependency_freshness.py")
)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
parse_requirements = mod.parse_requirements
get_major_version = mod.get_major_version
is_more_than_two_majors_behind = mod.is_more_than_two_majors_behind
analyze_dependencies = mod.analyze_dependencies
def test_parse_requirements_simple():
"""Parse a simple package line."""
import tempfile
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
f.write("requests\n")
tmp = f.name
try:
pkgs = parse_requirements(tmp)
assert pkgs == ["requests"], f"got {pkgs}"
print("PASS: test_parse_requirements_simple")
finally:
os.unlink(tmp)
def test_parse_requirements_with_specifiers():
"""Parse lines with version specifiers."""
import tempfile
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
f.write("pytest>=8,<9\n")
f.write("aiohttp>=3.8\n")
tmp = f.name
try:
pkgs = parse_requirements(tmp)
assert pkgs == ["pytest", "aiohttp"], f"got {pkgs}"
print("PASS: test_parse_requirements_with_specifiers")
finally:
os.unlink(tmp)
def test_parse_requirements_ignores_comments_and_blanks():
"""Comments and blank lines are skipped."""
import tempfile
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
f.write("# This is a comment\n")
f.write("\n")
f.write(" \n")
f.write("numpy\n")
f.write("# another comment\n")
tmp = f.name
try:
pkgs = parse_requirements(tmp)
assert pkgs == ["numpy"], f"got {pkgs}"
print("PASS: test_parse_requirements_ignores_comments_and_blanks")
finally:
os.unlink(tmp)
def test_get_major_version_normal():
"""Extract major version from typical semantic strings."""
assert get_major_version("1.2.3") == 1
assert get_major_version("3.4.5") == 3
assert get_major_version("0.11.0") == 0
print("PASS: test_get_major_version_normal")
def test_get_major_version_with_rc():
"""Prerelease versions still yield major number."""
assert get_major_version("2.0.0rc1") == 2
assert get_major_version("1.0.0a1") == 1
print("PASS: test_get_major_version_with_rc")
def test_is_more_than_two_majors_behind():
"""Difference >2 triggers True; <=2 triggers False."""
assert is_more_than_two_majors_behind("1.2.3", "4.0.0") is True
assert is_more_than_two_majors_behind("3.9.0", "4.0.0") is False
assert is_more_than_two_majors_behind("2.1.0", "5.2.0") is True
assert is_more_than_two_majors_behind("8.0.0", "9.0.0") is False
assert is_more_than_two_majors_behind("4.0.0", "4.0.0") is False
print("PASS: test_is_more_than_two_majors_behind")
def test_analyze_dependencies_very_outdated():
"""Flag packages more than 2 major versions behind."""
required = ["pkg_a", "pkg_b"]
installed = {"pkg_a": "1.0.0", "pkg_b": "3.5.2"}
outdated = {
"pkg_a": {"installed": "1.0.0", "latest": "4.0.0"},
"pkg_b": {"installed": "3.5.2", "latest": "4.0.0"},
}
very_out, missing, outdated_ok = analyze_dependencies(required, installed, outdated)
assert len(very_out) == 1 and very_out[0]["package"] == "pkg_a"
assert len(missing) == 0
assert len(outdated_ok) == 1 and outdated_ok[0]["package"] == "pkg_b"
print("PASS: test_analyze_dependencies_very_outdated")
def test_analyze_dependencies_missing():
"""Detect packages not installed at all."""
required = ["pkg_a", "pkg_missing"]
installed = {"pkg_a": "2.0.0"}
outdated = {"pkg_a": {"installed": "2.0.0", "latest": "3.0.0"}}
very_out, missing, outdated_ok = analyze_dependencies(required, installed, outdated)
assert "pkg_missing" in missing
assert len(very_out) == 0
assert len(outdated_ok) == 1
print("PASS: test_analyze_dependencies_missing")
def test_analyze_dependencies_up_to_date():
"""Packages up-to-date are not flagged."""
required = ["pkg_good"]
installed = {"pkg_good": "5.0.0"}
outdated = {}
very_out, missing, outdated_ok = analyze_dependencies(required, installed, outdated)
assert len(very_out) == 0
assert len(missing) == 0
assert len(outdated_ok) == 0
print("PASS: test_analyze_dependencies_up_to_date")
def test_generate_human_report_contains_very_outdated():
"""Human report includes very outdated packages."""
very_out = [
{"package": "oldpkg", "installed": "1.0", "latest": "4.0", "major_diff": 3}
]
missing = []
outdated_ok = []
report = mod.generate_human_report(very_out, missing, outdated_ok, "requirements.txt")
assert "oldpkg" in report
assert "Installed: 1.0" in report
assert "Latest: 4.0" in report
assert "Major diff: 3" in report
print("PASS: test_generate_human_report_contains_very_outdated")
def test_generate_json_report_structure():
"""JSON report contains required keys."""
very_out = [{"package": "oldpkg", "installed": "1.0", "latest": "4.0", "major_diff": 3}]
missing = ["missing_pkg"]
outdated_ok = []
report_json = mod.generate_json_report(very_out, missing, outdated_ok, "requirements.txt")
data = json.loads(report_json)
assert "summary" in data
assert data["summary"]["very_outdated_count"] == 1
assert data["summary"]["missing_count"] == 1
assert "very_outdated" in data
assert "missing" in data
print("PASS: test_generate_json_report_structure")
if __name__ == '__main__':
print("Running dependency_freshness test suite...")
test_parse_requirements_simple()
test_parse_requirements_with_specifiers()
test_parse_requirements_ignores_comments_and_blanks()
test_get_major_version_normal()
test_get_major_version_with_rc()
test_is_more_than_two_majors_behind()
test_analyze_dependencies_very_outdated()
test_analyze_dependencies_missing()
test_analyze_dependencies_up_to_date()
test_generate_human_report_contains_very_outdated()
test_generate_json_report_structure()
print("ALL TESTS PASSED.")

View File

@@ -1,234 +0,0 @@
#!/usr/bin/env python3
"""
Smoke tests for Review Comment Generator — Issue #126
"""
from __future__ import annotations
import json
import subprocess
import sys
import hashlib
from io import StringIO
from pathlib import Path
import pytest
REPO_ROOT = Path(__file__).resolve().parents[1]
SCRIPTS_DIR = REPO_ROOT / "scripts"
GENERATOR = SCRIPTS_DIR / "review_comment_generator.py"
SAMPLE_FINDINGS = SCRIPTS_DIR / "sample_findings.jsonl"
class TestGeneratorPresence:
def test_script_exists(self):
assert GENERATOR.exists(), f"Missing: {GENERATOR}"
def test_shebang_is_python(self):
with open(GENERATOR) as f:
first = f.readline().strip()
assert first.startswith("#!"), "No shebang"
assert "python" in first.lower()
class TestDeduplication:
def test_content_hash_deterministic(self):
from hashlib import sha256
def ch(f):
key = f"{f['file']}:{f['line']}:{f['text']}"
return sha256(key.encode()).hexdigest()
finding = {"file": "a.py", "line": 1, "text": "test"}
assert ch(finding) == ch(finding)
def test_duplicate_findings_are_removed(self):
findings = [
{"file": "a.py", "line": 1, "text": "foo", "severity": "info"},
{"file": "a.py", "line": 1, "text": "foo", "severity": "warning"},
{"file": "b.py", "line": 2, "text": "bar", "severity": "info"},
]
seen = {}
for f in findings:
key = f"{f['file']}:{f['line']}:{f['text']}"
seen[key] = f
assert len(seen) == 2
def test_different_findings_are_kept(self):
findings = [
{"file": "a.py", "line": 1, "text": "foo"},
{"file": "a.py", "line": 2, "text": "foo"},
{"file": "a.py", "line": 1, "text": "bar"},
]
seen = {}
for f in findings:
key = f"{f['file']}:{f['line']}:{f['text']}"
seen[key] = f
assert len(seen) == 3
class TestCommentFormatting:
def test_format_basic(self):
sys.path.insert(0, str(SCRIPTS_DIR))
from review_comment_generator import format_comment
f = {"file": "scripts/foo.py", "line": 10, "text": "Fix this bug", "severity": "warning"}
body = format_comment(f)
assert "📝 **Review Comment**" not in body # warning uses ⚠️
assert "⚠️ **Review Comment**" in body
assert "`scripts/foo.py`" in body
assert "Line: 10" in body
assert "> Fix this bug" in body
def test_format_severity_emoji(self):
sys.path.insert(0, str(SCRIPTS_DIR))
from review_comment_generator import format_comment
cases = [("error", "🛑"), ("warning", "⚠️"), ("info", ""), ("unknown", "📝")]
for severity, emoji in cases:
f = {"file": "x.py", "line": 1, "text": "test", "severity": severity}
assert emoji in format_comment(f)
class TestFindingsLoader:
def test_load_from_file(self):
sys.path.insert(0, str(SCRIPTS_DIR))
from review_comment_generator import load_findings
findings = load_findings(SAMPLE_FINDINGS, from_stdin=False)
assert len(findings) >= 4
def test_load_ignores_blank_and_comments(self):
import tempfile, os
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as tf:
tf.write('{"file":"a.py","line":1,"text":"valid"}\n')
tf.write('\n')
tf.write('# this is a comment\n')
tf.write('{"file":"b.py","line":2,"text":"also valid"}\n')
tfname = tf.name
try:
sys.path.insert(0, str(SCRIPTS_DIR))
from review_comment_generator import load_findings
assert len(load_findings(Path(tfname), from_stdin=False)) == 2
finally:
os.unlink(tfname)
def test_invalid_json_line_skipped(self, capsys):
import tempfile, os
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as tf:
tf.write('invalid json\n')
tf.write('{"file":"ok.py","line":1,"text":"valid"}\n')
tfname = tf.name
try:
sys.path.insert(0, str(SCRIPTS_DIR))
from review_comment_generator import load_findings
assert len(load_findings(Path(tfname), from_stdin=False)) == 1
finally:
os.unlink(tfname)
class TestDryRunMode:
def test_dry_run_counts_unique(self):
result = subprocess.run(
[sys.executable, str(GENERATOR), "--pr", "126",
"--input", str(SAMPLE_FINDINGS), "--dry-run"],
capture_output=True, text=True, cwd=REPO_ROOT, timeout=15
)
assert result.returncode == 0
assert "DRY RUN" in result.stdout
assert "Review Comment" in result.stdout
def test_dry_run_shows_all_unique(self):
result = subprocess.run(
[sys.executable, str(GENERATOR), "--pr", "126",
"--input", str(SAMPLE_FINDINGS), "--dry-run"],
capture_output=True, text=True, cwd=REPO_ROOT, timeout=15
)
assert result.stdout.count("--- Comment") == 4
class TestJSONOutputMode:
def test_json_flag_emits_valid_json(self):
result = subprocess.run(
[sys.executable, str(GENERATOR), "--pr", "126",
"--input", str(SAMPLE_FINDINGS), "--json"],
capture_output=True, text=True, cwd=REPO_ROOT, timeout=15
)
assert result.returncode == 0
payload = json.loads(result.stdout)
assert "total" in payload and "unique" in payload and "findings" in payload
assert payload["total"] >= payload["unique"]
def test_json_findings_have_required_fields(self):
result = subprocess.run(
[sys.executable, str(GENERATOR), "--pr", "126",
"--input", str(SAMPLE_FINDINGS), "--json"],
capture_output=True, text=True, cwd=REPO_ROOT, timeout=15
)
payload = json.loads(result.stdout)
for f in payload["findings"]:
assert "file" in f and "line" in f and "text" in f
class TestGiteaClient:
def test_post_issue_comment_builds_correct_url(self):
sys.path.insert(0, str(SCRIPTS_DIR))
from review_comment_generator import GiteaClient
client = GiteaClient("https://example.com", "token123", "MyOrg", "myrepo")
assert client.org == "MyOrg" and client.repo == "myrepo"
def test_generate_comment_body_has_required_fields(self):
sys.path.insert(0, str(SCRIPTS_DIR))
from review_comment_generator import format_comment
f = {"file": "x.py", "line": 5, "text": "Fix this", "severity": "error"}
body = format_comment(f)
assert "x.py" in body and "5" in body and "Fix this" in body
class TestFullPipeline:
def test_end_to_end_json_output(self):
result = subprocess.run(
[sys.executable, str(GENERATOR), "--pr", "126",
"--input", str(SAMPLE_FINDINGS), "--json"],
capture_output=True, text=True, cwd=REPO_ROOT, timeout=15
)
assert result.returncode == 0
data = json.loads(result.stdout)
assert data["total"] == 5
assert data["unique"] == 4
f = data["findings"][0]
for key in ("file", "line", "text", "severity"):
assert key in f
def test_token_loading_fallback(self):
sys.path.insert(0, str(SCRIPTS_DIR))
from review_comment_generator import load_token
token = load_token()
assert token is None or isinstance(token, str)
class TestErrorHandling:
def test_missing_input_shows_error(self):
result = subprocess.run(
[sys.executable, str(GENERATOR), "--pr", "126"],
capture_output=True, text=True, cwd=REPO_ROOT, timeout=15
)
assert result.returncode != 0
assert "--input" in result.stderr or "--stdin" in result.stderr
def test_invalid_json_line_skipped(self):
import tempfile, os
with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as tf:
tf.write('invalid json\n')
tf.write('{"file":"ok.py","line":1,"text":"valid"}\n')
tfname = tf.name
try:
result = subprocess.run(
[sys.executable, str(GENERATOR), "--pr", "126",
"--input", tfname, "--json"],
capture_output=True, text=True, cwd=REPO_ROOT, timeout=15
)
data = json.loads(result.stdout)
assert data["total"] == 1
assert data["unique"] == 1
finally:
os.unlink(tfname)
if __name__ == "__main__":
pytest.main([__file__, "-v"])