#!/usr/bin/env python3 """ Enforce reviewer assignment on pull requests. Part of Issue #1127 implementation. """ import json import os import sys import urllib.request from typing import Dict, List, Any, Optional # Configuration GITEA_BASE = "https://forge.alexanderwhitestone.com/api/v1" TOKEN_PATH = os.path.expanduser("~/.config/gitea/token") class ReviewerEnforcer: def __init__(self): self.token = self._load_token() self.org = "Timmy_Foundation" def _load_token(self) -> str: """Load Gitea API token.""" try: with open(TOKEN_PATH, "r") as f: return f.read().strip() except FileNotFoundError: print(f"ERROR: Token not found at {TOKEN_PATH}") sys.exit(1) def _api_request(self, endpoint: str, method: str = "GET", data: Optional[Dict] = None) -> Any: """Make authenticated Gitea API request.""" url = f"{GITEA_BASE}{endpoint}" headers = { "Authorization": f"token {self.token}", "Content-Type": "application/json" } req = urllib.request.Request(url, headers=headers, method=method) if data: req.data = json.dumps(data).encode() try: with urllib.request.urlopen(req) as resp: if resp.status == 204: # No content return {"status": "success", "code": resp.status} return json.loads(resp.read()) except urllib.error.HTTPError as e: error_body = e.read().decode() if e.fp else "No error body" print(f"API Error {e.code}: {error_body}") return {"error": e.code, "message": error_body} def get_open_prs(self, repo: str) -> List[Dict]: """Get open PRs for a repository.""" endpoint = f"/repos/{self.org}/{repo}/pulls?state=open" prs = self._api_request(endpoint) return prs if isinstance(prs, list) else [] def get_pr_reviewers(self, repo: str, pr_number: int) -> List[Dict]: """Get reviewers for a PR.""" endpoint = f"/repos/{self.org}/{repo}/pulls/{pr_number}/reviews" reviews = self._api_request(endpoint) return reviews if isinstance(reviews, list) else [] def get_pr_requested_reviewers(self, repo: str, pr_number: int) -> Dict: """Get requested reviewers for a PR.""" endpoint = f"/repos/{self.org}/{repo}/pulls/{pr_number}/requested_reviewers" return self._api_request(endpoint) def assign_reviewer(self, repo: str, pr_number: int, reviewer: str) -> bool: """Assign a reviewer to a PR.""" endpoint = f"/repos/{self.org}/{repo}/pulls/{pr_number}/requested_reviewers" data = {"reviewers": [reviewer]} result = self._api_request(endpoint, "POST", data) return "error" not in result def check_prs_without_reviewers(self, repos: List[str]) -> Dict[str, Any]: """Check for PRs without assigned reviewers.""" results = { "repos": {}, "summary": { "total_prs": 0, "prs_without_reviewers": 0, "repos_checked": len(repos) } } for repo in repos: prs = self.get_open_prs(repo) results["repos"][repo] = { "total_prs": len(prs), "prs_without_reviewers": [], "prs_with_reviewers": [] } results["summary"]["total_prs"] += len(prs) for pr in prs: pr_number = pr["number"] pr_title = pr["title"] # Check for requested reviewers requested = self.get_pr_requested_reviewers(repo, pr_number) has_requested = len(requested.get("users", [])) > 0 # Check for existing reviews reviews = self.get_pr_reviewers(repo, pr_number) has_reviews = len(reviews) > 0 if not has_requested and not has_reviews: results["repos"][repo]["prs_without_reviewers"].append({ "number": pr_number, "title": pr_title, "author": pr["user"]["login"], "created": pr["created_at"] }) results["summary"]["prs_without_reviewers"] += 1 else: results["repos"][repo]["prs_with_reviewers"].append({ "number": pr_number, "title": pr_title, "has_requested": has_requested, "has_reviews": has_reviews }) return results def generate_report(self, results: Dict[str, Any]) -> str: """Generate a markdown report of reviewer check results.""" report = "# PR Reviewer Assignment Report\n\n" report += "## Summary\n" report += f"- **Repositories checked:** {results['summary']['repos_checked']}\n" report += f"- **Total open PRs:** {results['summary']['total_prs']}\n" report += f"- **PRs without reviewers:** {results['summary']['prs_without_reviewers']}\n\n" if results['summary']['prs_without_reviewers'] == 0: report += "✅ **All PRs have assigned reviewers.**\n" else: report += "⚠️ **PRs without assigned reviewers:**\n\n" for repo, data in results["repos"].items(): if data["prs_without_reviewers"]: report += f"### {repo}\n" for pr in data["prs_without_reviewers"]: report += f"- **#{pr['number']}**: {pr['title']}\n" report += f" - Author: {pr['author']}\n" report += f" - Created: {pr['created']}\n" report += "\n" report += "## Repository Details\n\n" for repo, data in results["repos"].items(): report += f"### {repo}\n" report += f"- **Total PRs:** {data['total_prs']}\n" report += f"- **PRs without reviewers:** {len(data['prs_without_reviewers'])}\n" report += f"- **PRs with reviewers:** {len(data['prs_with_reviewers'])}\n\n" if data['prs_with_reviewers']: report += "**PRs with reviewers:**\n" for pr in data['prs_with_reviewers']: status = "✅" if pr['has_requested'] else "⚠️" report += f"- {status} #{pr['number']}: {pr['title']}\n" report += "\n" return report def main(): """Main entry point for reviewer enforcer.""" import argparse parser = argparse.ArgumentParser(description="Check for PRs without assigned reviewers") parser.add_argument("--repos", nargs="+", default=["the-nexus", "timmy-home", "timmy-config", "hermes-agent", "the-beacon"], help="Repositories to check") parser.add_argument("--report", action="store_true", help="Generate report") parser.add_argument("--json", action="store_true", help="Output JSON instead of report") parser.add_argument("--assign", nargs=2, metavar=("REPO", "PR"), help="Assign a reviewer to a specific PR") parser.add_argument("--reviewer", help="Reviewer to assign (e.g., @perplexity)") args = parser.parse_args() enforcer = ReviewerEnforcer() if args.assign: # Assign reviewer to specific PR repo, pr_number = args.assign reviewer = args.reviewer or "@perplexity" if enforcer.assign_reviewer(repo, int(pr_number), reviewer): print(f"✅ Assigned {reviewer} as reviewer to {repo} #{pr_number}") else: print(f"❌ Failed to assign reviewer to {repo} #{pr_number}") sys.exit(1) else: # Check for PRs without reviewers results = enforcer.check_prs_without_reviewers(args.repos) if args.json: print(json.dumps(results, indent=2)) elif args.report: report = enforcer.generate_report(results) print(report) else: # Default: show summary print(f"Checked {results['summary']['repos_checked']} repositories") print(f"Total open PRs: {results['summary']['total_prs']}") print(f"PRs without reviewers: {results['summary']['prs_without_reviewers']}") if results['summary']['prs_without_reviewers'] > 0: print("\nPRs without reviewers:") for repo, data in results["repos"].items(): if data["prs_without_reviewers"]: for pr in data["prs_without_reviewers"]: print(f" {repo} #{pr['number']}: {pr['title']}") sys.exit(1) else: print("\n✅ All PRs have assigned reviewers") sys.exit(0) if __name__ == "__main__": main()