352 lines
12 KiB
Python
352 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
PR Complexity Scorer - Estimate review effort for PRs.
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
from dataclasses import dataclass, asdict
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional
|
|
import urllib.request
|
|
import urllib.error
|
|
|
|
# Base URL of the Gitea REST API that every request is issued against.
GITEA_BASE = "https://forge.alexanderwhitestone.com/api/v1"

# Manifest / lock-file names whose modification counts as a dependency change.
DEPENDENCY_FILES = {
    "requirements.txt", "pyproject.toml", "setup.py", "setup.cfg",
    "Pipfile", "poetry.lock", "package.json", "yarn.lock", "Gemfile",
    "go.mod", "Cargo.toml", "pom.xml", "build.gradle"
}

# Regexes (applied with re.search) that identify test files by path.
# Directory and "test_" prefixes are anchored to the start of a path segment
# ("(^|/)") so that names which merely *contain* the text -- for example
# "contest_utils.py", "latest/report.py" or "inspec/x.rb" -- are not
# mistaken for tests.
TEST_PATTERNS = [
    r"(^|/)tests?/.*\.py$", r".*_test\.py$", r"(^|/)test_.*\.py$",
    r"(^|/)spec/.*\.rb$", r".*_spec\.rb$",
    r"(^|/)__tests__/", r".*\.test\.(js|ts|jsx|tsx)$"
]

# Relative weight of each component in the weighted score (they sum to 1.0).
WEIGHT_FILES = 0.25
WEIGHT_LINES = 0.25
WEIGHT_DEPS = 0.30
WEIGHT_TEST_COV = 0.20

# Inclusive upper bounds of the small / medium / large "files changed" tiers.
SMALL_FILES = 5
MEDIUM_FILES = 20
LARGE_FILES = 50

# Inclusive upper bounds of the small / medium / large "lines changed" tiers.
SMALL_LINES = 100
MEDIUM_LINES = 500
LARGE_LINES = 2000

# Estimated review time in minutes, keyed by the final 1-10 score.
TIME_PER_POINT = {1: 5, 2: 10, 3: 15, 4: 20, 5: 25, 6: 30, 7: 45, 8: 60, 9: 90, 10: 120}
|
|
|
|
|
|
@dataclass
class PRComplexity:
    """Complexity assessment computed for a single pull request."""

    # Identification of the pull request.
    pr_number: int
    title: str

    # Raw size metrics aggregated over the changed files.
    files_changed: int
    additions: int
    deletions: int

    # True when at least one changed file is a dependency manifest.
    has_dependency_changes: bool

    # Test files added minus test files removed; None when no test file
    # was added or removed.
    test_coverage_delta: Optional[int]

    # Final score and the review-time estimate derived from it.
    score: int
    estimated_minutes: int

    # Human-readable explanations of what contributed to the score.
    reasons: List[str]

    def to_dict(self) -> dict:
        """Return the record as a plain dict built by ``dataclasses.asdict``."""
        return asdict(self)
