Files
the-nexus/bin/enforce_reviewer_assignment.py
Alexander Whitestone 8755f455b1
Some checks failed
Deploy Nexus / deploy (push) Failing after 4s
Staging Verification Gate / verify-staging (push) Failing after 5s
feat: implement Issue #1127 triage recommendations (#1403)
Merge PR #1403
2026-04-14 22:11:12 +00:00

223 lines
9.0 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Enforce reviewer assignment on pull requests.
Part of Issue #1127 implementation.
"""
import json
import os
import sys
import urllib.request
from typing import Dict, List, Any, Optional
# Configuration
# Base URL of the Gitea REST API; request endpoints are appended to it verbatim.
GITEA_BASE = "https://forge.alexanderwhitestone.com/api/v1"
# File the API token is read from ("~" is expanded to the user's home directory).
TOKEN_PATH = os.path.expanduser("~/.config/gitea/token")
class ReviewerEnforcer:
    """Audit and assign pull-request reviewers through the Gitea REST API.

    The instance reads its API token from ``TOKEN_PATH`` on construction and
    talks to the API rooted at ``GITEA_BASE`` for the ``Timmy_Foundation``
    organisation.
    """

    # Seconds to wait for the server before a request is treated as failed.
    REQUEST_TIMEOUT = 30
    # Page size requested when listing pull requests.
    # NOTE(review): the server may cap this lower; pagination below does not
    # depend on the server honouring it.
    PAGE_SIZE = 50
    # Hard upper bound on pages fetched per repository (guards against a
    # server that never returns an empty page).
    MAX_PAGES = 100

    def __init__(self):
        self.token = self._load_token()
        self.org = "Timmy_Foundation"

    def _load_token(self) -> str:
        """Load the Gitea API token from ``TOKEN_PATH``.

        Returns:
            The token with surrounding whitespace removed.

        Raises:
            SystemExit: with code 1 when the file is missing, unreadable or
                empty. The diagnostic goes to stderr.
        """
        try:
            with open(TOKEN_PATH, "r", encoding="utf-8") as f:
                token = f.read().strip()
        except FileNotFoundError:
            print(f"ERROR: Token not found at {TOKEN_PATH}", file=sys.stderr)
            sys.exit(1)
        except OSError as exc:
            print(f"ERROR: Cannot read token at {TOKEN_PATH}: {exc}", file=sys.stderr)
            sys.exit(1)
        if not token:
            # An empty token would only produce confusing 401s later on.
            print(f"ERROR: Token file {TOKEN_PATH} is empty", file=sys.stderr)
            sys.exit(1)
        return token

    def _api_request(self, endpoint: str, method: str = "GET", data: Optional[Dict] = None) -> Any:
        """Make an authenticated Gitea API request.

        Args:
            endpoint: Path (and query string) appended to ``GITEA_BASE``.
            method: HTTP method.
            data: Optional JSON-serialisable payload sent as the request body.

        Returns:
            The decoded JSON response on success;
            ``{"status": "success", "code": <status>}`` when the response has
            no body; or ``{"error": <code>, "message": <text>}`` on failure,
            where ``<code>`` is the HTTP status for HTTP errors,
            ``"connection"`` for network/timeout failures and
            ``"invalid_json"`` for an undecodable body.
        """
        url = f"{GITEA_BASE}{endpoint}"
        headers = {
            "Authorization": f"token {self.token}",
            "Content-Type": "application/json"
        }
        # "is not None" so that an explicitly empty payload is still sent.
        body = json.dumps(data).encode() if data is not None else None
        req = urllib.request.Request(url, data=body, headers=headers, method=method)
        try:
            with urllib.request.urlopen(req, timeout=self.REQUEST_TIMEOUT) as resp:
                status = resp.status
                raw = resp.read()
        except urllib.error.HTTPError as e:
            error_body = e.read().decode(errors="replace") if e.fp else "No error body"
            # Diagnostics go to stderr so that --json output on stdout stays parseable.
            print(f"API Error {e.code}: {error_body}", file=sys.stderr)
            return {"error": e.code, "message": error_body}
        except OSError as e:
            # URLError, timeouts and connection resets are all OSError subclasses.
            print(f"API Error: {e}", file=sys.stderr)
            return {"error": "connection", "message": str(e)}

        if status == 204 or not raw.strip():  # No content
            return {"status": "success", "code": status}
        try:
            return json.loads(raw)
        except ValueError as e:
            print(f"API Error: invalid JSON in response: {e}", file=sys.stderr)
            return {"error": "invalid_json", "message": str(e)}

    def get_open_prs(self, repo: str) -> List[Dict]:
        """Get all open PRs for a repository, following pagination.

        Pages are requested until the server returns an empty page, a
        non-list (error) response, a page containing no PR number that has
        not been seen already, or ``MAX_PAGES`` is reached. If a later page
        fails, the PRs collected so far are returned.
        """
        prs: List[Dict] = []
        seen = set()
        for page in range(1, self.MAX_PAGES + 1):
            endpoint = (
                f"/repos/{self.org}/{repo}/pulls"
                f"?state=open&limit={self.PAGE_SIZE}&page={page}"
            )
            batch = self._api_request(endpoint)
            if not isinstance(batch, list) or not batch:
                break
            # Guard against a server that ignores "page" and repeats itself.
            fresh = [pr for pr in batch if pr.get("number") not in seen]
            if not fresh:
                break
            seen.update(pr.get("number") for pr in fresh)
            prs.extend(fresh)
        return prs

    def get_pr_reviewers(self, repo: str, pr_number: int) -> List[Dict]:
        """Get the reviews recorded on a PR (empty list on any API error)."""
        endpoint = f"/repos/{self.org}/{repo}/pulls/{pr_number}/reviews"
        reviews = self._api_request(endpoint)
        return reviews if isinstance(reviews, list) else []

    def get_pr_requested_reviewers(self, repo: str, pr_number: int) -> Dict:
        """Get requested reviewers for a PR.

        Returns the API response when it is a dict (including the error dict
        produced by ``_api_request``), otherwise an empty dict.

        NOTE(review): Gitea may not serve GET on this endpoint at all -
        confirm against the deployed version's API docs.
        """
        endpoint = f"/repos/{self.org}/{repo}/pulls/{pr_number}/requested_reviewers"
        result = self._api_request(endpoint)
        return result if isinstance(result, dict) else {}

    def assign_reviewer(self, repo: str, pr_number: int, reviewer: str) -> bool:
        """Request a review from ``reviewer`` on a PR.

        A leading ``@`` (mention style, e.g. ``@perplexity``) is removed
        before the name is sent.

        Returns:
            True when the API call did not report an error; False when it did
            or when ``reviewer`` is empty after normalisation.
        """
        # NOTE(review): Gitea is believed to resolve names containing "@" as
        # e-mail addresses, so "@name" would never match a user - confirm.
        username = reviewer.strip().lstrip("@")
        if not username:
            return False
        endpoint = f"/repos/{self.org}/{repo}/pulls/{pr_number}/requested_reviewers"
        data = {"reviewers": [username]}
        result = self._api_request(endpoint, "POST", data)
        # Success responses may be a list or a dict; only an error dict means failure.
        return not (isinstance(result, dict) and "error" in result)

    def check_prs_without_reviewers(self, repos: List[str]) -> Dict[str, Any]:
        """Check for open PRs that have neither requested reviewers nor reviews.

        Args:
            repos: Repository names inside ``self.org``.

        Returns:
            A dict with a ``"repos"`` mapping (per repository: ``total_prs``,
            ``prs_without_reviewers``, ``prs_with_reviewers``) and a
            ``"summary"`` of totals.
        """
        results = {
            "repos": {},
            "summary": {
                "total_prs": 0,
                "prs_without_reviewers": 0,
                "repos_checked": len(repos)
            }
        }
        for repo in repos:
            prs = self.get_open_prs(repo)
            repo_entry = {
                "total_prs": len(prs),
                "prs_without_reviewers": [],
                "prs_with_reviewers": []
            }
            results["repos"][repo] = repo_entry
            results["summary"]["total_prs"] += len(prs)
            for pr in prs:
                pr_number = pr["number"]
                pr_title = pr["title"]
                # Prefer the reviewers listed on the PR payload itself.
                # NOTE(review): presumes the payload carries a
                # "requested_reviewers" list - confirm for this server.
                has_requested = bool(pr.get("requested_reviewers"))
                if not has_requested:
                    # Fall back to the dedicated endpoint.
                    requested = self.get_pr_requested_reviewers(repo, pr_number)
                    has_requested = bool(requested.get("users") or [])
                # Check for existing reviews
                has_reviews = bool(self.get_pr_reviewers(repo, pr_number))
                if not has_requested and not has_reviews:
                    repo_entry["prs_without_reviewers"].append({
                        "number": pr_number,
                        "title": pr_title,
                        "author": pr["user"]["login"],
                        "created": pr["created_at"]
                    })
                    results["summary"]["prs_without_reviewers"] += 1
                else:
                    repo_entry["prs_with_reviewers"].append({
                        "number": pr_number,
                        "title": pr_title,
                        "has_requested": has_requested,
                        "has_reviews": has_reviews
                    })
        return results

    def generate_report(self, results: Dict[str, Any]) -> str:
        """Generate a markdown report from ``check_prs_without_reviewers`` results.

        The report has a summary, a list of PRs lacking reviewers (when there
        are any) and a per-repository breakdown. In the breakdown a PR that
        has reviews but no requested reviewer is flagged with a warning sign.
        """
        summary = results["summary"]
        # Collect fragments and join once instead of repeated string +=.
        parts: List[str] = [
            "# PR Reviewer Assignment Report\n\n",
            "## Summary\n",
            f"- **Repositories checked:** {summary['repos_checked']}\n",
            f"- **Total open PRs:** {summary['total_prs']}\n",
            f"- **PRs without reviewers:** {summary['prs_without_reviewers']}\n\n",
        ]
        if summary["prs_without_reviewers"] == 0:
            parts.append("✅ **All PRs have assigned reviewers.**\n")
        else:
            parts.append("⚠️ **PRs without assigned reviewers:**\n\n")
            for repo, data in results["repos"].items():
                if not data["prs_without_reviewers"]:
                    continue
                parts.append(f"### {repo}\n")
                for pr in data["prs_without_reviewers"]:
                    parts.append(f"- **#{pr['number']}**: {pr['title']}\n")
                    parts.append(f" - Author: {pr['author']}\n")
                    parts.append(f" - Created: {pr['created']}\n")
                parts.append("\n")

        parts.append("## Repository Details\n\n")
        for repo, data in results["repos"].items():
            parts.append(f"### {repo}\n")
            parts.append(f"- **Total PRs:** {data['total_prs']}\n")
            parts.append(f"- **PRs without reviewers:** {len(data['prs_without_reviewers'])}\n")
            parts.append(f"- **PRs with reviewers:** {len(data['prs_with_reviewers'])}\n\n")
            if data["prs_with_reviewers"]:
                parts.append("**PRs with reviewers:**\n")
                for pr in data["prs_with_reviewers"]:
                    status = "" if pr["has_requested"] else "⚠️"
                    parts.append(f"- {status} #{pr['number']}: {pr['title']}\n")
                parts.append("\n")
        return "".join(parts)
def main(argv: Optional[List[str]] = None) -> None:
    """Main entry point for reviewer enforcer.

    Args:
        argv: Command-line arguments without the program name. ``None``
            (the default) makes argparse read ``sys.argv``.

    Modes:
        ``--assign REPO PR``: request a review on one PR (reviewer taken from
        ``--reviewer``, defaulting to ``@perplexity``); exits 1 on failure.
        Otherwise every repository in ``--repos`` is audited and the result
        is printed as JSON (``--json``), as a markdown report (``--report``)
        or as a short summary. Only the summary mode sets the exit status:
        1 when some PR lacks reviewers, 0 otherwise.

    Raises:
        SystemExit: code 2 on invalid arguments (raised by argparse), or the
            mode-specific codes described above.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Check for PRs without assigned reviewers")
    parser.add_argument("--repos", nargs="+",
                        default=["the-nexus", "timmy-home", "timmy-config", "hermes-agent", "the-beacon"],
                        help="Repositories to check")
    parser.add_argument("--report", action="store_true", help="Generate report")
    parser.add_argument("--json", action="store_true", help="Output JSON instead of report")
    parser.add_argument("--assign", nargs=2, metavar=("REPO", "PR"),
                        help="Assign a reviewer to a specific PR")
    parser.add_argument("--reviewer", help="Reviewer to assign (e.g., @perplexity)")
    args = parser.parse_args(argv)

    # Validate the PR number before the enforcer is constructed, so that a
    # usage error is reported as such (exit code 2) rather than as a traceback.
    repo = None
    pr_number = None
    if args.assign:
        repo, raw_pr_number = args.assign
        try:
            pr_number = int(raw_pr_number)
        except ValueError:
            parser.error(f"PR must be an integer, got {raw_pr_number!r}")

    enforcer = ReviewerEnforcer()

    if args.assign:
        # Assign reviewer to specific PR
        reviewer = args.reviewer or "@perplexity"
        if enforcer.assign_reviewer(repo, pr_number, reviewer):
            print(f"✅ Assigned {reviewer} as reviewer to {repo} #{pr_number}")
        else:
            print(f"❌ Failed to assign reviewer to {repo} #{pr_number}")
            sys.exit(1)
        return

    # Check for PRs without reviewers
    results = enforcer.check_prs_without_reviewers(args.repos)
    if args.json:
        print(json.dumps(results, indent=2))
    elif args.report:
        print(enforcer.generate_report(results))
    else:
        # Default: show summary
        summary = results["summary"]
        print(f"Checked {summary['repos_checked']} repositories")
        print(f"Total open PRs: {summary['total_prs']}")
        print(f"PRs without reviewers: {summary['prs_without_reviewers']}")
        if summary["prs_without_reviewers"] > 0:
            print("\nPRs without reviewers:")
            for repo_name, data in results["repos"].items():
                for pr in data["prs_without_reviewers"]:
                    print(f" {repo_name} #{pr['number']}: {pr['title']}")
            sys.exit(1)
        else:
            print("\n✅ All PRs have assigned reviewers")
            sys.exit(0)
# Run the CLI only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()