Compare commits


2 Commits

Author SHA1 Message Date
Alexander Payne
c4586eb2be feat(review): add Review Quality Scorer (#127)
Some checks failed
Test / pytest (pull_request) Failing after 8s
Score PR reviews across 5 categories: style, logic, security,
performance, tests. Produces weighted composite and markdown report.

Closes #127
2026-04-26 12:22:25 -04:00
Rockachopa
4b5a675355 feat: add PR complexity scorer — estimate review effort

Implements issue #135: a script that analyzes open PRs and computes
a complexity score (1-10) based on files changed, lines added/removed,
dependency changes, and test coverage delta. Also estimates review time.

The scorer can be run with --dry-run to preview or --apply to post
score comments directly on PRs.

Output: metrics/pr_complexity.json with full analysis.

Closes #135
Some checks failed
Test / pytest (push) Failing after 10s
2026-04-26 09:34:57 -04:00
5 changed files with 1065 additions and 288 deletions

scripts/genome_diff.py Deleted file

@@ -1,288 +0,0 @@
#!/usr/bin/env python3
"""
Codebase Genome Diff — Detect structural changes between two versions.
Compares two git refs (commits, branches, tags) and produces a human-readable
report of structural changes:
• Added/removed/renamed files
• Changed functions/classes (signature modifications)
• New dependencies (imports, requirements, etc.)
Usage:
python3 scripts/genome_diff.py --ref1 <commit1> --ref2 <commit2>
python3 scripts/genome_diff.py --ref1 main --ref2 feature-branch
python3 scripts/genome_diff.py --ref1 v1.0 --ref2 v2.0 --output report.txt
"""
import argparse
import json
import os
import re
import subprocess
import sys
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, SCRIPT_DIR)
from diff_analyzer import DiffAnalyzer, ChangeCategory
@dataclass
class FunctionChange:
file: str
name: str
    kind: str  # 'def' or 'class' (the matched keyword)
change_type: str # 'added' or 'removed' (simplified)
old_line: Optional[int] = None
new_line: Optional[int] = None
@dataclass
class DependencyChange:
file: str
module: str
change_type: str # 'added' or 'removed' or 'modified'
line: int = 0
@dataclass
class GenomeDiffReport:
ref1: str
ref2: str
file_changes: List[Dict[str, Any]] = field(default_factory=list)
function_changes: List[FunctionChange] = field(default_factory=list)
dependency_changes: List[DependencyChange] = field(default_factory=list)
total_files_changed: int = 0
total_functions_changed: int = 0
total_dependencies_changed: int = 0
def to_dict(self) -> Dict[str, Any]:
return {
"ref1": self.ref1,
"ref2": self.ref2,
"summary": {
"files": self.total_files_changed,
"functions": self.total_functions_changed,
"dependencies": self.total_dependencies_changed,
},
"file_changes": self.file_changes,
"function_changes": [fc.__dict__ for fc in self.function_changes],
"dependency_changes": [dc.__dict__ for dc in self.dependency_changes],
}
def human_report(self) -> str:
lines = []
        lines.append(f"Codebase Genome Diff: {self.ref1} → {self.ref2}")
lines.append("=" * 60)
lines.append(f" Files changed: {self.total_files_changed}")
lines.append(f" Functions changed: {self.total_functions_changed}")
lines.append(f" Dependencies changed: {self.total_dependencies_changed}")
lines.append("")
for fc in self.file_changes:
kind = []
if fc.get('is_new'):
kind.append("NEW")
if fc.get('is_deleted'):
kind.append("DELETED")
if fc.get('is_renamed'):
kind.append("RENAMED")
if fc.get('is_binary'):
kind.append("BINARY")
kind_str = f" [{', '.join(kind)}]" if kind else ""
lines.append(f" {fc['path']}{kind_str} (+{fc['added_lines']}/-{fc['deleted_lines']})")
lines.append("")
for fc in self.function_changes:
op = {'added': '+', 'removed': '-', 'modified': '~'}.get(fc.change_type, '?')
lines.append(f" [{op}] {fc.file}: {fc.kind} '{fc.name}'")
lines.append("")
for dc in self.dependency_changes:
op = '+' if dc.change_type == 'added' else '-'
lines.append(f" [{op}] {dc.file}: {dc.module}")
lines.append("")
return "\n".join(lines)
def run_git_diff(ref1: str, ref2: str) -> str:
result = subprocess.run(
['git', 'diff', '--unified=0', f'{ref1}...{ref2}'],
capture_output=True, text=True, cwd=SCRIPT_DIR
)
if result.returncode not in (0, 1):
print(f"git diff failed: {result.stderr}", file=sys.stderr)
sys.exit(1)
return result.stdout
def extract_function_changes(diff_text: str) -> List[FunctionChange]:
changes: List[FunctionChange] = []
pattern = re.compile(r'^([+\-])\s*(def|class)\s+(\w+)', re.MULTILINE)
hunk_header_re = re.compile(r'^@@\s+-(\d+)(?:,(\d+))?\s+\+(\d+)(?:,(\d+))?\s+@@')
current_old_line: Optional[int] = None
current_new_line: Optional[int] = None
for line in diff_text.split('\n'):
hdr = hunk_header_re.match(line)
if hdr:
current_old_line = int(hdr.group(1))
current_new_line = int(hdr.group(3))
continue
m = pattern.match(line)
if m:
op = m.group(1)
kind = m.group(2)
name = m.group(3)
change_type = "added" if op == '+' else "removed"
line_num = current_new_line if change_type == "added" else current_old_line
changes.append(FunctionChange(
file="<unknown>",
name=name,
kind=kind,
change_type=change_type,
new_line=line_num if change_type == "added" else None,
old_line=line_num if change_type == "removed" else None,
))
# Advance line counters heuristically
if op == '-':
if current_old_line is not None:
current_old_line += 1
elif op == '+':
if current_new_line is not None:
current_new_line += 1
elif line.startswith(' '):
if current_old_line is not None:
current_old_line += 1
if current_new_line is not None:
current_new_line += 1
# lines starting with other prefixes (like \\ No newline) ignored
return changes
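A minimal illustration of what the parser above returns, using a hypothetical two-line hunk (this is an editor's sketch, assuming the module's names are importable; it is not part of the committed file):

sample_hunk = """@@ -10,2 +10,2 @@
-def old_handler(x):
+def new_handler(x):
"""
for change in extract_function_changes(sample_hunk):
    print(change.change_type, change.kind, change.name)
# removed def old_handler
# added def new_handler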
def extract_dependency_changes(diff_text: str, analyzer: DiffAnalyzer) -> List[DependencyChange]:
changes: List[DependencyChange] = []
import_pattern = re.compile(
r'^([+\-])\s*(?:import\s+([\w\.]+)|from\s+([\w\.]+)\s+import)',
re.MULTILINE
)
file_diffs = analyzer._split_files(diff_text)
for file_diff in file_diffs:
file_match = re.search(r'^diff --git a/.*? b/(.*?)$', file_diff, re.MULTILINE)
if not file_match:
continue
filepath = file_match.group(1)
# Scan each line for import changes
for line in file_diff.split('\n'):
m = import_pattern.match(line)
if m:
change_type = "added" if m.group(1) == '+' else "removed"
module = m.group(2) or m.group(3)
changes.append(DependencyChange(
file=filepath,
module=module,
change_type=change_type,
line=0
))
# Detect if this file is a dependency manifest
req_file_pattern = re.compile(
r'^[\+\-].*?(requirements(.*?)\.txt|pyproject\.toml|setup\.py|Pipfile)'
)
if any(req_file_pattern.match(line) for line in file_diff.split('\n')):
if not any(c.file == filepath and c.module == "<file>" for c in changes):
changes.append(DependencyChange(
file=filepath,
module="<file>",
change_type="modified",
line=0
))
return changes
def correlate_function_changes_with_files(diff_text: str, functions: List[FunctionChange]) -> List[FunctionChange]:
result: List[FunctionChange] = []
# Split diff into per-file sections
file_sections: List[tuple[str, str]] = []
current_file: Optional[str] = None
current_lines: List[str] = []
for line in diff_text.split('\n'):
if line.startswith('diff --git'):
if current_file is not None:
file_sections.append((current_file, '\n'.join(current_lines)))
m = re.match(r'^diff --git a/.*? b/(.*?)$', line)
current_file = m.group(1) if m else "unknown"
current_lines = [line]
else:
current_lines.append(line)
if current_file is not None:
file_sections.append((current_file, '\n'.join(current_lines)))
pattern = re.compile(r'^([+\-])\s*(def|class)\s+(\w+)', re.MULTILINE)
for filepath, section in file_sections:
for m in pattern.finditer(section):
op = m.group(1)
kind = m.group(2)
name = m.group(3)
change_type = "added" if op == '+' else "removed"
result.append(FunctionChange(
file=filepath,
name=name,
kind=kind,
change_type=change_type
))
return result
def main():
parser = argparse.ArgumentParser(description="Codebase Genome Diff — structural changes between versions")
parser.add_argument("--ref1", required=True, help="First git ref (commit, branch, tag)")
parser.add_argument("--ref2", required=True, help="Second git ref")
parser.add_argument("--output", help="Write report to file")
parser.add_argument("--json", action="store_true", help="Output JSON instead of human report")
args = parser.parse_args()
try:
diff_text = run_git_diff(args.ref1, args.ref2)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
if not diff_text.strip():
print(f"No differences between {args.ref1} and {args.ref2}.")
sys.exit(0)
analyzer = DiffAnalyzer()
summary = analyzer.analyze(diff_text)
file_changes = [fc.to_dict() for fc in summary.files]
func_changes = extract_function_changes(diff_text)
func_changes = correlate_function_changes_with_files(diff_text, func_changes)
dep_changes = extract_dependency_changes(diff_text, analyzer)
report = GenomeDiffReport(
ref1=args.ref1,
ref2=args.ref2,
file_changes=file_changes,
function_changes=func_changes,
dependency_changes=dep_changes,
total_files_changed=len(file_changes),
total_functions_changed=len(func_changes),
total_dependencies_changed=len(dep_changes),
)
output = json.dumps(report.to_dict(), indent=2) if args.json else report.human_report()
if args.output:
with open(args.output, 'w') as f:
f.write(output + '\n')
print(f"Report written to {args.output}")
else:
print(output)
if __name__ == '__main__':
main()

pr_complexity_scorer.py New file

@@ -0,0 +1,351 @@
#!/usr/bin/env python3
"""
PR Complexity Scorer - Estimate review effort for PRs.
"""
import argparse
import json
import os
import re
import sys
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
import urllib.request
import urllib.error
GITEA_BASE = "https://forge.alexanderwhitestone.com/api/v1"
DEPENDENCY_FILES = {
"requirements.txt", "pyproject.toml", "setup.py", "setup.cfg",
"Pipfile", "poetry.lock", "package.json", "yarn.lock", "Gemfile",
"go.mod", "Cargo.toml", "pom.xml", "build.gradle"
}
TEST_PATTERNS = [
r"tests?/.*\.py$", r".*_test\.py$", r"test_.*\.py$",
r"spec/.*\.rb$", r".*_spec\.rb$",
r"__tests__/", r".*\.test\.(js|ts|jsx|tsx)$"
]
WEIGHT_FILES = 0.25
WEIGHT_LINES = 0.25
WEIGHT_DEPS = 0.30
WEIGHT_TEST_COV = 0.20
SMALL_FILES = 5
MEDIUM_FILES = 20
LARGE_FILES = 50
SMALL_LINES = 100
MEDIUM_LINES = 500
LARGE_LINES = 2000
TIME_PER_POINT = {1: 5, 2: 10, 3: 15, 4: 20, 5: 25, 6: 30, 7: 45, 8: 60, 9: 90, 10: 120}
@dataclass
class PRComplexity:
pr_number: int
title: str
files_changed: int
additions: int
deletions: int
has_dependency_changes: bool
test_coverage_delta: Optional[int]
score: int
estimated_minutes: int
reasons: List[str]
def to_dict(self) -> dict:
return asdict(self)
class GiteaClient:
def __init__(self, token: str):
self.token = token
self.base_url = GITEA_BASE.rstrip("/")
def _request(self, path: str, params: Dict = None) -> Any:
url = f"{self.base_url}{path}"
if params:
qs = "&".join(f"{k}={v}" for k, v in params.items() if v is not None)
url += f"?{qs}"
req = urllib.request.Request(url)
req.add_header("Authorization", f"token {self.token}")
req.add_header("Content-Type", "application/json")
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
except urllib.error.HTTPError as e:
print(f"API error {e.code}: {e.read().decode()[:200]}", file=sys.stderr)
return None
except urllib.error.URLError as e:
print(f"Network error: {e}", file=sys.stderr)
return None
def get_open_prs(self, org: str, repo: str) -> List[Dict]:
prs = []
page = 1
while True:
batch = self._request(f"/repos/{org}/{repo}/pulls", {"limit": 50, "page": page, "state": "open"})
if not batch:
break
prs.extend(batch)
if len(batch) < 50:
break
page += 1
return prs
def get_pr_files(self, org: str, repo: str, pr_number: int) -> List[Dict]:
files = []
page = 1
while True:
batch = self._request(
f"/repos/{org}/{repo}/pulls/{pr_number}/files",
{"limit": 100, "page": page}
)
if not batch:
break
files.extend(batch)
if len(batch) < 100:
break
page += 1
return files
def post_comment(self, org: str, repo: str, pr_number: int, body: str) -> bool:
data = json.dumps({"body": body}).encode("utf-8")
req = urllib.request.Request(
f"{self.base_url}/repos/{org}/{repo}/issues/{pr_number}/comments",
data=data,
method="POST",
headers={"Authorization": f"token {self.token}", "Content-Type": "application/json"}
)
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return resp.status in (200, 201)
except urllib.error.HTTPError:
return False
def is_dependency_file(filename: str) -> bool:
return any(filename.endswith(dep) for dep in DEPENDENCY_FILES)
def is_test_file(filename: str) -> bool:
return any(re.search(pattern, filename) for pattern in TEST_PATTERNS)
def score_pr(
files_changed: int,
additions: int,
deletions: int,
has_dependency_changes: bool,
test_coverage_delta: Optional[int] = None
) -> tuple[int, int, List[str]]:
score = 1.0
reasons = []
# Files changed
if files_changed <= SMALL_FILES:
fscore = 1.0
reasons.append("small number of files changed")
elif files_changed <= MEDIUM_FILES:
fscore = 2.0
reasons.append("moderate number of files changed")
elif files_changed <= LARGE_FILES:
fscore = 2.5
reasons.append("large number of files changed")
else:
fscore = 3.0
reasons.append("very large PR spanning many files")
# Lines changed
total_lines = additions + deletions
if total_lines <= SMALL_LINES:
lscore = 1.0
reasons.append("small change size")
elif total_lines <= MEDIUM_LINES:
lscore = 2.0
reasons.append("moderate change size")
elif total_lines <= LARGE_LINES:
lscore = 3.0
reasons.append("large change size")
else:
lscore = 4.0
reasons.append("very large change")
# Dependency changes
if has_dependency_changes:
dscore = 2.5
reasons.append("dependency changes (architectural impact)")
else:
dscore = 0.0
# Test coverage delta
tscore = 0.0
if test_coverage_delta is not None:
if test_coverage_delta > 0:
reasons.append(f"test additions (+{test_coverage_delta} test files)")
tscore = -min(2.0, test_coverage_delta / 2.0)
elif test_coverage_delta < 0:
reasons.append(f"test removals ({abs(test_coverage_delta)} test files)")
tscore = min(2.0, abs(test_coverage_delta) * 0.5)
else:
reasons.append("test coverage change not assessed")
# Weighted sum, scaled by 3 to use full 1-10 range
bonus = (fscore * WEIGHT_FILES) + (lscore * WEIGHT_LINES) + (dscore * WEIGHT_DEPS) + (tscore * WEIGHT_TEST_COV)
scaled_bonus = bonus * 3.0
score = 1.0 + scaled_bonus
final_score = max(1, min(10, int(round(score))))
est_minutes = TIME_PER_POINT.get(final_score, 30)
return final_score, est_minutes, reasons
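A hand-check of the weighting above, as a sketch (assumes score_pr is importable; the values are invented to land in the moderate buckets):

# 15 files -> fscore 2.0; 500 total lines -> lscore 2.0;
# dependency change -> dscore 2.5; no test delta -> tscore 0.0
score, minutes, reasons = score_pr(
    files_changed=15,
    additions=400,
    deletions=100,
    has_dependency_changes=True,
    test_coverage_delta=None,
)
# bonus = 2.0*0.25 + 2.0*0.25 + 2.5*0.30 = 1.75; score = 1 + 1.75*3.0 = 6.25 -> 6
assert score == 6 and minutes == 30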
def analyze_pr(client: GiteaClient, org: str, repo: str, pr_data: Dict) -> PRComplexity:
pr_num = pr_data["number"]
title = pr_data.get("title", "")
files = client.get_pr_files(org, repo, pr_num)
additions = sum(f.get("additions", 0) for f in files)
deletions = sum(f.get("deletions", 0) for f in files)
filenames = [f.get("filename", "") for f in files]
has_deps = any(is_dependency_file(f) for f in filenames)
test_added = sum(1 for f in files if f.get("status") == "added" and is_test_file(f.get("filename", "")))
test_removed = sum(1 for f in files if f.get("status") == "removed" and is_test_file(f.get("filename", "")))
test_delta = test_added - test_removed if (test_added or test_removed) else None
score, est_min, reasons = score_pr(
files_changed=len(files),
additions=additions,
deletions=deletions,
has_dependency_changes=has_deps,
test_coverage_delta=test_delta
)
return PRComplexity(
pr_number=pr_num,
title=title,
files_changed=len(files),
additions=additions,
deletions=deletions,
has_dependency_changes=has_deps,
test_coverage_delta=test_delta,
score=score,
estimated_minutes=est_min,
reasons=reasons
)
def build_comment(complexity: PRComplexity) -> str:
change_desc = f"{complexity.files_changed} files, +{complexity.additions}/-{complexity.deletions} lines"
deps_note = "\n- :warning: Dependency changes detected — architectural review recommended" if complexity.has_dependency_changes else ""
test_note = ""
if complexity.test_coverage_delta is not None:
if complexity.test_coverage_delta > 0:
test_note = f"\n- :+1: {complexity.test_coverage_delta} test file(s) added"
elif complexity.test_coverage_delta < 0:
test_note = f"\n- :warning: {abs(complexity.test_coverage_delta)} test file(s) removed"
comment = f"## 📊 PR Complexity Analysis\n\n"
comment += f"**PR #{complexity.pr_number}: {complexity.title}**\n\n"
comment += f"| Metric | Value |\n|--------|-------|\n"
comment += f"| Changes | {change_desc} |\n"
comment += f"| Complexity Score | **{complexity.score}/10** |\n"
comment += f"| Estimated Review Time | ~{complexity.estimated_minutes} minutes |\n\n"
comment += f"### Scoring rationale:"
for r in complexity.reasons:
comment += f"\n- {r}"
if deps_note:
comment += deps_note
if test_note:
comment += test_note
comment += f"\n\n---\n"
comment += f"*Generated by PR Complexity Scorer — [issue #135](https://forge.alexanderwhitestone.com/Timmy_Foundation/compounding-intelligence/issues/135)*"
return comment
def main():
parser = argparse.ArgumentParser(description="PR Complexity Scorer")
parser.add_argument("--org", default="Timmy_Foundation")
parser.add_argument("--repo", default="compounding-intelligence")
parser.add_argument("--token", default=os.environ.get("GITEA_TOKEN") or os.path.expanduser("~/.config/gitea/token"))
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--apply", action="store_true")
parser.add_argument("--output", default="metrics/pr_complexity.json")
args = parser.parse_args()
token_path = args.token
if os.path.exists(token_path):
with open(token_path) as f:
token = f.read().strip()
else:
token = args.token
if not token:
print("ERROR: No Gitea token provided", file=sys.stderr)
sys.exit(1)
client = GiteaClient(token)
print(f"Fetching open PRs for {args.org}/{args.repo}...")
prs = client.get_open_prs(args.org, args.repo)
if not prs:
print("No open PRs found.")
sys.exit(0)
print(f"Found {len(prs)} open PR(s). Analyzing...")
results = []
Path(args.output).parent.mkdir(parents=True, exist_ok=True)
for pr in prs:
pr_num = pr["number"]
title = pr.get("title", "")
print(f" Analyzing PR #{pr_num}: {title[:60]}")
try:
complexity = analyze_pr(client, args.org, args.repo, pr)
results.append(complexity.to_dict())
comment = build_comment(complexity)
if args.dry_run:
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [DRY-RUN]")
elif args.apply:
success = client.post_comment(args.org, args.repo, pr_num, comment)
status = "[commented]" if success else "[FAILED]"
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min {status}")
else:
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [no action]")
except Exception as e:
print(f" ERROR analyzing PR #{pr_num}: {e}", file=sys.stderr)
with open(args.output, "w") as f:
json.dump({
"org": args.org,
"repo": args.repo,
"timestamp": datetime.now(timezone.utc).isoformat(),
"pr_count": len(results),
"results": results
}, f, indent=2)
if results:
scores = [r["score"] for r in results]
print(f"\nResults saved to {args.output}")
print(f"Summary: {len(results)} PRs, scores range {min(scores):.0f}-{max(scores):.0f}")
else:
print("\nNo results to save.")
if __name__ == "__main__":
main()
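Typical invocations, sketched (the script's on-disk path is not shown on this page; the token resolves from $GITEA_TOKEN or ~/.config/gitea/token, as handled in main()):

python3 pr_complexity_scorer.py --dry-run   # preview scores without posting
python3 pr_complexity_scorer.py --apply     # post a score comment on each open PR
python3 pr_complexity_scorer.py --org Timmy_Foundation --repo compounding-intelligence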

scripts/review_quality_scorer.py Executable file

@@ -0,0 +1,340 @@
#!/usr/bin/env python3
"""
review_quality_scorer.py — Evaluate code review quality.
Scores PR reviews on 5 dimensions (0-100 each):
- Style: formatting, naming, conventions, lint
- Logic: algorithmic correctness, edge cases, reasoning
- Security: vulnerabilities, auth/authz, data exposure
- Performance: efficiency, bottlenecks, resource usage
- Tests: coverage, test quality, missing tests
Produces a weighted composite score and a human-readable report.
Usage:
python3 review_quality_scorer.py --input reviews.json
python3 review_quality_scorer.py --pr 123 --org Timmy_Foundation --repo compounding-intelligence
"""
from __future__ import annotations
import argparse
import json
import os
import re
import sys
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
import urllib.request
import urllib.error
# ---------------------------------------------------------------------------
# Category weights (must sum to 1.0)
# ---------------------------------------------------------------------------
WEIGHTS = {
"style": 0.15,
"logic": 0.25,
"security": 0.25,
"performance": 0.15,
"tests": 0.20,
}
# ---------------------------------------------------------------------------
# Indicator patterns per category (presence suggests category was addressed)
# ---------------------------------------------------------------------------
STYLE_INDICATORS = [
r"\bstyle\b", r"\blint\b", r"\bformatting\b", r"\bnaming\b",
r"\bPEP8\b", r"\bblack\b", r"\bprettier\b", r"\bclang-format\b",
r"\bwhitespace\b", r"\bindentation\b", r"\bconsistent\b",
r"`(black|isort|flake8|eslint|prettier)`", r"\bformat\s+code\b",
r"\bstyle\s+guide\b", r"\bconventional\b",
]
LOGIC_INDICATORS = [
    r"\blogic\b", r"\balgorithm\b", r"\bedge\s+case\b", r"\bedgecase\b",
    r"\bincorrect\b", r"\bwrong\b", r"\bbug\b", r"\bflawed\b",
    r"\bmissing\s+case\b", r"\bunhandled\b", r"\boverflow\b",
    r"\bunderflow\b", r"\brace\b", r"\bcondition\b", r"\bcheck\s+this\b",
    r"\bthink\s+about\b", r"\bwhat\s+happens\s+when\b",
    r"\bcorrect\s+behavior\b", r"\bverify\s+logic\b",
]
SECURITY_INDICATORS = [
    r"\bsecurity\b", r"\bvuln\b", r"\bCVE\b", r"\bexploit\b",
    r"\battack\b", r"\bXSS\b", r"\bSQL\s+injection\b", r"\bRCE\b",
    r"\bauthorization\b", r"\bauthentication\b", r"\bpermission\b",
    r"\bsensitive\b", r"\bsecret\b", r"\bpassword\b", r"\btoken\b",
    r"\bexposure\b", r"\bsanitize\b", r"\bescape\b", r"\bvalidate\b",
    r"\binjection\b", r"\bredact\b", r"\bhardcode\b", r"\bcreds\b",
    r"\bpublic\s+repo\b", r"\bexfil\b", r"\bleak\b",
]
PERFORMANCE_INDICATORS = [
    r"\bperformance\b", r"\bslow\b", r"\boptimize\b", r"\bbottleneck\b",
    r"\bcpu\b", r"\bmemory\b", r"\bleak\b", r"\bfast\b", r"\befficient\b",
    r"\bcache\b", r"\boverhead\b", r"\bO\(n\)", r"\bcomplexity\b",
    r"\bswap\b", r"\bpaging\b", r"\bmultiply\b", r"\bredundant\b",
    r"\bperf\b", r"\bprofiling\b", r"\bprofiler\b", r"\bhot\s+path\b",
    r"\bbatch\b", r"\block\b", r"\bthread\b", r"\bpool\b",
]
TESTS_INDICATORS = [
r"\btest\b", r"\btesting\b", r"\bcoverage\b", r"\bunittest\b",
r"\bpytest\b", r"\bassert\b", r"\bmock\b", r"\bstub\b",
r"\bfixture\b", r"\bspec\b", r"\bTDD\b", r"\bedge\s+case\b",
r"\bmissing\s+test\b", r"\bno\s+test\b", r"\btest\s+this\b",
r"\badd\s+tests\b", r"\btest\s+coverage\b", r"\bcoverage\s+gap\b",
r"\bregression\b", r"\bintegration\s+test\b", r"\bunit\s+test\b",
]
# Depth indicators (presence = deeper review)
DEPTH_MARKERS = [
r"\bwhy\b", r"\bbecause\b", r"\bexplain\b", r"\bconsider\b",
r"\balternative\b", r"\boption\b", r"\bsuggestion\b", r"\bfix\b",
r"\bupdate\b", r"\bchange\b", r"\bimprove\b", r"\bperhaps\b",
r"\bcould\s+also\b", r"\baside\b", r"\bfootnote\b",
]
# ---------------------------------------------------------------------------
# Scoring helpers
# ---------------------------------------------------------------------------
def _category_presence_score(comments: List[str], indicators: List[str]) -> float:
"""Score 0-1 based on indicator keyword matches."""
if not comments:
return 0.0
text = " ".join(comments).lower()
hits = sum(len(re.findall(pat, text, re.IGNORECASE)) for pat in indicators)
# Normalize: 1 hit = 0.2, 2 = 0.4, ... cap at 1.0
return min(1.0, hits * 0.2)
def _depth_score(comments: List[str]) -> float:
"""Measure review depth: number of substantive comments."""
if not comments:
return 0.0
depth_markers = sum(
1 for c in comments
if len(c.split()) >= 10 and re.search("|".join(DEPTH_MARKERS), c, re.IGNORECASE)
)
    # no comments → 0.0; otherwise 0.1 floor + 0.3 per marker: 1 → 0.4, 2 → 0.7, 3+ → capped at 1.0
return min(1.0, 0.1 + depth_markers * 0.3)
def _category_score(comments: List[str], indicators: List[str], weight: float) -> float:
"""Combined score: 60% presence, 40% depth."""
pres = _category_presence_score(comments, indicators)
depth = _depth_score(comments)
return round((0.6 * pres + 0.4 * depth) * 100, 1)
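A worked example of the three helpers on a single hypothetical comment (an editor's sketch; assumes the names above are in scope):

comment = ("Please add tests for the empty-input edge case, "
           "because the parser currently crashes there.")
pres = _category_presence_score([comment], TESTS_INDICATORS)  # 2 hits -> 0.4
depth = _depth_score([comment])  # >=10 words plus "because" -> 0.1 + 0.3 = 0.4
assert _category_score([comment], TESTS_INDICATORS, WEIGHTS["tests"]) == 40.0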
@dataclass
class ReviewQualityReport:
"""Quality Scores: one 0-100 per category + composite."""
style: float
logic: float
security: float
performance: float
tests: float
composite: float
breakdown: Dict[str, float]
comment_count: int
review_count: int
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
    def to_markdown(self) -> str:
        lines = [
            "# PR Review Quality Report",
            "",
            f"**Composite Score:** {self.composite:.1f} / 100",
            f"**Reviews analyzed:** {self.review_count}",
            f"**Comments found:** {self.comment_count}",
            "",
            "## Category Scores",
            "| Category | Score | Weight | Contribution | Bar |",
            "|----------|-------|--------|--------------|-----|",
        ]
        for cat in ["style", "logic", "security", "performance", "tests"]:
            score = getattr(self, cat)
            weight = WEIGHTS[cat]
            contrib = score * weight
            bar = "█" * int(score / 5)
            lines.append(f"| {cat.capitalize():10} | {score:5.1f} | {weight:.2f} | {contrib:5.1f} | {bar} |")
lines.extend(["", "## Interpretation", ""])
if self.composite >= 80:
verdict = "Excellent — review is thorough across all categories."
elif self.composite >= 60:
verdict = "Good — major areas covered, some gaps remain."
elif self.composite >= 40:
verdict = "Fair — several categories need more attention."
else:
verdict = "Poor — review lacks depth in multiple critical areas."
lines.append(f"- {verdict}")
lines.append("")
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Core
# ---------------------------------------------------------------------------
def score_review(comments: List[str]) -> ReviewQualityReport:
"""Score a list of review comment bodies."""
# Extract individual comments (body strings)
bodies = [c.strip() for c in comments if c and c.strip()]
if not bodies:
bodies = ["(no substantive review comments found)"]
    # Duplicates are kept deliberately: depth scoring scales with comment volume.
style_s = _category_score(bodies, STYLE_INDICATORS, WEIGHTS["style"])
logic_s = _category_score(bodies, LOGIC_INDICATORS, WEIGHTS["logic"])
sec_s = _category_score(bodies, SECURITY_INDICATORS, WEIGHTS["security"])
perf_s = _category_score(bodies, PERFORMANCE_INDICATORS, WEIGHTS["performance"])
tests_s = _category_score(bodies, TESTS_INDICATORS, WEIGHTS["tests"])
composite = round(
style_s * WEIGHTS["style"] +
logic_s * WEIGHTS["logic"] +
sec_s * WEIGHTS["security"] +
perf_s * WEIGHTS["performance"] +
tests_s * WEIGHTS["tests"],
1
)
return ReviewQualityReport(
style=style_s,
logic=logic_s,
security=sec_s,
performance=perf_s,
tests=tests_s,
composite=composite,
breakdown={k: round(v, 1) for k, v in [
("style", style_s), ("logic", logic_s), ("security", sec_s),
("performance", perf_s), ("tests", tests_s)]},
comment_count=len(bodies),
review_count=len(set(bodies)) # Approx unique reviews
)
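A quick hand-check of the composite weighting (the category scores below are invented for illustration):

parts = {"style": 40.0, "logic": 60.0, "security": 20.0,
         "performance": 10.0, "tests": 50.0}
composite = round(sum(parts[k] * WEIGHTS[k] for k in parts), 1)
assert composite == 37.5  # 6.0 + 15.0 + 5.0 + 1.5 + 10.0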
# ---------------------------------------------------------------------------
# Gitea integration (optional — fetch PR comments)
# ---------------------------------------------------------------------------
GITEA_BASE = "https://forge.alexanderwhitestone.com/api/v1"
class GiteaClient:
def __init__(self, token: str):
self.token = token
self.base_url = GITEA_BASE.rstrip("/")
def _request(self, path: str, params: Dict = None) -> Any:
url = f"{self.base_url}{path}"
if params:
qs = "&".join(f"{k}={v}" for k, v in params.items() if v is not None)
url += f"?{qs}"
req = urllib.request.Request(url)
req.add_header("Authorization", f"token {self.token}")
req.add_header("Content-Type", "application/json")
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
except urllib.error.HTTPError as e:
print(f"API error {e.code}: {e.read().decode()[:200]}", file=sys.stderr)
return None
except urllib.error.URLError as e:
print(f"Network error: {e}", file=sys.stderr)
return None
def get_pr_comments(self, org: str, repo: str, pr_number: int) -> List[Dict]:
"""Fetch review comments (not PR discussion comments)."""
comments = []
page = 1
while True:
batch = self._request(
f"/repos/{org}/{repo}/pulls/{pr_number}/comments",
{"limit": 100, "page": page}
)
if not batch:
break
comments.extend(batch)
if len(batch) < 100:
break
page += 1
return comments
def fetch_review_comments(org: str, repo: str, pr_number: int, token: str) -> List[str]:
client = GiteaClient(token)
raw = client.get_pr_comments(org, repo, pr_number)
# Each comment object: {body, user, ...}
return [c.get("body", "") for c in raw if c.get("body")]
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main() -> None:
parser = argparse.ArgumentParser(description="Review Quality Scorer")
parser.add_argument("--input", help="JSON file with review findings (list of strings)")
parser.add_argument("--pr", type=int, help="PR number to fetch from Gitea")
parser.add_argument("--org", default="Timmy_Foundation", help="Gitea org")
parser.add_argument("--repo", default="compounding-intelligence", help="Gitea repo")
parser.add_argument("--token", default=os.environ.get("GITEA_TOKEN") or os.path.expanduser("~/.config/gitea/token"))
parser.add_argument("--output", default="metrics/review_quality_report.json", help="Output path")
parser.add_argument("--markdown", action="store_true", help="Emit human-readable markdown to stdout")
args = parser.parse_args()
# Load token if file path
token_path = args.token
if os.path.exists(token_path):
with open(token_path) as f:
token = f.read().strip()
else:
token = args.token
# Get review bodies
if args.input:
with open(args.input) as f:
data = json.load(f)
if isinstance(data, dict) and "reviews" in data:
comments = data["reviews"]
elif isinstance(data, list):
comments = data
else:
            print("ERROR: Input JSON must be a list or contain a 'reviews' key", file=sys.stderr)
sys.exit(1)
elif args.pr:
if not token:
print("ERROR: Gitea token required for --pr", file=sys.stderr)
sys.exit(1)
comments = fetch_review_comments(args.org, args.repo, args.pr, token)
print(f"Fetched {len(comments)} review comments from PR #{args.pr}")
else:
print("ERROR: Must provide either --input or --pr", file=sys.stderr)
sys.exit(1)
# Score
report = score_review(comments)
# Output
Path(args.output).parent.mkdir(parents=True, exist_ok=True)
with open(args.output, "w") as f:
json.dump(report.to_dict(), f, indent=2)
print(f"Report saved: {args.output}")
if args.markdown:
print(report.to_markdown())
else:
print(json.dumps(report.to_dict(), indent=2))
if __name__ == "__main__":
main()


@@ -0,0 +1,170 @@
#!/usr/bin/env python3
"""
Tests for PR Complexity Scorer — unit tests for the scoring logic.
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from pr_complexity_scorer import (
score_pr,
is_dependency_file,
is_test_file,
TIME_PER_POINT,
SMALL_FILES,
MEDIUM_FILES,
LARGE_FILES,
SMALL_LINES,
MEDIUM_LINES,
LARGE_LINES,
)
PASS = 0
FAIL = 0
def test(name):
def decorator(fn):
global PASS, FAIL
try:
fn()
PASS += 1
print(f" [PASS] {name}")
except AssertionError as e:
FAIL += 1
print(f" [FAIL] {name}: {e}")
except Exception as e:
FAIL += 1
print(f" [FAIL] {name}: Unexpected error: {e}")
return decorator
def assert_eq(a, b, msg=""):
if a != b:
raise AssertionError(f"{msg} expected {b!r}, got {a!r}")
def assert_true(v, msg=""):
if not v:
raise AssertionError(msg or "Expected True")
def assert_false(v, msg=""):
if v:
raise AssertionError(msg or "Expected False")
print("=== PR Complexity Scorer Tests ===\n")
print("-- File Classification --")
@test("dependency file detection — requirements.txt")
def _():
assert_true(is_dependency_file("requirements.txt"))
assert_true(is_dependency_file("src/requirements.txt"))
assert_false(is_dependency_file("requirements_test.txt"))
@test("dependency file detection — pyproject.toml")
def _():
assert_true(is_dependency_file("pyproject.toml"))
assert_false(is_dependency_file("myproject.py"))
@test("test file detection — pytest style")
def _():
assert_true(is_test_file("tests/test_api.py"))
assert_true(is_test_file("test_module.py"))
assert_true(is_test_file("src/module_test.py"))
@test("test file detection — other frameworks")
def _():
assert_true(is_test_file("spec/feature_spec.rb"))
assert_true(is_test_file("__tests__/component.test.js"))
assert_false(is_test_file("testfixtures/helper.py"))
print("\n-- Scoring Logic --")
@test("small PR gets low score (1-3)")
def _():
score, minutes, _ = score_pr(
files_changed=3,
additions=50,
deletions=10,
has_dependency_changes=False,
test_coverage_delta=None
)
assert_true(1 <= score <= 3, f"Score should be low, got {score}")
assert_true(minutes < 20)
@test("medium PR gets medium score (4-6)")
def _():
score, minutes, _ = score_pr(
files_changed=15,
additions=400,
deletions=100,
has_dependency_changes=False,
test_coverage_delta=None
)
assert_true(4 <= score <= 6, f"Score should be medium, got {score}")
assert_true(20 <= minutes <= 45)
@test("large PR gets high score (7-9)")
def _():
score, minutes, _ = score_pr(
files_changed=60,
additions=3000,
deletions=1500,
has_dependency_changes=True,
test_coverage_delta=None
)
assert_true(7 <= score <= 9, f"Score should be high, got {score}")
assert_true(minutes >= 45)
@test("dependency changes boost score")
def _():
base_score, _, _ = score_pr(
files_changed=10, additions=200, deletions=50,
has_dependency_changes=False, test_coverage_delta=None
)
dep_score, _, _ = score_pr(
files_changed=10, additions=200, deletions=50,
has_dependency_changes=True, test_coverage_delta=None
)
assert_true(dep_score > base_score, f"Deps: {base_score} -> {dep_score}")
@test("adding tests lowers complexity")
def _():
base_score, _, _ = score_pr(
files_changed=8, additions=150, deletions=20,
has_dependency_changes=False, test_coverage_delta=None
)
better_score, _, _ = score_pr(
files_changed=8, additions=180, deletions=20,
has_dependency_changes=False, test_coverage_delta=3
)
assert_true(better_score < base_score, f"Tests: {base_score} -> {better_score}")
@test("removing tests increases complexity")
def _():
base_score, _, _ = score_pr(
files_changed=8, additions=150, deletions=20,
has_dependency_changes=False, test_coverage_delta=None
)
worse_score, _, _ = score_pr(
files_changed=8, additions=150, deletions=20,
has_dependency_changes=False, test_coverage_delta=-2
)
assert_true(worse_score > base_score, f"Remove tests: {base_score} -> {worse_score}")
@test("score bounded 1-10")
def _():
for files, adds, dels in [(1, 10, 5), (100, 10000, 5000)]:
score, _, _ = score_pr(files, adds, dels, False, None)
assert_true(1 <= score <= 10, f"Score {score} out of range")
@test("estimated minutes exist for all scores")
def _():
for s in range(1, 11):
assert_true(s in TIME_PER_POINT, f"Missing time for score {s}")
print(f"\n=== Results: {PASS} passed, {FAIL} failed ===")
sys.exit(0 if FAIL == 0 else 1)


@@ -0,0 +1,204 @@
#!/usr/bin/env python3
"""
Tests for Review Quality Scorer — unit tests for the scoring logic.
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from review_quality_scorer import (
score_review,
_category_presence_score,
_depth_score,
_category_score,
ReviewQualityReport,
STYLE_INDICATORS,
LOGIC_INDICATORS,
SECURITY_INDICATORS,
PERFORMANCE_INDICATORS,
TESTS_INDICATORS,
)
PASS = 0
FAIL = 0
def test(name):
def decorator(fn):
global PASS, FAIL
try:
fn()
PASS += 1
print(f" [PASS] {name}")
except AssertionError as e:
FAIL += 1
print(f" [FAIL] {name}: {e}")
except Exception as e:
FAIL += 1
print(f" [FAIL] {name}: Unexpected error: {e}")
return decorator
def assert_eq(a, b, msg=""):
if abs(a - b) > 0.1:
raise AssertionError(f"{msg} expected ~{b!r}, got {a!r}")
def assert_true(v, msg=""):
if not v:
raise AssertionError(msg or "Expected True")
def assert_false(v, msg=""):
if v:
raise AssertionError(msg or "Expected False")
print("=== Review Quality Scorer Tests ===\n")
print("-- Category Presence --")
@test("style keywords detected")
def _():
score = _category_presence_score(
["Please fix the formatting and run black."],
STYLE_INDICATORS
)
assert_true(score > 0, f"Style score should be > 0, got {score}")
@test("logic keywords detected")
def _():
score = _category_presence_score(
["Consider the edge case when input is empty.", "This algorithm is O(n^2)"],
LOGIC_INDICATORS
)
assert_true(score > 0, f"Logic score should be > 0")
@test("security keywords detected")
def _():
score = _category_presence_score(
["Potential SQL injection here.", "Don't hardcode secrets"],
SECURITY_INDICATORS
)
assert_true(score > 0)
@test("performance keywords detected")
def _():
score = _category_presence_score(
["This loop is a bottleneck.", "Memory usage could be optimized"],
PERFORMANCE_INDICATORS
)
assert_true(score > 0)
@test("tests keywords detected")
def _():
score = _category_presence_score(
["Add tests for this branch.", "Missing test coverage here"],
TESTS_INDICATORS
)
assert_true(score > 0)
@test("no keywords → score 0")
def _():
score = _category_presence_score(
["Looks good to me."],
STYLE_INDICATORS
)
assert_eq(score, 0.0)
print("\n-- Depth Scoring --")
@test("shallow comment → low depth")
def _():
d = _depth_score(["OK"])
assert_eq(d, 0.0)
@test("substantive comment → positive depth")
def _():
d = _depth_score([
"Please consider updating this logic: when x is zero we divide by zero. "
"Why not add an early return? This would fix the edge case."
])
assert_true(d > 0.3)
print("\n-- Category Score Integration --")
@test("thorough style review scores high on style")
def _():
comments = [
"The indentation is inconsistent — please run black to auto-format.",
"Function names are camelCase but should be snake_case per PEP8.",
"Trailing whitespace on several lines — please clean up.",
"Missing .gitignore would accidentally commit __pycache__ and .venv.",
"Consider adding a linter (flake8) to catch these style issues early.",
]
rpt = score_review(comments)
assert_true(rpt.style >= 50, f"Style score should be >= 50, got {rpt.style}")
@test("thorough logic review scores high on logic")
def _():
comments = [
"What happens if the input list is empty? The algorithm would crash.",
"This nested loop is O(n^2). Could we use a dictionary for O(n)?",
"Negative numbers aren't handled — possible overflow.",
"Consider the edge case where the user passes None.",
"Please add input validation at the start of the function.",
"Why not extract this into a pure function for easier testing?",
]
rpt = score_review(comments)
assert_true(rpt.logic >= 50)
@test("thorough security review scores high on security")
def _():
comments = [
"The SQL query uses string concatenation — vulnerable to SQL injection.",
"API token is hardcoded in source — move to environment variables.",
"Check for XSS when rendering user-provided HTML.",
"Are we validating all user inputs before processing?",
"Consider rate limiting to prevent abuse.",
"Ensure secrets are never committed to the repository.",
]
rpt = score_review(comments)
assert_true(rpt.security >= 50)
@test("combines multiple categories")
def _():
comments = [
"Please run black to auto-format. Also, the O(n²) loop here will hurt performance on large inputs.",
"Security risk: hardcoded API token. Style: inconsistent indentation. Logic: missing null check could crash.",
"Missing test coverage for edge cases. Also consider caching the result to improve performance.",
"Naming violates PEP8 (style). Edge case: negative inputs cause overflow (logic). Potential XSS when rendering user HTML (security).",
"Run a linter (style), add unit tests (tests), and check for memory leaks (performance).",
]
rpt = score_review(comments)
assert_true(rpt.style >= 50)
assert_true(rpt.logic >= 40)
assert_true(rpt.security >= 50)
assert_true(rpt.performance >= 50)
assert_true(rpt.tests >= 50)
@test("composite stays within bounds for degenerate input")
def _():
    # Twenty identical shallow comments; categories should all be low and roughly equal.
    comments = ["x"] * 20
    rpt = score_review(comments)
    assert_true(0 <= rpt.composite <= 100)
print("\n-- Edge Cases --")
@test("empty comments produce a non-zero baseline")
def _():
    rpt = score_review([])
    assert_true(rpt.composite > 0)
@test("single one-word comment → very low")
def _():
rpt = score_review(["OK"])
assert_true(rpt.composite < 40)
print(f"\n=== Results: {PASS} passed, {FAIL} failed ===")
sys.exit(0 if FAIL == 0 else 1)