317 lines
12 KiB
Python
317 lines
12 KiB
Python
|
|
#!/usr/bin/env python3
|
|||
|
|
"""
|
|||
|
|
Admin Actions Toolkit for Issue #1255
|
|||
|
|
Provides scripts and documentation for repo-owner admin actions.
|
|||
|
|
|
|||
|
|
Issue #1255: [IaC] Admin actions for Rockachopa — branch protection, cron setup, PR merge
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
import json
|
|||
|
|
import os
|
|||
|
|
import sys
|
|||
|
|
import urllib.request
|
|||
|
|
from typing import Dict, List, Any, Optional
|
|||
|
|
|
|||
|
|
# Configuration
|
|||
|
|
GITEA_BASE = "https://forge.alexanderwhitestone.com/api/v1"
|
|||
|
|
TOKEN_PATH = os.path.expanduser("~/.config/gitea/token")
|
|||
|
|
ORG = "Timmy_Foundation"
|
|||
|
|
REPO = "the-nexus"
|
|||
|
|
|
|||
|
|
class AdminActions:
|
|||
|
|
def __init__(self, admin_token: Optional[str] = None):
|
|||
|
|
self.admin_token = admin_token or self._load_token()
|
|||
|
|
|
|||
|
|
def _load_token(self) -> str:
|
|||
|
|
"""Load Gitea API token."""
|
|||
|
|
try:
|
|||
|
|
with open(TOKEN_PATH, "r") as f:
|
|||
|
|
return f.read().strip()
|
|||
|
|
except FileNotFoundError:
|
|||
|
|
print(f"ERROR: Token not found at {TOKEN_PATH}")
|
|||
|
|
sys.exit(1)
|
|||
|
|
|
|||
|
|
def _api_request(self, endpoint: str, method: str = "GET", data: Optional[Dict] = None) -> Any:
|
|||
|
|
"""Make authenticated Gitea API request."""
|
|||
|
|
url = f"{GITEA_BASE}{endpoint}"
|
|||
|
|
headers = {
|
|||
|
|
"Authorization": f"token {self.admin_token}",
|
|||
|
|
"Content-Type": "application/json"
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
req = urllib.request.Request(url, headers=headers, method=method)
|
|||
|
|
if data:
|
|||
|
|
req.data = json.dumps(data).encode()
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
with urllib.request.urlopen(req) as resp:
|
|||
|
|
if resp.status == 204: # No content
|
|||
|
|
return {"status": "success", "code": resp.status}
|
|||
|
|
return json.loads(resp.read())
|
|||
|
|
except urllib.error.HTTPError as e:
|
|||
|
|
error_body = e.read().decode() if e.fp else "No error body"
|
|||
|
|
print(f"API Error {e.code}: {error_body}")
|
|||
|
|
return {"error": e.code, "message": error_body}
|
|||
|
|
|
|||
|
|
def check_branch_protection(self, branch: str = "main") -> Dict[str, Any]:
|
|||
|
|
"""Check current branch protection settings."""
|
|||
|
|
endpoint = f"/repos/{ORG}/{REPO}/branch_protection/{branch}"
|
|||
|
|
result = self._api_request(endpoint)
|
|||
|
|
|
|||
|
|
if isinstance(result, dict) and "error" in result:
|
|||
|
|
return {"error": result["error"], "protected": False}
|
|||
|
|
|
|||
|
|
return {
|
|||
|
|
"protected": True,
|
|||
|
|
"settings": result
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
def enable_rebase_before_merge(self, branch: str = "main") -> Dict[str, Any]:
|
|||
|
|
"""Enable rebase-before-merge on branch."""
|
|||
|
|
# Get current settings
|
|||
|
|
current = self.check_branch_protection(branch)
|
|||
|
|
|
|||
|
|
if not current.get("protected"):
|
|||
|
|
# Create new branch protection
|
|||
|
|
data = {
|
|||
|
|
"branch_name": branch,
|
|||
|
|
"enable_push": False,
|
|||
|
|
"require_signed_commits": False,
|
|||
|
|
"block_on_outdated_branch": True,
|
|||
|
|
"required_approvals": 1,
|
|||
|
|
"dismiss_stale_reviews": True,
|
|||
|
|
"require_code_owner_reviews": True
|
|||
|
|
}
|
|||
|
|
endpoint = f"/repos/{ORG}/{REPO}/branch_protection"
|
|||
|
|
return self._api_request(endpoint, "POST", data)
|
|||
|
|
else:
|
|||
|
|
# Update existing branch protection
|
|||
|
|
settings = current["settings"]
|
|||
|
|
settings["block_on_outdated_branch"] = True
|
|||
|
|
|
|||
|
|
endpoint = f"/repos/{ORG}/{REPO}/branch_protection/{branch}"
|
|||
|
|
return self._api_request(endpoint, "PATCH", settings)
|
|||
|
|
|
|||
|
|
def check_pr_status(self, pr_number: int) -> Dict[str, Any]:
|
|||
|
|
"""Check if a PR is merged or open."""
|
|||
|
|
endpoint = f"/repos/{ORG}/{REPO}/pulls/{pr_number}"
|
|||
|
|
result = self._api_request(endpoint)
|
|||
|
|
|
|||
|
|
if isinstance(result, dict) and "error" in result:
|
|||
|
|
return {"error": result["error"]}
|
|||
|
|
|
|||
|
|
return {
|
|||
|
|
"number": result["number"],
|
|||
|
|
"title": result["title"],
|
|||
|
|
"state": result["state"],
|
|||
|
|
"merged": result.get("merged", False),
|
|||
|
|
"merged_at": result.get("merged_at"),
|
|||
|
|
"html_url": result["html_url"]
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
def merge_pr(self, pr_number: int, merge_method: str = "rebase") -> Dict[str, Any]:
|
|||
|
|
"""Merge a PR."""
|
|||
|
|
endpoint = f"/repos/{ORG}/{REPO}/pulls/{pr_number}/merge"
|
|||
|
|
data = {
|
|||
|
|
"Do": merge_method,
|
|||
|
|
"MergeMessageField": "Merge PR",
|
|||
|
|
"MergeTitleField": f"Merge PR #{pr_number}",
|
|||
|
|
"ForceMerge": False
|
|||
|
|
}
|
|||
|
|
return self._api_request(endpoint, "PUT", data)
|
|||
|
|
|
|||
|
|
def generate_setup_script(self) -> str:
|
|||
|
|
"""Generate setup script for admin actions."""
|
|||
|
|
script = """#!/bin/bash
|
|||
|
|
# Admin Actions Setup Script for Issue #1255
|
|||
|
|
# Run this script as repo owner (@Rockachopa)
|
|||
|
|
|
|||
|
|
set -euo pipefail
|
|||
|
|
|
|||
|
|
echo "=========================================="
|
|||
|
|
echo "Admin Actions Setup for the-nexus"
|
|||
|
|
echo "=========================================="
|
|||
|
|
|
|||
|
|
# Configuration
|
|||
|
|
REPO="Timmy_Foundation/the-nexus"
|
|||
|
|
ADMIN_TOKEN="${GITEA_ADMIN_TOKEN:-}"
|
|||
|
|
|
|||
|
|
if [ -z "$ADMIN_TOKEN" ]; then
|
|||
|
|
echo "ERROR: GITEA_ADMIN_TOKEN environment variable not set"
|
|||
|
|
echo "Set it with: export GITEA_ADMIN_TOKEN=<your_admin_token>"
|
|||
|
|
exit 1
|
|||
|
|
fi
|
|||
|
|
|
|||
|
|
# 1. Enable rebase-before-merge on main
|
|||
|
|
echo ""
|
|||
|
|
echo "1. Enabling rebase-before-merge on main branch..."
|
|||
|
|
curl -X POST \\
|
|||
|
|
-H "Authorization: token $ADMIN_TOKEN" \\
|
|||
|
|
-H "Content-Type: application/json" \\
|
|||
|
|
"https://forge.alexanderwhitestone.com/api/v1/repos/$REPO/branch_protection" \\
|
|||
|
|
-d '{
|
|||
|
|
"branch_name": "main",
|
|||
|
|
"enable_push": false,
|
|||
|
|
"require_signed_commits": false,
|
|||
|
|
"block_on_outdated_branch": true,
|
|||
|
|
"required_approvals": 1,
|
|||
|
|
"dismiss_stale_reviews": true,
|
|||
|
|
"require_code_owner_reviews": true
|
|||
|
|
}'
|
|||
|
|
|
|||
|
|
echo ""
|
|||
|
|
echo "✅ Branch protection configured"
|
|||
|
|
|
|||
|
|
# 2. Check PR #1254 status
|
|||
|
|
echo ""
|
|||
|
|
echo "2. Checking PR #1254 status..."
|
|||
|
|
PR_STATUS=$(curl -s -H "Authorization: token $ADMIN_TOKEN" \\
|
|||
|
|
"https://forge.alexanderwhitestone.com/api/v1/repos/$REPO/pulls/1254" | jq -r '.state')
|
|||
|
|
|
|||
|
|
if [ "$PR_STATUS" = "open" ]; then
|
|||
|
|
echo "PR #1254 is open. Consider reviewing and merging."
|
|||
|
|
echo "URL: https://forge.alexanderwhitestone.com/Timmy_Foundation/the-nexus/pulls/1254"
|
|||
|
|
elif [ "$PR_STATUS" = "closed" ]; then
|
|||
|
|
echo "PR #1254 is already closed."
|
|||
|
|
else
|
|||
|
|
echo "Could not determine PR #1254 status"
|
|||
|
|
fi
|
|||
|
|
|
|||
|
|
# 3. Set up stale-pr-closer cron
|
|||
|
|
echo ""
|
|||
|
|
echo "3. Setting up stale-pr-closer cron..."
|
|||
|
|
echo "After merging PR #1254, add this to crontab:"
|
|||
|
|
echo ""
|
|||
|
|
echo "# Stale PR closer - runs every 6 hours"
|
|||
|
|
echo "0 */6 * * * GITEA_TOKEN=\\"$ADMIN_TOKEN\\" REPO=\\"$REPO\\" /path/to/the-nexus/.githooks/stale-pr-closer.sh >> /var/log/stale-pr-closer.log 2>&1"
|
|||
|
|
echo ""
|
|||
|
|
echo "Test with dry run first:"
|
|||
|
|
echo "GITEA_TOKEN=\\"$ADMIN_TOKEN\\" DRY_RUN=true .githooks/stale-pr-closer.sh"
|
|||
|
|
|
|||
|
|
# 4. Optional: Grant admin access to perplexity
|
|||
|
|
echo ""
|
|||
|
|
echo "4. Optional: Grant admin access to perplexity"
|
|||
|
|
echo "To grant admin access to @perplexity:"
|
|||
|
|
echo "1. Go to: https://forge.alexanderwhitestone.com/$REPO/settings/collaborators"
|
|||
|
|
echo "2. Find @perplexity"
|
|||
|
|
echo "3. Change role to Admin"
|
|||
|
|
echo ""
|
|||
|
|
echo "This allows @perplexity to handle branch protection and repo settings."
|
|||
|
|
|
|||
|
|
echo ""
|
|||
|
|
echo "=========================================="
|
|||
|
|
echo "Setup complete!"
|
|||
|
|
echo "=========================================="
|
|||
|
|
"""
|
|||
|
|
return script
|
|||
|
|
|
|||
|
|
def generate_report(self) -> str:
|
|||
|
|
"""Generate admin actions report."""
|
|||
|
|
report = "# Admin Actions Report for Issue #1255\n\n"
|
|||
|
|
report += f"Generated: {__import__('datetime').datetime.now().isoformat()}\n\n"
|
|||
|
|
|
|||
|
|
# Check branch protection
|
|||
|
|
report += "## 1. Branch Protection Status\n"
|
|||
|
|
protection = self.check_branch_protection("main")
|
|||
|
|
|
|||
|
|
if protection.get("protected"):
|
|||
|
|
settings = protection["settings"]
|
|||
|
|
report += "✅ Branch protection is enabled\n"
|
|||
|
|
report += f"- Require PR: {settings.get('required_approvals', 'N/A')}\n"
|
|||
|
|
report += f"- Dismiss stale reviews: {settings.get('dismiss_stale_reviews', 'N/A')}\n"
|
|||
|
|
report += f"- Block on outdated branch: {settings.get('block_on_outdated_branch', 'N/A')}\n"
|
|||
|
|
else:
|
|||
|
|
report += "❌ Branch protection is NOT enabled\n"
|
|||
|
|
report += "Action required: Enable branch protection on main\n"
|
|||
|
|
|
|||
|
|
# Check PR #1254
|
|||
|
|
report += "\n## 2. PR #1254 Status\n"
|
|||
|
|
pr_status = self.check_pr_status(1254)
|
|||
|
|
|
|||
|
|
if "error" in pr_status:
|
|||
|
|
report += f"❌ Could not check PR #1254: {pr_status['error']}\n"
|
|||
|
|
else:
|
|||
|
|
if pr_status["merged"]:
|
|||
|
|
report += f"✅ PR #1254 is merged\n"
|
|||
|
|
report += f"- Merged at: {pr_status['merged_at']}\n"
|
|||
|
|
elif pr_status["state"] == "open":
|
|||
|
|
report += f"⚠️ PR #1254 is open\n"
|
|||
|
|
report += f"- Title: {pr_status['title']}\n"
|
|||
|
|
report += f"- URL: {pr_status['html_url']}\n"
|
|||
|
|
report += "Action required: Review and merge PR #1254\n"
|
|||
|
|
else:
|
|||
|
|
report += f"ℹ️ PR #1254 is {pr_status['state']}\n"
|
|||
|
|
|
|||
|
|
# Recommendations
|
|||
|
|
report += "\n## 3. Recommendations\n"
|
|||
|
|
if not protection.get("protected"):
|
|||
|
|
report += "1. **Enable branch protection** on main with rebase-before-merge\n"
|
|||
|
|
if pr_status.get("state") == "open":
|
|||
|
|
report += "2. **Review and merge PR #1254**\n"
|
|||
|
|
|
|||
|
|
report += "3. **Set up stale-pr-closer cron** on Hermes\n"
|
|||
|
|
report += "4. **Grant admin access to @perplexity** (optional)\n"
|
|||
|
|
|
|||
|
|
return report
|
|||
|
|
|
|||
|
|
|
|||
|
|
def main():
|
|||
|
|
"""Main entry point."""
|
|||
|
|
import argparse
|
|||
|
|
|
|||
|
|
parser = argparse.ArgumentParser(description="Admin Actions Toolkit for Issue #1255")
|
|||
|
|
parser.add_argument("--check", action="store_true", help="Check current status")
|
|||
|
|
parser.add_argument("--enable-rebase", action="store_true", help="Enable rebase-before-merge")
|
|||
|
|
parser.add_argument("--check-pr", type=int, metavar=("PR",), help="Check PR status")
|
|||
|
|
parser.add_argument("--generate-script", action="store_true", help="Generate setup script")
|
|||
|
|
parser.add_argument("--report", action="store_true", help="Generate report")
|
|||
|
|
|
|||
|
|
args = parser.parse_args()
|
|||
|
|
|
|||
|
|
admin = AdminActions()
|
|||
|
|
|
|||
|
|
if args.check:
|
|||
|
|
# Check current status
|
|||
|
|
protection = admin.check_branch_protection("main")
|
|||
|
|
pr_status = admin.check_pr_status(1254)
|
|||
|
|
|
|||
|
|
print("Current Status:")
|
|||
|
|
print(f" Branch protection: {'Enabled' if protection.get('protected') else 'Disabled'}")
|
|||
|
|
print(f" PR #1254: {pr_status.get('state', 'unknown')}")
|
|||
|
|
|
|||
|
|
elif args.enable_rebase:
|
|||
|
|
# Enable rebase-before-merge
|
|||
|
|
result = admin.enable_rebase_before_merge("main")
|
|||
|
|
if "error" in result:
|
|||
|
|
print(f"❌ Failed to enable rebase-before-merge: {result['error']}")
|
|||
|
|
sys.exit(1)
|
|||
|
|
else:
|
|||
|
|
print("✅ Rebase-before-merge enabled on main")
|
|||
|
|
|
|||
|
|
elif args.check_pr:
|
|||
|
|
# Check specific PR
|
|||
|
|
pr_status = admin.check_pr_status(args.check_pr)
|
|||
|
|
if "error" in pr_status:
|
|||
|
|
print(f"❌ Could not check PR #{args.check_pr}: {pr_status['error']}")
|
|||
|
|
else:
|
|||
|
|
print(f"PR #{pr_status['number']}: {pr_status['title']}")
|
|||
|
|
print(f" State: {pr_status['state']}")
|
|||
|
|
print(f" Merged: {pr_status['merged']}")
|
|||
|
|
|
|||
|
|
elif args.generate_script:
|
|||
|
|
# Generate setup script
|
|||
|
|
script = admin.generate_setup_script()
|
|||
|
|
print(script)
|
|||
|
|
|
|||
|
|
elif args.report:
|
|||
|
|
# Generate report
|
|||
|
|
report = admin.generate_report()
|
|||
|
|
print(report)
|
|||
|
|
|
|||
|
|
else:
|
|||
|
|
# Default: show help
|
|||
|
|
parser.print_help()
|
|||
|
|
|
|||
|
|
|
|||
|
|
if __name__ == "__main__":
|
|||
|
|
main()
|