#!/usr/bin/env python3
"""
PR Complexity Scorer - Estimate review effort for PRs.

Fetches open pull requests from a Gitea instance, scores each on a 1-10
complexity scale (file count, lines changed, dependency impact, test-coverage
delta), optionally posts the analysis as a PR comment, and writes a JSON
report to disk.
"""

import argparse
import json
import os
import re
import sys
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

GITEA_BASE = "https://forge.alexanderwhitestone.com/api/v1"

# Basenames of manifest/lock files whose modification implies architectural
# impact (dependency changes).
DEPENDENCY_FILES = {
    "requirements.txt", "pyproject.toml", "setup.py", "setup.cfg",
    "Pipfile", "poetry.lock", "package.json", "yarn.lock", "Gemfile",
    "go.mod", "Cargo.toml", "pom.xml", "build.gradle",
}

# Path patterns identifying test files across common ecosystems
# (Python, Ruby, JS/TS).
TEST_PATTERNS = [
    r"tests?/.*\.py$",
    r".*_test\.py$",
    r"test_.*\.py$",
    r"spec/.*\.rb$",
    r".*_spec\.rb$",
    r"__tests__/",
    r".*\.test\.(js|ts|jsx|tsx)$",
]

# Relative weights of each scoring dimension (sum to 1.0).
WEIGHT_FILES = 0.25
WEIGHT_LINES = 0.25
WEIGHT_DEPS = 0.30
WEIGHT_TEST_COV = 0.20

# File-count thresholds.
SMALL_FILES = 5
MEDIUM_FILES = 20
LARGE_FILES = 50

# Total changed-line (additions + deletions) thresholds.
SMALL_LINES = 100
MEDIUM_LINES = 500
LARGE_LINES = 2000

# Estimated review minutes for each final 1-10 score point.
TIME_PER_POINT = {1: 5, 2: 10, 3: 15, 4: 20, 5: 25, 6: 30, 7: 45, 8: 60, 9: 90, 10: 120}


@dataclass
class PRComplexity:
    """Computed complexity assessment for a single pull request."""

    pr_number: int                       # PR index number in the repo
    title: str                           # PR title
    files_changed: int                   # number of files touched
    additions: int                       # total lines added
    deletions: int                       # total lines removed
    has_dependency_changes: bool         # any dependency manifest touched
    test_coverage_delta: Optional[int]   # test files added minus removed; None if none touched
    score: int                           # final 1-10 complexity score
    estimated_minutes: int               # estimated review time
    reasons: List[str]                   # human-readable scoring rationale

    def to_dict(self) -> dict:
        """Return a plain-dict representation suitable for JSON serialization."""
        return asdict(self)


class GiteaClient:
    """Minimal Gitea REST API client (token auth, stdlib urllib only)."""

    def __init__(self, token: str):
        self.token = token
        self.base_url = GITEA_BASE.rstrip("/")

    def _request(self, path: str, params: Optional[Dict] = None) -> Any:
        """GET ``path`` and return the decoded JSON body, or None on any error.

        Query parameters are properly URL-encoded; entries whose value is
        None are skipped.
        """
        url = f"{self.base_url}{path}"
        if params:
            filtered = {k: v for k, v in params.items() if v is not None}
            if filtered:
                url += "?" + urllib.parse.urlencode(filtered)
        req = urllib.request.Request(url)
        req.add_header("Authorization", f"token {self.token}")
        req.add_header("Content-Type", "application/json")
        try:
            with urllib.request.urlopen(req, timeout=30) as resp:
                return json.loads(resp.read().decode())
        except urllib.error.HTTPError as e:
            print(f"API error {e.code}: {e.read().decode()[:200]}", file=sys.stderr)
            return None
        except urllib.error.URLError as e:
            print(f"Network error: {e}", file=sys.stderr)
            return None

    def get_open_prs(self, org: str, repo: str) -> List[Dict]:
        """Return all open PRs for org/repo, following pagination."""
        prs = []
        page = 1
        while True:
            batch = self._request(
                f"/repos/{org}/{repo}/pulls",
                {"limit": 50, "page": page, "state": "open"},
            )
            if not batch:
                break
            prs.extend(batch)
            # A short page means we have reached the last one.
            if len(batch) < 50:
                break
            page += 1
        return prs

    def get_pr_files(self, org: str, repo: str, pr_number: int) -> List[Dict]:
        """Return the changed-file entries for a PR, following pagination."""
        files = []
        page = 1
        while True:
            batch = self._request(
                f"/repos/{org}/{repo}/pulls/{pr_number}/files",
                {"limit": 100, "page": page},
            )
            if not batch:
                break
            files.extend(batch)
            if len(batch) < 100:
                break
            page += 1
        return files

    def post_comment(self, org: str, repo: str, pr_number: int, body: str) -> bool:
        """Post ``body`` as an issue comment on the PR. Return True on success."""
        data = json.dumps({"body": body}).encode("utf-8")
        req = urllib.request.Request(
            f"{self.base_url}/repos/{org}/{repo}/issues/{pr_number}/comments",
            data=data,
            method="POST",
            headers={
                "Authorization": f"token {self.token}",
                "Content-Type": "application/json",
            },
        )
        try:
            with urllib.request.urlopen(req, timeout=30) as resp:
                return resp.status in (200, 201)
        # URLError is the superclass of HTTPError, so this also covers
        # plain network failures, honoring the bool return contract.
        except urllib.error.URLError:
            return False