|
|
|
|
|
|
class GiteaClient:
    """Small Gitea REST client: list open PRs, list PR files, post comments.

    Read requests are best-effort: transport, HTTP and decoding failures are
    reported on stderr and surface to callers as ``None`` (single request) or
    as a shortened result list (paginated listings).
    """

    def __init__(self, token: str):
        """Store the API token and the normalised base URL."""
        self.token = token
        self.base_url = GITEA_BASE.rstrip("/")

    def _request(self, path: str, params: Optional[Dict] = None) -> Any:
        """GET ``path`` and return the decoded JSON body, or None on failure.

        Args:
            path: API path starting with "/", appended to the base URL.
            params: Optional query parameters; entries whose value is None
                are omitted.

        Returns:
            The parsed JSON payload, or None when the request fails or the
            body cannot be decoded as JSON.
        """
        # Imported locally so this block does not rely on the file-level
        # imports exposing urllib.parse.
        from urllib.parse import urlencode

        url = f"{self.base_url}{path}"
        if params:
            # urlencode percent-escapes keys and values; plain string joining
            # would corrupt values containing "&", "=", spaces, etc.
            query = urlencode({k: v for k, v in params.items() if v is not None})
            if query:
                url += f"?{query}"

        req = urllib.request.Request(url)
        req.add_header("Authorization", f"token {self.token}")
        req.add_header("Content-Type", "application/json")

        try:
            with urllib.request.urlopen(req, timeout=30) as resp:
                return json.loads(resp.read().decode())
        except urllib.error.HTTPError as e:
            detail = e.read().decode(errors="replace")[:200]
            print(f"API error {e.code}: {detail}", file=sys.stderr)
            return None
        except urllib.error.URLError as e:
            print(f"Network error: {e}", file=sys.stderr)
            return None
        except ValueError as e:
            # json.JSONDecodeError and UnicodeDecodeError both derive from
            # ValueError; treat an unreadable body like any other failure.
            print(f"Invalid response from {path}: {e}", file=sys.stderr)
            return None

    def _paginate(self, path: str, page_size: int, extra: Optional[Dict] = None) -> List[Dict]:
        """Collect every page of a list endpoint.

        Pages are requested until an empty (or failed / non-list) response is
        received. A page shorter than ``page_size`` is NOT treated as the last
        one, because the server may cap the page size below the requested
        limit; stopping on a short page would then silently drop results.
        The price is one extra request per listing.
        """
        collected: List[Dict] = []
        previous = None
        page = 1
        while True:
            query = {"limit": page_size, "page": page}
            if extra:
                query.update(extra)
            batch = self._request(path, query)
            if not batch or not isinstance(batch, list):
                break
            if batch == previous:
                # Identical consecutive pages mean the server is ignoring the
                # page parameter; stop instead of looping forever.
                break
            collected.extend(batch)
            previous = batch
            page += 1
        return collected

    def get_open_prs(self, org: str, repo: str) -> List[Dict]:
        """Return all open pull requests of ``org/repo`` as raw API dicts."""
        return self._paginate(f"/repos/{org}/{repo}/pulls", 50, {"state": "open"})

    def get_pr_files(self, org: str, repo: str, pr_number: int) -> List[Dict]:
        """Return the changed-file entries of pull request ``pr_number``."""
        return self._paginate(f"/repos/{org}/{repo}/pulls/{pr_number}/files", 100)

    def post_comment(self, org: str, repo: str, pr_number: int, body: str) -> bool:
        """Post ``body`` as a comment on the pull request.

        Returns:
            True when the server answers 200 or 201; False on any HTTP or
            network error (the error is reported on stderr).
        """
        data = json.dumps({"body": body}).encode("utf-8")
        req = urllib.request.Request(
            f"{self.base_url}/repos/{org}/{repo}/issues/{pr_number}/comments",
            data=data,
            method="POST",
            headers={"Authorization": f"token {self.token}", "Content-Type": "application/json"},
        )
        try:
            with urllib.request.urlopen(req, timeout=30) as resp:
                return resp.status in (200, 201)
        except urllib.error.HTTPError as e:
            print(f"API error {e.code} posting comment on #{pr_number}", file=sys.stderr)
            return False
        except urllib.error.URLError as e:
            # Connection failures are reported the same way as in _request
            # rather than escaping as an exception.
            print(f"Network error: {e}", file=sys.stderr)
            return False
|
|
|
|
|
|
def is_dependency_file(filename: str) -> bool:
    """Return True when ``filename`` names a dependency manifest or lock file.

    Only the final path component is compared against DEPENDENCY_FILES, so
    "backend/requirements.txt" matches while names that merely end with a
    manifest name ("test_setup.py", "old_package.json") do not.
    """
    # Normalise Windows separators, then keep the last path segment.
    basename = filename.replace("\\", "/").rsplit("/", 1)[-1]
    return basename in DEPENDENCY_FILES
|
|
|
|
|
|
def is_test_file(filename: str) -> bool:
    """Return True when ``filename`` matches any regex in TEST_PATTERNS."""
    for candidate in TEST_PATTERNS:
        if re.search(candidate, filename):
            return True
    return False
