diff --git a/scripts/review_comment_generator.py b/scripts/review_comment_generator.py
new file mode 100755
index 0000000..c139dc9
--- /dev/null
+++ b/scripts/review_comment_generator.py
@@ -0,0 +1,185 @@
+#!/usr/bin/env python3
+"""
+Review Comment Generator — Issue #126
+Reads JSONL findings, deduplicates, posts as Gitea PR comments.
+"""
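+# Example invocations (illustrative; the PR number and paths are placeholders):
+#
+#   ./scripts/review_comment_generator.py --pr 126 \
+#       --input scripts/sample_findings.jsonl --dry-run
+#   cat findings.jsonl | ./scripts/review_comment_generator.py --pr 126 --stdin --json
+#
+# --token may be passed explicitly; otherwise the GITEA_TOKEN env var and the
+# files in TOKEN_PATHS are consulted, in that order.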
parser.add_argument("--repo", default="compounding-intelligence", help="Repo name") + parser.add_argument("--api-base", default=DEFAULT_API_BASE, help="Gitea API base") + parser.add_argument("--token", default=None, help="API token (or env/file)") + parser.add_argument("--input", type=Path, default=None, help="JSONL input file") + parser.add_argument("--stdin", action="store_true", help="Read from stdin") + parser.add_argument("--dry-run", action="store_true", help="Show without posting") + parser.add_argument("--json", action="store_true", help="Emit JSON report") + + args = parser.parse_args() + + if not args.stdin and args.input is None: + print("ERROR: --input or --stdin required", file=sys.stderr) + return 1 + if args.stdin and args.input: + print("ERROR: --stdin and --input exclusive", file=sys.stderr) + return 1 + + token = args.token or load_token() + if not token: + print("ERROR: Token not found. Set GITEA_TOKEN or ~/.config/gitea/token", file=sys.stderr) + return 1 + + findings = load_findings(args.input, args.stdin) + if not findings: + print("ERROR: No findings loaded", file=sys.stderr) + return 1 + + if not args.json: print(f"Loaded {len(findings)} finding(s)") + + seen: Dict[str, Dict] = {} + for f in findings: + h = content_hash(f) + if h not in seen: + seen[h] = f + + unique = list(seen.values()) + if not args.json: print(f"After dedup: {len(unique)} unique") + + if args.json: + report = { + "total": len(findings), + "unique": len(unique), + "findings": unique, + "generated_at": datetime.now(timezone.utc).isoformat(), + } + print(json.dumps(report, indent=2)) + return 0 + + if args.dry_run: + print("\n=== DRY RUN — would post ===") + for i, f in enumerate(unique, 1): + print(f"\n--- Comment {i}/{len(unique)} ---") + print(format_comment(f)) + return 0 + + client = GiteaClient(args.api_base, token, args.org, args.repo) + posted = 0 + for f in unique: + body = format_comment(f) + result = client.post_issue_comment(args.pr, body) + if result: + print(f"✅ Posted: {f['file']}:{f['line']} (id={result.get('id')})") + posted += 1 + else: + print(f"❌ Failed: {f['file']}:{f['line']}") + + print(f"\nPosted {posted}/{len(unique)} to PR #{args.pr}") + return 0 if posted == len(unique) else 1 + +if __name__ == "__main__": + sys.exit(main()) + diff --git a/scripts/sample_findings.jsonl b/scripts/sample_findings.jsonl new file mode 100644 index 0000000..efb7741 --- /dev/null +++ b/scripts/sample_findings.jsonl @@ -0,0 +1,5 @@ +{"file": "scripts/harvester.py", "line": 47, "text": "Consider adding type hints to improve readability", "severity": "info"} +{"file": "scripts/dedup.py", "line": 89, "text": "Add null check before accessing fact['confidence'] to avoid KeyError", "severity": "warning"} +{"file": "scripts/bootstrapper.py", "line": 102, "text": "This loop is O(n^2) — could be optimized with a dict lookup", "severity": "info"} +{"file": "scripts/harvester.py", "line": 47, "text": "Consider adding type hints to improve readability", "severity": "info"} +{"file": "scripts/harvester.py", "line": 120, "text": "File handle not closed in error path — use context manager", "severity": "error"} diff --git a/tests/test_review_comment_generator.py b/tests/test_review_comment_generator.py new file mode 100644 index 0000000..498ed46 --- /dev/null +++ b/tests/test_review_comment_generator.py @@ -0,0 +1,234 @@ +#!/usr/bin/env python3 +""" +Smoke tests for Review Comment Generator — Issue #126 +""" + +from __future__ import annotations + +import json +import subprocess +import sys +import hashlib 
+
+
+def format_comment(finding: Dict) -> str:
+    emoji = {
+        "error": "🛑",
+        "warning": "âš ī¸",
+        "info": "â„šī¸",
+    }.get(finding.get("severity", ""), "📝")
+    f = finding["file"]
+    ln = finding["line"]
+    txt = finding["text"]
+    return f"{emoji} **Review Comment**\n\nFile: `{f}`\nLine: {ln}\n\n> {txt}\n"
+
+
+def load_findings(path: Optional[Path], from_stdin: bool) -> List[Dict]:
+    import fileinput
+
+    findings = []
+    sources = ["-"] if from_stdin else [str(path)]
+    for line in fileinput.input(files=sources):
+        line = line.strip()
+        if not line or line.startswith("#"):
+            continue
+        try:
+            f = json.loads(line)
+            for key in ("file", "line", "text"):
+                if key not in f:
+                    raise ValueError(f"Missing key: {key}")
+            findings.append(f)
+        except ValueError as e:
+            # json.JSONDecodeError subclasses ValueError, so this catches both
+            # malformed JSON and the missing-key check above.
+            print(f"WARNING: Skipping invalid finding: {e}", file=sys.stderr)
+    return findings
+
+
+def main() -> int:
+    parser = argparse.ArgumentParser(
+        description="Post review findings as comments to a Gitea PR/issue"
+    )
+    parser.add_argument("--pr", type=int, required=True, help="PR/issue number")
+    parser.add_argument("--org", default="Timmy_Foundation", help="Gitea org")
+    parser.add_argument("--repo", default="compounding-intelligence", help="Repo name")
+    parser.add_argument("--api-base", default=DEFAULT_API_BASE, help="Gitea API base")
+    parser.add_argument("--token", default=None, help="API token (or env/file)")
+    parser.add_argument("--input", type=Path, default=None, help="JSONL input file")
+    parser.add_argument("--stdin", action="store_true", help="Read from stdin")
+    parser.add_argument("--dry-run", action="store_true", help="Show without posting")
+    parser.add_argument("--json", action="store_true", help="Emit JSON report")
+
+    args = parser.parse_args()
+
+    if not args.stdin and args.input is None:
+        print("ERROR: --input or --stdin required", file=sys.stderr)
+        return 1
+    if args.stdin and args.input:
+        print("ERROR: --stdin and --input are mutually exclusive", file=sys.stderr)
+        return 1
+
+    findings = load_findings(args.input, args.stdin)
+    if not findings:
+        print("ERROR: No findings loaded", file=sys.stderr)
+        return 1
+
+    if not args.json:
+        print(f"Loaded {len(findings)} finding(s)")
+
+    seen: Dict[str, Dict] = {}
+    for f in findings:
+        h = content_hash(f)
+        if h not in seen:
+            seen[h] = f
+
+    unique = list(seen.values())
+    if not args.json:
+        print(f"After dedup: {len(unique)} unique")
+
+    if args.json:
+        report = {
+            "total": len(findings),
+            "unique": len(unique),
+            "findings": unique,
+            "generated_at": datetime.now(timezone.utc).isoformat(),
+        }
+        print(json.dumps(report, indent=2))
+        return 0
+
+    if args.dry_run:
+        print("\n=== DRY RUN — would post ===")
+        for i, f in enumerate(unique, 1):
+            print(f"\n--- Comment {i}/{len(unique)} ---")
+            print(format_comment(f))
+        return 0
+
+    # Resolve the token only when actually posting, so --json and --dry-run
+    # work without credentials (the smoke tests rely on this).
+    token = args.token or load_token()
+    if not token:
+        print("ERROR: Token not found. Set GITEA_TOKEN or ~/.config/gitea/token", file=sys.stderr)
+        return 1
+
+    client = GiteaClient(args.api_base, token, args.org, args.repo)
+    posted = 0
+    for f in unique:
+        body = format_comment(f)
+        result = client.post_issue_comment(args.pr, body)
+        if result:
+            print(f"✅ Posted: {f['file']}:{f['line']} (id={result.get('id')})")
+            posted += 1
+        else:
+            print(f"❌ Failed: {f['file']}:{f['line']}")
+
+    print(f"\nPosted {posted}/{len(unique)} to PR #{args.pr}")
+    return 0 if posted == len(unique) else 1
+
+
+if __name__ == "__main__":
+    sys.exit(main())
+
diff --git a/scripts/sample_findings.jsonl b/scripts/sample_findings.jsonl
new file mode 100644
index 0000000..efb7741
--- /dev/null
+++ b/scripts/sample_findings.jsonl
@@ -0,0 +1,5 @@
+{"file": "scripts/harvester.py", "line": 47, "text": "Consider adding type hints to improve readability", "severity": "info"}
+{"file": "scripts/dedup.py", "line": 89, "text": "Add null check before accessing fact['confidence'] to avoid KeyError", "severity": "warning"}
+{"file": "scripts/bootstrapper.py", "line": 102, "text": "This loop is O(n^2) — could be optimized with a dict lookup", "severity": "info"}
+{"file": "scripts/harvester.py", "line": 47, "text": "Consider adding type hints to improve readability", "severity": "info"}
+{"file": "scripts/harvester.py", "line": 120, "text": "File handle not closed in error path — use context manager", "severity": "error"}
diff --git a/tests/test_review_comment_generator.py b/tests/test_review_comment_generator.py
new file mode 100644
index 0000000..498ed46
--- /dev/null
+++ b/tests/test_review_comment_generator.py
@@ -0,0 +1,234 @@
+#!/usr/bin/env python3
+"""
+Smoke tests for Review Comment Generator — Issue #126
+"""
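+# Illustrative invocations, assuming the repo root as the working directory:
+#   pytest tests/test_review_comment_generator.py -v
+#   python tests/test_review_comment_generator.py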
+
+from __future__ import annotations
+
+import json
+import subprocess
+import sys
+from pathlib import Path
+
+import pytest
+
+REPO_ROOT = Path(__file__).resolve().parents[1]
+SCRIPTS_DIR = REPO_ROOT / "scripts"
+GENERATOR = SCRIPTS_DIR / "review_comment_generator.py"
+SAMPLE_FINDINGS = SCRIPTS_DIR / "sample_findings.jsonl"
+
+
+class TestGeneratorPresence:
+    def test_script_exists(self):
+        assert GENERATOR.exists(), f"Missing: {GENERATOR}"
+
+    def test_shebang_is_python(self):
+        with open(GENERATOR) as f:
+            first = f.readline().strip()
+        assert first.startswith("#!"), "No shebang"
+        assert "python" in first.lower()
+
+
+class TestDeduplication:
+    def test_content_hash_deterministic(self):
+        from hashlib import sha256
+
+        def ch(f):
+            key = f"{f['file']}:{f['line']}:{f['text']}"
+            return sha256(key.encode()).hexdigest()
+
+        finding = {"file": "a.py", "line": 1, "text": "test"}
+        assert ch(finding) == ch(finding)
+
+    def test_duplicate_findings_are_removed(self):
+        findings = [
+            {"file": "a.py", "line": 1, "text": "foo", "severity": "info"},
+            {"file": "a.py", "line": 1, "text": "foo", "severity": "warning"},
+            {"file": "b.py", "line": 2, "text": "bar", "severity": "info"},
+        ]
+        seen = {}
+        for f in findings:
+            key = f"{f['file']}:{f['line']}:{f['text']}"
+            seen[key] = f
+        assert len(seen) == 2
+
+    def test_different_findings_are_kept(self):
+        findings = [
+            {"file": "a.py", "line": 1, "text": "foo"},
+            {"file": "a.py", "line": 2, "text": "foo"},
+            {"file": "a.py", "line": 1, "text": "bar"},
+        ]
+        seen = {}
+        for f in findings:
+            key = f"{f['file']}:{f['line']}:{f['text']}"
+            seen[key] = f
+        assert len(seen) == 3
+
+
+class TestCommentFormatting:
+    def test_format_basic(self):
+        sys.path.insert(0, str(SCRIPTS_DIR))
+        from review_comment_generator import format_comment
+
+        f = {"file": "scripts/foo.py", "line": 10, "text": "Fix this bug", "severity": "warning"}
+        body = format_comment(f)
+        assert "📝 **Review Comment**" not in body  # warning uses âš ī¸, not the fallback
+        assert "âš ī¸ **Review Comment**" in body
+        assert "`scripts/foo.py`" in body
+        assert "Line: 10" in body
+        assert "> Fix this bug" in body
+
+    def test_format_severity_emoji(self):
+        sys.path.insert(0, str(SCRIPTS_DIR))
+        from review_comment_generator import format_comment
+
+        cases = [("error", "🛑"), ("warning", "âš ī¸"), ("info", "â„šī¸"), ("unknown", "📝")]
+        for severity, emoji in cases:
+            f = {"file": "x.py", "line": 1, "text": "test", "severity": severity}
+            assert emoji in format_comment(f)
+
+
+class TestFindingsLoader:
+    def test_load_from_file(self):
+        sys.path.insert(0, str(SCRIPTS_DIR))
+        from review_comment_generator import load_findings
+
+        findings = load_findings(SAMPLE_FINDINGS, from_stdin=False)
+        assert len(findings) >= 4
+
+    def test_load_ignores_blank_and_comments(self):
+        import os
+        import tempfile
+
+        with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as tf:
+            tf.write('{"file":"a.py","line":1,"text":"valid"}\n')
+            tf.write('\n')
+            tf.write('# this is a comment\n')
+            tf.write('{"file":"b.py","line":2,"text":"also valid"}\n')
+            tfname = tf.name
+        try:
+            sys.path.insert(0, str(SCRIPTS_DIR))
+            from review_comment_generator import load_findings
+            assert len(load_findings(Path(tfname), from_stdin=False)) == 2
+        finally:
+            os.unlink(tfname)
+
+    def test_invalid_json_line_skipped(self):
+        import os
+        import tempfile
+
+        with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as tf:
+            tf.write('invalid json\n')
+            tf.write('{"file":"ok.py","line":1,"text":"valid"}\n')
+            tfname = tf.name
+        try:
+            sys.path.insert(0, str(SCRIPTS_DIR))
+            from review_comment_generator import load_findings
+            assert len(load_findings(Path(tfname), from_stdin=False)) == 1
+        finally:
+            os.unlink(tfname)
+
+
+class TestDryRunMode:
+    def test_dry_run_counts_unique(self):
+        result = subprocess.run(
+            [sys.executable, str(GENERATOR), "--pr", "126",
+             "--input", str(SAMPLE_FINDINGS), "--dry-run"],
+            capture_output=True, text=True, cwd=REPO_ROOT, timeout=15
+        )
+        assert result.returncode == 0
+        assert "DRY RUN" in result.stdout
+        assert "Review Comment" in result.stdout
+
+    def test_dry_run_shows_all_unique(self):
+        result = subprocess.run(
+            [sys.executable, str(GENERATOR), "--pr", "126",
+             "--input", str(SAMPLE_FINDINGS), "--dry-run"],
+            capture_output=True, text=True, cwd=REPO_ROOT, timeout=15
+        )
+        assert result.stdout.count("--- Comment") == 4
+
+
+class TestJSONOutputMode:
+    def test_json_flag_emits_valid_json(self):
+        result = subprocess.run(
+            [sys.executable, str(GENERATOR), "--pr", "126",
+             "--input", str(SAMPLE_FINDINGS), "--json"],
+            capture_output=True, text=True, cwd=REPO_ROOT, timeout=15
+        )
+        assert result.returncode == 0
+        payload = json.loads(result.stdout)
+        assert "total" in payload and "unique" in payload and "findings" in payload
+        assert payload["total"] >= payload["unique"]
+
+    def test_json_findings_have_required_fields(self):
+        result = subprocess.run(
+            [sys.executable, str(GENERATOR), "--pr", "126",
+             "--input", str(SAMPLE_FINDINGS), "--json"],
+            capture_output=True, text=True, cwd=REPO_ROOT, timeout=15
+        )
+        payload = json.loads(result.stdout)
+        for f in payload["findings"]:
+            assert "file" in f and "line" in f and "text" in f
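+
+# For reference, the --json report has the shape built in the generator's
+# main() (field names verbatim, values illustrative for the sample data):
+#   {"total": 5, "unique": 4, "findings": [...], "generated_at": "<ISO-8601 UTC>"}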
str(GENERATOR), "--pr", "126", + "--input", tfname, "--json"], + capture_output=True, text=True, cwd=REPO_ROOT, timeout=15 + ) + data = json.loads(result.stdout) + assert data["total"] == 1 + assert data["unique"] == 1 + finally: + os.unlink(tfname) + + +if __name__ == "__main__": + pytest.main([__file__, "-v"])