diff --git a/scripts/pr_complexity_scorer.py b/scripts/pr_complexity_scorer.py
new file mode 100644
index 0000000..842569c
--- /dev/null
+++ b/scripts/pr_complexity_scorer.py
@@ -0,0 +1,351 @@
+#!/usr/bin/env python3
+"""
+PR Complexity Scorer - Estimate review effort for open PRs.
+
+Scores each open PR on files changed, lines changed, dependency-file churn,
+and test-file delta, then optionally posts the result as a PR comment.
+"""
+
+import argparse
+import json
+import os
+import re
+import sys
+from dataclasses import dataclass, asdict
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Any, Dict, List, Optional, Tuple
+import urllib.error
+import urllib.parse
+import urllib.request
+
+GITEA_BASE = "https://forge.alexanderwhitestone.com/api/v1"
+
+# Manifest/lockfile names whose changes usually imply architectural impact.
+DEPENDENCY_FILES = {
+    "requirements.txt", "pyproject.toml", "setup.py", "setup.cfg",
+    "Pipfile", "poetry.lock", "package.json", "yarn.lock", "Gemfile",
+    "go.mod", "Cargo.toml", "pom.xml", "build.gradle"
+}
+
+# Anchored to path boundaries so e.g. "latest_results.py" is not counted.
+TEST_PATTERNS = [
+    r"(^|/)tests?/.*\.py$", r".*_test\.py$", r"(^|/)test_[^/]*\.py$",
+    r"(^|/)spec/.*\.rb$", r".*_spec\.rb$",
+    r"__tests__/", r".*\.test\.(js|ts|jsx|tsx)$"
+]
+
+# Relative weight of each signal in the final score; weights sum to 1.0.
+WEIGHT_FILES = 0.25
+WEIGHT_LINES = 0.25
+WEIGHT_DEPS = 0.30
+WEIGHT_TEST_COV = 0.20
+
+SMALL_FILES = 5
+MEDIUM_FILES = 20
+LARGE_FILES = 50
+
+SMALL_LINES = 100
+MEDIUM_LINES = 500
+LARGE_LINES = 2000
+
+# Estimated review minutes for each 1-10 complexity score.
+TIME_PER_POINT = {1: 5, 2: 10, 3: 15, 4: 20, 5: 25, 6: 30, 7: 45, 8: 60, 9: 90, 10: 120}
+
+
+@dataclass
+class PRComplexity:
+    pr_number: int
+    title: str
+    files_changed: int
+    additions: int
+    deletions: int
+    has_dependency_changes: bool
+    test_coverage_delta: Optional[int]
+    score: int
+    estimated_minutes: int
+    reasons: List[str]
+
+    def to_dict(self) -> dict:
+        return asdict(self)
+
+
+class GiteaClient:
+    """Thin wrapper over the Gitea REST API using only the standard library."""
+
+    def __init__(self, token: str):
+        self.token = token
+        self.base_url = GITEA_BASE.rstrip("/")
+
+    def _request(self, path: str, params: Optional[Dict] = None) -> Any:
+        url = f"{self.base_url}{path}"
+        if params:
+            qs = urllib.parse.urlencode({k: v for k, v in params.items() if v is not None})
+            url += f"?{qs}"
+
+        req = urllib.request.Request(url)
+        req.add_header("Authorization", f"token {self.token}")
+        req.add_header("Content-Type", "application/json")
+
+        try:
+            with urllib.request.urlopen(req, timeout=30) as resp:
+                return json.loads(resp.read().decode())
+        except urllib.error.HTTPError as e:
+            print(f"API error {e.code}: {e.read().decode()[:200]}", file=sys.stderr)
+            return None
+        except urllib.error.URLError as e:
+            print(f"Network error: {e}", file=sys.stderr)
+            return None
+
+    def get_open_prs(self, org: str, repo: str) -> List[Dict]:
+        """Fetch all open PRs, following pagination until a short page."""
+        prs = []
+        page = 1
+        while True:
+            batch = self._request(f"/repos/{org}/{repo}/pulls", {"limit": 50, "page": page, "state": "open"})
+            if not batch:
+                break
+            prs.extend(batch)
+            if len(batch) < 50:
+                break
+            page += 1
+        return prs
+
+    def get_pr_files(self, org: str, repo: str, pr_number: int) -> List[Dict]:
+        """Fetch the changed-file list for one PR, following pagination."""
+        files = []
+        page = 1
+        while True:
+            batch = self._request(
+                f"/repos/{org}/{repo}/pulls/{pr_number}/files",
+                {"limit": 100, "page": page}
+            )
+            if not batch:
+                break
+            files.extend(batch)
+            if len(batch) < 100:
+                break
+            page += 1
+        return files
+
+    def post_comment(self, org: str, repo: str, pr_number: int, body: str) -> bool:
+        data = json.dumps({"body": body}).encode("utf-8")
+        req = urllib.request.Request(
+            f"{self.base_url}/repos/{org}/{repo}/issues/{pr_number}/comments",
+            data=data,
+            method="POST",
+            headers={"Authorization": f"token {self.token}", "Content-Type": "application/json"}
+        )
+        try:
+            with urllib.request.urlopen(req, timeout=30) as resp:
+                return resp.status in (200, 201)
+        except urllib.error.URLError:
+            # URLError covers HTTPError too; treat any failure as "not posted".
+            return False
+
+
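+# Usage sketch (illustrative; the token value is hypothetical and this block
+# is not executed anywhere in this module):
+#
+#     client = GiteaClient(token="abc123")
+#     prs = client.get_open_prs("Timmy_Foundation", "compounding-intelligence")
+#     if prs:
+#         files = client.get_pr_files("Timmy_Foundation", "compounding-intelligence", prs[0]["number"])
+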
+def is_dependency_file(filename: str) -> bool:
+    # Match on the basename so names that merely end with a manifest name
+    # (e.g. "not_package.json") are not false positives.
+    return Path(filename).name in DEPENDENCY_FILES
+
+
+def is_test_file(filename: str) -> bool:
+    return any(re.search(pattern, filename) for pattern in TEST_PATTERNS)
+
+
+def score_pr(
+    files_changed: int,
+    additions: int,
+    deletions: int,
+    has_dependency_changes: bool,
+    test_coverage_delta: Optional[int] = None
+) -> Tuple[int, int, List[str]]:
+    reasons = []
+
+    # Files changed
+    if files_changed <= SMALL_FILES:
+        fscore = 1.0
+        reasons.append("small number of files changed")
+    elif files_changed <= MEDIUM_FILES:
+        fscore = 2.0
+        reasons.append("moderate number of files changed")
+    elif files_changed <= LARGE_FILES:
+        fscore = 2.5
+        reasons.append("large number of files changed")
+    else:
+        fscore = 3.0
+        reasons.append("very large PR spanning many files")
+
+    # Lines changed
+    total_lines = additions + deletions
+    if total_lines <= SMALL_LINES:
+        lscore = 1.0
+        reasons.append("small change size")
+    elif total_lines <= MEDIUM_LINES:
+        lscore = 2.0
+        reasons.append("moderate change size")
+    elif total_lines <= LARGE_LINES:
+        lscore = 3.0
+        reasons.append("large change size")
+    else:
+        lscore = 4.0
+        reasons.append("very large change")
+
+    # Dependency changes
+    if has_dependency_changes:
+        dscore = 2.5
+        reasons.append("dependency changes (architectural impact)")
+    else:
+        dscore = 0.0
+
+    # Test coverage delta: added test files reduce the score, removed ones raise it.
+    tscore = 0.0
+    if test_coverage_delta is not None:
+        if test_coverage_delta > 0:
+            reasons.append(f"test additions (+{test_coverage_delta} test files)")
+            tscore = -min(2.0, test_coverage_delta / 2.0)
+        elif test_coverage_delta < 0:
+            reasons.append(f"test removals ({abs(test_coverage_delta)} test files)")
+            tscore = min(2.0, abs(test_coverage_delta) * 0.5)
+    else:
+        reasons.append("test coverage change not assessed")
+
+    # Weighted sum, scaled by 3 so the result spans the full 1-10 range.
+    bonus = (fscore * WEIGHT_FILES) + (lscore * WEIGHT_LINES) + (dscore * WEIGHT_DEPS) + (tscore * WEIGHT_TEST_COV)
+    score = 1.0 + bonus * 3.0
+
+    final_score = max(1, min(10, int(round(score))))
+    est_minutes = TIME_PER_POINT.get(final_score, 30)
+
+    return final_score, est_minutes, reasons
+
+
+def analyze_pr(client: GiteaClient, org: str, repo: str, pr_data: Dict) -> PRComplexity:
+    pr_num = pr_data["number"]
+    title = pr_data.get("title", "")
+    files = client.get_pr_files(org, repo, pr_num)
+
+    additions = sum(f.get("additions", 0) for f in files)
+    deletions = sum(f.get("deletions", 0) for f in files)
+    filenames = [f.get("filename", "") for f in files]
+
+    has_deps = any(is_dependency_file(f) for f in filenames)
+
+    # Delta stays None (not zero) when no test files were touched at all.
+    test_added = sum(1 for f in files if f.get("status") == "added" and is_test_file(f.get("filename", "")))
+    test_removed = sum(1 for f in files if f.get("status") == "removed" and is_test_file(f.get("filename", "")))
+    test_delta = test_added - test_removed if (test_added or test_removed) else None
+
+    score, est_min, reasons = score_pr(
+        files_changed=len(files),
+        additions=additions,
+        deletions=deletions,
+        has_dependency_changes=has_deps,
+        test_coverage_delta=test_delta
+    )
+
+    return PRComplexity(
+        pr_number=pr_num,
+        title=title,
+        files_changed=len(files),
+        additions=additions,
+        deletions=deletions,
+        has_dependency_changes=has_deps,
+        test_coverage_delta=test_delta,
+        score=score,
+        estimated_minutes=est_min,
+        reasons=reasons
+    )
+
+
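+# Worked example (illustrative): a PR touching 15 files with +400/-100 lines,
+# no dependency or test changes, gets fscore=2.0 and lscore=2.0, so
+#     bonus = 2.0*0.25 + 2.0*0.25 + 0.0*0.30 + 0.0*0.20 = 1.0
+#     score = 1.0 + 1.0*3.0 = 4.0  ->  final score 4, ~20 minutes
+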
+def build_comment(complexity: PRComplexity) -> str:
+    change_desc = f"{complexity.files_changed} files, +{complexity.additions}/-{complexity.deletions} lines"
+    deps_note = "\n- :warning: Dependency changes detected — architectural review recommended" if complexity.has_dependency_changes else ""
+    test_note = ""
+    if complexity.test_coverage_delta is not None:
+        if complexity.test_coverage_delta > 0:
+            test_note = f"\n- :+1: {complexity.test_coverage_delta} test file(s) added"
+        elif complexity.test_coverage_delta < 0:
+            test_note = f"\n- :warning: {abs(complexity.test_coverage_delta)} test file(s) removed"
+
+    comment = "## 📊 PR Complexity Analysis\n\n"
+    comment += f"**PR #{complexity.pr_number}: {complexity.title}**\n\n"
+    comment += "| Metric | Value |\n|--------|-------|\n"
+    comment += f"| Changes | {change_desc} |\n"
+    comment += f"| Complexity Score | **{complexity.score}/10** |\n"
+    comment += f"| Estimated Review Time | ~{complexity.estimated_minutes} minutes |\n\n"
+    comment += "### Scoring rationale:"
+    for r in complexity.reasons:
+        comment += f"\n- {r}"
+    comment += deps_note
+    comment += test_note
+    comment += "\n\n---\n"
+    comment += "*Generated by PR Complexity Scorer — [issue #135](https://forge.alexanderwhitestone.com/Timmy_Foundation/compounding-intelligence/issues/135)*"
+    return comment
+
+
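+# Rendered sketch of the posted comment (abridged; the PR number and values
+# below are hypothetical):
+#
+#     ## 📊 PR Complexity Analysis
+#     **PR #42: Add retry logic to sync job**
+#     | Changes | 15 files, +400/-100 lines |
+#     | Complexity Score | **4/10** |
+#     | Estimated Review Time | ~20 minutes |
+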
Analyzing...") + + results = [] + Path(args.output).parent.mkdir(parents=True, exist_ok=True) + + for pr in prs: + pr_num = pr["number"] + title = pr.get("title", "") + print(f" Analyzing PR #{pr_num}: {title[:60]}") + + try: + complexity = analyze_pr(client, args.org, args.repo, pr) + results.append(complexity.to_dict()) + + comment = build_comment(complexity) + + if args.dry_run: + print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [DRY-RUN]") + elif args.apply: + success = client.post_comment(args.org, args.repo, pr_num, comment) + status = "[commented]" if success else "[FAILED]" + print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min {status}") + else: + print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [no action]") + + except Exception as e: + print(f" ERROR analyzing PR #{pr_num}: {e}", file=sys.stderr) + + with open(args.output, "w") as f: + json.dump({ + "org": args.org, + "repo": args.repo, + "timestamp": datetime.now(timezone.utc).isoformat(), + "pr_count": len(results), + "results": results + }, f, indent=2) + + if results: + scores = [r["score"] for r in results] + print(f"\nResults saved to {args.output}") + print(f"Summary: {len(results)} PRs, scores range {min(scores):.0f}-{max(scores):.0f}") + else: + print("\nNo results to save.") + + +if __name__ == "__main__": + main() diff --git a/scripts/test_pr_complexity_scorer.py b/scripts/test_pr_complexity_scorer.py new file mode 100644 index 0000000..4e98774 --- /dev/null +++ b/scripts/test_pr_complexity_scorer.py @@ -0,0 +1,170 @@ +#!/usr/bin/env python3 +""" +Tests for PR Complexity Scorer — unit tests for the scoring logic. +""" + +import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).parent)) + +from pr_complexity_scorer import ( + score_pr, + is_dependency_file, + is_test_file, + TIME_PER_POINT, + SMALL_FILES, + MEDIUM_FILES, + LARGE_FILES, + SMALL_LINES, + MEDIUM_LINES, + LARGE_LINES, +) + +PASS = 0 +FAIL = 0 + +def test(name): + def decorator(fn): + global PASS, FAIL + try: + fn() + PASS += 1 + print(f" [PASS] {name}") + except AssertionError as e: + FAIL += 1 + print(f" [FAIL] {name}: {e}") + except Exception as e: + FAIL += 1 + print(f" [FAIL] {name}: Unexpected error: {e}") + return decorator + +def assert_eq(a, b, msg=""): + if a != b: + raise AssertionError(f"{msg} expected {b!r}, got {a!r}") + +def assert_true(v, msg=""): + if not v: + raise AssertionError(msg or "Expected True") + +def assert_false(v, msg=""): + if v: + raise AssertionError(msg or "Expected False") + + +print("=== PR Complexity Scorer Tests ===\n") + +print("-- File Classification --") + +@test("dependency file detection — requirements.txt") +def _(): + assert_true(is_dependency_file("requirements.txt")) + assert_true(is_dependency_file("src/requirements.txt")) + assert_false(is_dependency_file("requirements_test.txt")) + +@test("dependency file detection — pyproject.toml") +def _(): + assert_true(is_dependency_file("pyproject.toml")) + assert_false(is_dependency_file("myproject.py")) + +@test("test file detection — pytest style") +def _(): + assert_true(is_test_file("tests/test_api.py")) + assert_true(is_test_file("test_module.py")) + assert_true(is_test_file("src/module_test.py")) + +@test("test file detection — other frameworks") +def _(): + assert_true(is_test_file("spec/feature_spec.rb")) + assert_true(is_test_file("__tests__/component.test.js")) + assert_false(is_test_file("testfixtures/helper.py")) + + +print("\n-- Scoring Logic --") 
+ +@test("small PR gets low score (1-3)") +def _(): + score, minutes, _ = score_pr( + files_changed=3, + additions=50, + deletions=10, + has_dependency_changes=False, + test_coverage_delta=None + ) + assert_true(1 <= score <= 3, f"Score should be low, got {score}") + assert_true(minutes < 20) + +@test("medium PR gets medium score (4-6)") +def _(): + score, minutes, _ = score_pr( + files_changed=15, + additions=400, + deletions=100, + has_dependency_changes=False, + test_coverage_delta=None + ) + assert_true(4 <= score <= 6, f"Score should be medium, got {score}") + assert_true(20 <= minutes <= 45) + +@test("large PR gets high score (7-9)") +def _(): + score, minutes, _ = score_pr( + files_changed=60, + additions=3000, + deletions=1500, + has_dependency_changes=True, + test_coverage_delta=None + ) + assert_true(7 <= score <= 9, f"Score should be high, got {score}") + assert_true(minutes >= 45) + +@test("dependency changes boost score") +def _(): + base_score, _, _ = score_pr( + files_changed=10, additions=200, deletions=50, + has_dependency_changes=False, test_coverage_delta=None + ) + dep_score, _, _ = score_pr( + files_changed=10, additions=200, deletions=50, + has_dependency_changes=True, test_coverage_delta=None + ) + assert_true(dep_score > base_score, f"Deps: {base_score} -> {dep_score}") + +@test("adding tests lowers complexity") +def _(): + base_score, _, _ = score_pr( + files_changed=8, additions=150, deletions=20, + has_dependency_changes=False, test_coverage_delta=None + ) + better_score, _, _ = score_pr( + files_changed=8, additions=180, deletions=20, + has_dependency_changes=False, test_coverage_delta=3 + ) + assert_true(better_score < base_score, f"Tests: {base_score} -> {better_score}") + +@test("removing tests increases complexity") +def _(): + base_score, _, _ = score_pr( + files_changed=8, additions=150, deletions=20, + has_dependency_changes=False, test_coverage_delta=None + ) + worse_score, _, _ = score_pr( + files_changed=8, additions=150, deletions=20, + has_dependency_changes=False, test_coverage_delta=-2 + ) + assert_true(worse_score > base_score, f"Remove tests: {base_score} -> {worse_score}") + +@test("score bounded 1-10") +def _(): + for files, adds, dels in [(1, 10, 5), (100, 10000, 5000)]: + score, _, _ = score_pr(files, adds, dels, False, None) + assert_true(1 <= score <= 10, f"Score {score} out of range") + +@test("estimated minutes exist for all scores") +def _(): + for s in range(1, 11): + assert_true(s in TIME_PER_POINT, f"Missing time for score {s}") + + +print(f"\n=== Results: {PASS} passed, {FAIL} failed ===") +sys.exit(0 if FAIL == 0 else 1)