|
|
|
|
|
|
def score_pr(
    files_changed: int,
    additions: int,
    deletions: int,
    has_dependency_changes: bool,
    test_coverage_delta: Optional[int] = None
) -> tuple[int, int, List[str]]:
    """Turn PR size metrics into a 1-10 score and a review-time estimate.

    Args:
        files_changed: Number of files touched by the PR.
        additions: Total added lines.
        deletions: Total deleted lines.
        has_dependency_changes: Whether a dependency manifest was modified.
        test_coverage_delta: Test files added minus removed, or None when
            that was not assessed.

    Returns:
        ``(score, estimated_minutes, reasons)`` where ``score`` is clamped to
        1..10, ``estimated_minutes`` comes from TIME_PER_POINT (30 if the
        score has no entry) and ``reasons`` lists the contributing factors in
        the order files, lines, dependencies, tests.
    """
    reasons = []

    # Files changed: first tier whose ceiling is not exceeded wins.
    file_tiers = (
        (SMALL_FILES, 1.0, "small number of files changed"),
        (MEDIUM_FILES, 2.0, "moderate number of files changed"),
        (LARGE_FILES, 2.5, "large number of files changed"),
    )
    files_points, files_reason = 3.0, "very large PR spanning many files"
    for ceiling, points, label in file_tiers:
        if files_changed <= ceiling:
            files_points, files_reason = points, label
            break
    reasons.append(files_reason)

    # Lines changed (additions and deletions combined).
    line_tiers = (
        (SMALL_LINES, 1.0, "small change size"),
        (MEDIUM_LINES, 2.0, "moderate change size"),
        (LARGE_LINES, 3.0, "large change size"),
    )
    churn = additions + deletions
    lines_points, lines_reason = 4.0, "very large change"
    for ceiling, points, label in line_tiers:
        if churn <= ceiling:
            lines_points, lines_reason = points, label
            break
    reasons.append(lines_reason)

    # Dependency changes add a fixed amount.
    deps_points = 0.0
    if has_dependency_changes:
        deps_points = 2.5
        reasons.append("dependency changes (architectural impact)")

    # Test files: additions lower the score, removals raise it (both capped
    # at 2.0); a delta of exactly zero contributes nothing and no reason.
    tests_points = 0.0
    if test_coverage_delta is None:
        reasons.append("test coverage change not assessed")
    elif test_coverage_delta > 0:
        reasons.append(f"test additions (+{test_coverage_delta} test files)")
        tests_points = -min(2.0, test_coverage_delta / 2.0)
    elif test_coverage_delta < 0:
        reasons.append(f"test removals ({abs(test_coverage_delta)} test files)")
        tests_points = min(2.0, abs(test_coverage_delta) * 0.5)

    # Weighted sum, scaled by 3 and offset by 1 before rounding and clamping.
    weighted = (
        (files_points * WEIGHT_FILES)
        + (lines_points * WEIGHT_LINES)
        + (deps_points * WEIGHT_DEPS)
        + (tests_points * WEIGHT_TEST_COV)
    )
    raw_score = 1.0 + weighted * 3.0

    final_score = max(1, min(10, int(round(raw_score))))
    est_minutes = TIME_PER_POINT.get(final_score, 30)

    return final_score, est_minutes, reasons
|
|
|
|
|
|
def analyze_pr(client: GiteaClient, org: str, repo: str, pr_data: Dict) -> PRComplexity:
    """Fetch the changed files of one PR and compute its complexity record.

    Args:
        client: API client used to list the PR's changed files.
        org: Owner of the repository.
        repo: Repository name.
        pr_data: Pull-request dict from the API; must contain "number".

    Returns:
        A PRComplexity populated with the size metrics, score, time estimate
        and scoring reasons.

    Raises:
        KeyError: If ``pr_data`` has no "number" entry.
    """
    pr_num = pr_data["number"]
    # "or" fallbacks also cover keys that are present but null.
    title = pr_data.get("title") or ""
    files = client.get_pr_files(org, repo, pr_num)

    additions = sum(f.get("additions") or 0 for f in files)
    deletions = sum(f.get("deletions") or 0 for f in files)
    filenames = [f.get("filename") or "" for f in files]

    has_deps = any(is_dependency_file(name) for name in filenames)

    # Deleted files are labelled "deleted" by Gitea (as far as its API
    # documentation shows) and "removed" by GitHub-style payloads; accepting
    # both keeps test removals from going unnoticed.
    removed_statuses = ("removed", "deleted")
    test_added = 0
    test_removed = 0
    for entry, name in zip(files, filenames):
        if not is_test_file(name):
            continue
        status = entry.get("status")
        if status == "added":
            test_added += 1
        elif status in removed_statuses:
            test_removed += 1

    # None means "no test file was added or removed", which is distinct from
    # a net delta of zero.
    test_delta = test_added - test_removed if (test_added or test_removed) else None

    score, est_min, reasons = score_pr(
        files_changed=len(files),
        additions=additions,
        deletions=deletions,
        has_dependency_changes=has_deps,
        test_coverage_delta=test_delta,
    )

    return PRComplexity(
        pr_number=pr_num,
        title=title,
        files_changed=len(files),
        additions=additions,
        deletions=deletions,
        has_dependency_changes=has_deps,
        test_coverage_delta=test_delta,
        score=score,
        estimated_minutes=est_min,
        reasons=reasons,
    )