def is_dependency_file(filename: str) -> bool:
    """True if ``filename``'s basename is a known dependency manifest/lock file.

    Matches the exact basename (not a suffix) so e.g. "mysetup.py" is not
    mistaken for "setup.py".
    """
    return Path(filename).name in DEPENDENCY_FILES


def is_test_file(filename: str) -> bool:
    """True if ``filename`` matches any known test-file path pattern."""
    return any(re.search(pattern, filename) for pattern in TEST_PATTERNS)


def score_pr(
    files_changed: int,
    additions: int,
    deletions: int,
    has_dependency_changes: bool,
    test_coverage_delta: Optional[int] = None,
) -> tuple[int, int, List[str]]:
    """Score a PR's review complexity.

    Args:
        files_changed: number of files touched by the PR.
        additions: total lines added.
        deletions: total lines removed.
        has_dependency_changes: whether any dependency manifest was touched.
        test_coverage_delta: test files added minus removed; None if no
            test files were touched (coverage change not assessed).

    Returns:
        (score, estimated_minutes, reasons) where score is clamped to 1-10.
    """
    reasons = []

    # Files-changed sub-score (1.0 - 3.0).
    if files_changed <= SMALL_FILES:
        fscore = 1.0
        reasons.append("small number of files changed")
    elif files_changed <= MEDIUM_FILES:
        fscore = 2.0
        reasons.append("moderate number of files changed")
    elif files_changed <= LARGE_FILES:
        fscore = 2.5
        reasons.append("large number of files changed")
    else:
        fscore = 3.0
        reasons.append("very large PR spanning many files")

    # Lines-changed sub-score (1.0 - 4.0).
    total_lines = additions + deletions
    if total_lines <= SMALL_LINES:
        lscore = 1.0
        reasons.append("small change size")
    elif total_lines <= MEDIUM_LINES:
        lscore = 2.0
        reasons.append("moderate change size")
    elif total_lines <= LARGE_LINES:
        lscore = 3.0
        reasons.append("large change size")
    else:
        lscore = 4.0
        reasons.append("very large change")

    # Dependency changes sub-score.
    if has_dependency_changes:
        dscore = 2.5
        reasons.append("dependency changes (architectural impact)")
    else:
        dscore = 0.0

    # Test-coverage sub-score: test additions REDUCE complexity (negative),
    # test removals increase it.
    tscore = 0.0
    if test_coverage_delta is not None:
        if test_coverage_delta > 0:
            reasons.append(f"test additions (+{test_coverage_delta} test files)")
            tscore = -min(2.0, test_coverage_delta / 2.0)
        elif test_coverage_delta < 0:
            reasons.append(f"test removals ({abs(test_coverage_delta)} test files)")
            tscore = min(2.0, abs(test_coverage_delta) * 0.5)
    else:
        reasons.append("test coverage change not assessed")

    # Weighted sum, scaled by 3 to use the full 1-10 range.
    bonus = (
        (fscore * WEIGHT_FILES)
        + (lscore * WEIGHT_LINES)
        + (dscore * WEIGHT_DEPS)
        + (tscore * WEIGHT_TEST_COV)
    )
    score = 1.0 + bonus * 3.0
    final_score = max(1, min(10, int(round(score))))
    est_minutes = TIME_PER_POINT.get(final_score, 30)
    return final_score, est_minutes, reasons


def analyze_pr(client: GiteaClient, org: str, repo: str, pr_data: Dict) -> PRComplexity:
    """Fetch a PR's changed files and compute its PRComplexity assessment."""
    pr_num = pr_data["number"]
    title = pr_data.get("title", "")
    files = client.get_pr_files(org, repo, pr_num)
    additions = sum(f.get("additions", 0) for f in files)
    deletions = sum(f.get("deletions", 0) for f in files)
    filenames = [f.get("filename", "") for f in files]
    has_deps = any(is_dependency_file(f) for f in filenames)

    # Only count test files that were wholly added/removed; modified test
    # files do not change the delta.
    test_added = sum(
        1 for f in files
        if f.get("status") == "added" and is_test_file(f.get("filename", ""))
    )
    test_removed = sum(
        1 for f in files
        if f.get("status") == "removed" and is_test_file(f.get("filename", ""))
    )
    # None (not 0) when no test files were touched at all, so scoring can
    # distinguish "no tests touched" from "additions cancel removals".
    test_delta = test_added - test_removed if (test_added or test_removed) else None

    score, est_min, reasons = score_pr(
        files_changed=len(files),
        additions=additions,
        deletions=deletions,
        has_dependency_changes=has_deps,
        test_coverage_delta=test_delta,
    )
    return PRComplexity(
        pr_number=pr_num,
        title=title,
        files_changed=len(files),
        additions=additions,
        deletions=deletions,
        has_dependency_changes=has_deps,
        test_coverage_delta=test_delta,
        score=score,
        estimated_minutes=est_min,
        reasons=reasons,
    )


def build_comment(complexity: PRComplexity) -> str:
    """Render the markdown comment body for a PRComplexity assessment."""
    change_desc = (
        f"{complexity.files_changed} files, "
        f"+{complexity.additions}/-{complexity.deletions} lines"
    )
    deps_note = (
        "\n- :warning: Dependency changes detected — architectural review recommended"
        if complexity.has_dependency_changes else ""
    )
    test_note = ""
    if complexity.test_coverage_delta is not None:
        if complexity.test_coverage_delta > 0:
            test_note = f"\n- :+1: {complexity.test_coverage_delta} test file(s) added"
        elif complexity.test_coverage_delta < 0:
            test_note = f"\n- :warning: {abs(complexity.test_coverage_delta)} test file(s) removed"

    comment = f"## 📊 PR Complexity Analysis\n\n"
    comment += f"**PR #{complexity.pr_number}: {complexity.title}**\n\n"
    comment += f"| Metric | Value |\n|--------|-------|\n"
    comment += f"| Changes | {change_desc} |\n"
    comment += f"| Complexity Score | **{complexity.score}/10** |\n"
    comment += f"| Estimated Review Time | ~{complexity.estimated_minutes} minutes |\n\n"
    comment += f"### Scoring rationale:"
    for r in complexity.reasons:
        comment += f"\n- {r}"
    if deps_note:
        comment += deps_note
    if test_note:
        comment += test_note
    comment += f"\n\n---\n"
    comment += (
        f"*Generated by PR Complexity Scorer — "
        f"[issue #135](https://forge.alexanderwhitestone.com/Timmy_Foundation/"
        f"compounding-intelligence/issues/135)*"
    )
    return comment


def main():
    """CLI entry point: fetch, score, optionally comment, and save results."""
    parser = argparse.ArgumentParser(description="PR Complexity Scorer")
    parser.add_argument("--org", default="Timmy_Foundation")
    parser.add_argument("--repo", default="compounding-intelligence")
    parser.add_argument(
        "--token",
        default=os.environ.get("GITEA_TOKEN") or os.path.expanduser("~/.config/gitea/token"),
    )
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--apply", action="store_true")
    parser.add_argument("--output", default="metrics/pr_complexity.json")
    args = parser.parse_args()

    # --token may hold either the token itself or a path to a token file.
    candidate = args.token or ""
    if candidate and os.path.isfile(candidate):
        with open(candidate) as fh:
            token = fh.read().strip()
    elif os.sep in candidate:
        # Path-like but the file does not exist: treat as missing rather than
        # silently using the path string itself as the token.
        token = ""
    else:
        token = candidate
    if not token:
        print("ERROR: No Gitea token provided", file=sys.stderr)
        sys.exit(1)

    client = GiteaClient(token)
    print(f"Fetching open PRs for {args.org}/{args.repo}...")
    prs = client.get_open_prs(args.org, args.repo)
    if not prs:
        print("No open PRs found.")
        sys.exit(0)
    print(f"Found {len(prs)} open PR(s). Analyzing...")

    results = []
    Path(args.output).parent.mkdir(parents=True, exist_ok=True)
    for pr in prs:
        pr_num = pr["number"]
        title = pr.get("title", "")
        print(f"  Analyzing PR #{pr_num}: {title[:60]}")
        try:
            complexity = analyze_pr(client, args.org, args.repo, pr)
            results.append(complexity.to_dict())
            comment = build_comment(complexity)
            if args.dry_run:
                print(f"    → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [DRY-RUN]")
            elif args.apply:
                success = client.post_comment(args.org, args.repo, pr_num, comment)
                status = "[commented]" if success else "[FAILED]"
                print(f"    → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min {status}")
            else:
                print(f"    → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [no action]")
        except Exception as e:
            # Best-effort per-PR: log and continue with the remaining PRs.
            print(f"  ERROR analyzing PR #{pr_num}: {e}", file=sys.stderr)

    with open(args.output, "w") as f:
        json.dump(
            {
                "org": args.org,
                "repo": args.repo,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "pr_count": len(results),
                "results": results,
            },
            f,
            indent=2,
        )

    if results:
        scores = [r["score"] for r in results]
        print(f"\nResults saved to {args.output}")
        print(f"Summary: {len(results)} PRs, scores range {min(scores):.0f}-{max(scores):.0f}")
    else:
        print("\nNo results to save.")


if __name__ == "__main__":
    main()