|
|
|
|
|
|
def build_comment(complexity: PRComplexity) -> str:
    """Render the Markdown comment that reports a PR's complexity analysis.

    The comment consists of a heading, a metrics table, a bullet list with
    the scoring reasons (followed by dependency and test-file notes when
    applicable) and a footer linking to the originating issue.
    """
    # Bullets: scoring reasons first, then the optional notes.
    bullets = list(complexity.reasons)
    if complexity.has_dependency_changes:
        bullets.append(":warning: Dependency changes detected — architectural review recommended")

    delta = complexity.test_coverage_delta
    if delta is not None:
        if delta > 0:
            bullets.append(f":+1: {delta} test file(s) added")
        elif delta < 0:
            bullets.append(f":warning: {abs(delta)} test file(s) removed")

    size_summary = f"{complexity.files_changed} files, +{complexity.additions}/-{complexity.deletions} lines"

    pieces = [
        "## 📊 PR Complexity Analysis\n\n",
        f"**PR #{complexity.pr_number}: {complexity.title}**\n\n",
        "| Metric | Value |\n|--------|-------|\n",
        f"| Changes | {size_summary} |\n",
        f"| Complexity Score | **{complexity.score}/10** |\n",
        f"| Estimated Review Time | ~{complexity.estimated_minutes} minutes |\n\n",
        "### Scoring rationale:",
    ]
    pieces.extend(f"\n- {bullet}" for bullet in bullets)
    pieces.append("\n\n---\n")
    pieces.append("*Generated by PR Complexity Scorer — [issue #135](https://forge.alexanderwhitestone.com/Timmy_Foundation/compounding-intelligence/issues/135)*")
    return "".join(pieces)
|
|
|
|
|
|
def _resolve_token(cli_value: Optional[str]) -> str:
    """Return the Gitea token, or "" when none can be found.

    Lookup order: the ``--token`` value, the GITEA_TOKEN environment
    variable, then the file ~/.config/gitea/token. A ``--token`` or
    environment value that names an existing file is read as a token file;
    otherwise it is used as the token itself.
    """
    candidate = cli_value or os.environ.get("GITEA_TOKEN")
    if candidate:
        if os.path.isfile(candidate):
            with open(candidate, encoding="utf-8") as fh:
                return fh.read().strip()
        return candidate.strip()

    # Only the *contents* of the default file are ever a token: if the file
    # is missing the result is empty, never the path string itself.
    default_path = os.path.expanduser("~/.config/gitea/token")
    if os.path.isfile(default_path):
        with open(default_path, encoding="utf-8") as fh:
            return fh.read().strip()
    return ""


def main() -> None:
    """Score every open PR of a repository and write the results as JSON.

    With ``--apply`` a comment is posted on each PR; with ``--dry-run`` (which
    takes precedence) or no flag, scores are only printed. Exits with status 1
    when no token is available and 0 when there are no open PRs.
    """
    parser = argparse.ArgumentParser(description="PR Complexity Scorer")
    parser.add_argument("--org", default="Timmy_Foundation")
    parser.add_argument("--repo", default="compounding-intelligence")
    parser.add_argument(
        "--token",
        default=None,
        help="API token or path to a token file "
             "(default: $GITEA_TOKEN, then ~/.config/gitea/token)",
    )
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--apply", action="store_true")
    parser.add_argument("--output", default="metrics/pr_complexity.json")
    args = parser.parse_args()

    token = _resolve_token(args.token)
    if not token:
        print("ERROR: No Gitea token provided", file=sys.stderr)
        sys.exit(1)

    client = GiteaClient(token)

    print(f"Fetching open PRs for {args.org}/{args.repo}...")
    prs = client.get_open_prs(args.org, args.repo)
    if not prs:
        print("No open PRs found.")
        sys.exit(0)

    print(f"Found {len(prs)} open PR(s). Analyzing...")

    results = []
    Path(args.output).parent.mkdir(parents=True, exist_ok=True)

    for pr in prs:
        pr_num = pr["number"]
        # A null title must not break the slice below.
        title = pr.get("title") or ""
        print(f" Analyzing PR #{pr_num}: {title[:60]}")

        # Top-level boundary: one failing PR is reported and skipped so the
        # remaining PRs are still processed.
        try:
            complexity = analyze_pr(client, args.org, args.repo, pr)
            results.append(complexity.to_dict())

            comment = build_comment(complexity)

            if args.dry_run:
                print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [DRY-RUN]")
            elif args.apply:
                success = client.post_comment(args.org, args.repo, pr_num, comment)
                status = "[commented]" if success else "[FAILED]"
                print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min {status}")
            else:
                print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [no action]")

        except Exception as e:
            print(f" ERROR analyzing PR #{pr_num}: {e}", file=sys.stderr)

    with open(args.output, "w", encoding="utf-8") as f:
        json.dump({
            "org": args.org,
            "repo": args.repo,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "pr_count": len(results),
            "results": results
        }, f, indent=2)

    if results:
        scores = [r["score"] for r in results]
        print(f"\nResults saved to {args.output}")
        print(f"Summary: {len(results)} PRs, scores range {min(scores):.0f}-{max(scores):.0f}")
    else:
        print("\nNo results to save.")


if __name__ == "__main__":
    main()
|