233 lines
7.1 KiB
Python
Executable File
233 lines
7.1 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Auto-Reviewer — reviews open PRs, approves clean ones, rejects bad ones.
|
|
|
|
Checks:
|
|
1. Diff size (not too big, not empty)
|
|
2. No merge conflicts
|
|
3. No secrets
|
|
4. References the linked issue
|
|
5. Has meaningful changes (not just whitespace)
|
|
6. Files changed are in expected locations
|
|
|
|
Approves clean PRs via Gitea API.
|
|
Comments on bad PRs with specific feedback.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import urllib.request
|
|
import urllib.error
|
|
import base64
|
|
import subprocess
|
|
from datetime import datetime, timezone
|
|
|
|
# Base URL of the Gitea instance; API paths are appended as "/api/v1/...".
GITEA_URL = "https://forge.alexanderwhitestone.com"

# Local files and directories (all under the current user's home directory).
TOKEN_FILE = os.path.expanduser("~/.config/gitea/token")
STATE_DIR = os.path.expanduser("~/.hermes/mimo-swarm/state")
LOG_DIR = os.path.expanduser("~/.hermes/mimo-swarm/logs")

# Repository whose pull requests are reviewed, in "owner/name" form.
REPO = "Timmy_Foundation/the-nexus"

# Review thresholds, expressed in diff lines.
MAX_DIFF_LINES = 500
MIN_DIFF_LINES = 1
|
|
|
|
|
|
def load_token():
    """Return the contents of TOKEN_FILE with surrounding whitespace removed.

    Raises:
        OSError: if TOKEN_FILE cannot be opened or read.
    """
    with open(TOKEN_FILE) as handle:
        raw = handle.read()
    return raw.strip()
|
|
|
|
|
|
def api_get(path, token):
    """Issue an authenticated GET against the Gitea REST API.

    Args:
        path: API path appended to ``{GITEA_URL}/api/v1`` (starts with "/",
            may carry a query string).
        token: Access token sent as ``Authorization: token <token>``.

    Returns:
        The decoded JSON payload, or ``None`` when the request fails or the
        response body is not valid JSON.
    """
    url = f"{GITEA_URL}/api/v1{path}"
    req = urllib.request.Request(url, headers={
        "Authorization": f"token {token}",
        "Accept": "application/json",
    })
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            return json.loads(resp.read())
    except Exception:
        # Deliberately best-effort: callers treat None as "nothing available".
        # Catching Exception (not a bare except) lets KeyboardInterrupt and
        # SystemExit propagate so the script can still be interrupted.
        return None
|
|
|
|
|
|
def api_post(path, token, data):
    """POST ``data`` as JSON to the Gitea REST API.

    Args:
        path: API path appended to ``{GITEA_URL}/api/v1``.
        token: Access token sent as ``Authorization: token <token>``.
        data: JSON-serialisable request payload.

    Returns:
        The decoded JSON response, or ``{"error": <message>}`` when the
        request or the response decoding raises.
    """
    request = urllib.request.Request(
        f"{GITEA_URL}/api/v1{path}",
        data=json.dumps(data).encode(),
        headers={
            "Authorization": f"token {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
    try:
        with urllib.request.urlopen(request, timeout=30) as response:
            payload = response.read()
            return json.loads(payload)
    except Exception as exc:
        # Failures are reported in-band so the caller can decide what to do.
        return {"error": str(exc)}
|
|
|
|
|
|
def log(msg):
    """Print a UTC-timestamped message and append it to the daily log file.

    The log file is ``LOG_DIR/reviewer-YYYYMMDD.log``. The date in the file
    name is taken from the same UTC clock reading as the line timestamp, so
    an entry always lands in the file matching its own timestamp.

    Args:
        msg: Text to log (a newline is appended when writing to the file).
    """
    now = datetime.now(timezone.utc)
    ts = now.strftime("%Y-%m-%dT%H:%M:%SZ")
    line = f"[{ts}] {msg}"
    print(line)

    # A fresh machine may not have the log directory yet; create it on demand
    # instead of crashing on the first log call.
    os.makedirs(LOG_DIR, exist_ok=True)
    log_file = os.path.join(LOG_DIR, f"reviewer-{now.strftime('%Y%m%d')}.log")
    # Messages contain non-ASCII characters (e.g. em dashes), so do not rely
    # on the locale's default encoding.
    with open(log_file, "a", encoding="utf-8") as f:
        f.write(line + "\n")
|
|
|
|
|
|
def get_pr_diff(repo, pr_num, token):
    """Get PR diff content.

    Args:
        repo: Repository in "owner/name" form.
        pr_num: Pull request number.
        token: Access token sent as ``Authorization: token <token>``.

    Returns:
        The unified diff as text, or ``""`` when the request fails.
    """
    url = f"{GITEA_URL}/api/v1/repos/{repo}/pulls/{pr_num}.diff"
    req = urllib.request.Request(url, headers={"Authorization": f"token {token}"})
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            raw = resp.read()
    except Exception:
        # Best-effort fetch; Exception (not a bare except) keeps Ctrl-C working.
        return ""
    # A diff may touch files that are not valid UTF-8. Replace undecodable
    # bytes rather than discarding the whole diff, which callers would
    # otherwise misreport as an empty PR.
    return raw.decode("utf-8", errors="replace")
|
|
|
|
|
|
def get_pr_files(repo, pr_num, token):
    """Collect every changed-file entry of a PR, following pagination.

    Pages of 50 entries are requested until a page comes back empty, fails
    (``api_get`` returned a falsy value), or holds fewer than 50 entries.

    Returns:
        A list with the entries of all pages fetched so far (possibly empty).
    """
    page_size = 50
    collected = []
    page = 1
    while True:
        batch = api_get(f"/repos/{repo}/pulls/{pr_num}/files?limit=50&page={page}", token)
        if batch:
            collected.extend(batch)
        # A missing or short page means there is nothing further to fetch.
        if not batch or len(batch) < page_size:
            return collected
        page += 1
|
|
|
|
|
|
def get_pr_reviews(repo, pr_num, token):
    """Return the reviews already posted on a PR, or ``[]`` if none could be fetched."""
    reviews = api_get(f"/repos/{repo}/pulls/{pr_num}/reviews", token)
    if not reviews:
        return []
    return reviews
|
|
|
|
|
|
def review_pr(pr, token):
    """Review a single PR. Returns (approved: bool, comment: str).

    Checks performed, each adding an entry to the list of issues on failure:
      1. Diff size: number of added lines must be at least MIN_DIFF_LINES and
         at most MAX_DIFF_LINES.
      2. No git conflict markers among the added lines.
      3. No secret-looking strings anywhere in the diff.
      4. The PR body references an issue (``#<number>``) or the branch name
         contains ``issue-<number>``.
      5. At least one changed file is reported by the API.
      6. No ``.js``/``.html``/``.py`` file is deleted.

    Args:
        pr: Pull-request object as returned by the Gitea API; must contain
            ``"number"``, may contain ``"body"`` and ``"head" -> "ref"``.
        token: Access token used for the follow-up API calls.

    Returns:
        ``(True, comment)`` when no issue was found, otherwise
        ``(False, comment)`` with one bullet per issue in ``comment``.
    """
    pr_num = pr["number"]
    body = pr.get("body", "") or ""
    head_ref = pr.get("head", {}).get("ref", "")

    issues = []

    # 1. Check diff size, counted in added lines. Lines starting with "+++"
    # are file headers ("+++ b/path") and are not content.
    diff = get_pr_diff(REPO, pr_num, token)
    added_lines = [
        line[1:]
        for line in diff.split("\n")
        if line.startswith("+") and not line.startswith("+++")
    ]
    diff_lines = len(added_lines)

    if diff_lines < MIN_DIFF_LINES:
        issues.append("Empty diff — no actual changes")
    elif diff_lines > MAX_DIFF_LINES:
        issues.append(f"Diff too large ({diff_lines} lines) — may be too complex for automated review")

    # 2. Check for merge conflicts. Git writes markers of exactly seven
    # characters ("<<<<<<< ours" / ">>>>>>> theirs") at the start of a line.
    # Only the "<" and ">" markers are matched: a bare "=======" line is also
    # a legitimate Markdown/reST heading underline and would false-positive.
    if any(re.match(r"(?:<{7}|>{7})(?:\s|$)", line) for line in added_lines):
        issues.append("Merge conflict markers detected")

    # 3. Check for secrets
    secret_patterns = [
        (r'sk-[a-zA-Z0-9]{20,}', "API key"),
        (r'api_key\s*=\s*["\'][a-zA-Z0-9]{10,}', "API key assignment"),
        (r'password\s*=\s*["\'][^\s"\']{8,}', "Hardcoded password"),
    ]
    for pattern, name in secret_patterns:
        if re.search(pattern, diff):
            issues.append(f"Potential {name} leaked in diff")

    # 4. Check issue reference. Any "#<number>" in the body counts (this
    # covers "Closes #12" / "Fixes #12"); otherwise fall back to a branch
    # name such as "mimo/issue-12".
    references_issue = bool(
        re.search(r"#\d+", body) or re.search(r"issue-\d+", head_ref)
    )
    if not references_issue:
        issues.append("PR does not reference an issue number")

    # 5. Check files changed
    files = get_pr_files(REPO, pr_num, token)
    if not files:
        issues.append("No files changed")

    # Source label used in the approval comment only; it does not affect the verdict.
    is_mimo = "mimo" in head_ref.lower() or "Automated by mimo" in body

    # 6. Check for destructive changes. Both spellings of the status are
    # accepted: "deleted" and the GitHub-style "removed".
    for changed in files:
        filename = changed.get("filename", "")
        if changed.get("status") in ("removed", "deleted") and filename.endswith((".js", ".html", ".py")):
            issues.append(f"File deleted: {filename} — verify this is intentional")

    # Decision
    if issues:
        parts = [
            "## Auto-Review: CHANGES REQUESTED\n\n",
            f"**Diff:** {diff_lines} lines across {len(files)} files\n\n",
            "**Issues found:**\n",
        ]
        parts.extend(f"- {issue}\n" for issue in issues)
        parts.append("\nPlease address these issues and update the PR.")
        return False, "".join(parts)

    comment = "".join([
        "## Auto-Review: APPROVED\n\n",
        f"**Diff:** {diff_lines} lines across {len(files)} files\n",
        "**Checks passed:** syntax, security, issue reference, diff size\n",
        f"**Source:** {'mimo-v2-pro swarm' if is_mimo else 'manual'}\n",
    ])
    return True, comment
|
|
|
|
|
|
def main():
    """Scan open PRs of REPO and post an automated review on each mimo PR.

    For every open PR (first 20 returned by the API):
      * skip it unless the branch name contains "mimo" or the body contains
        "Automated by mimo";
      * skip it if a review by the login "Rockachopa" already exists
        (presumably the account this script posts as — confirm);
      * otherwise run ``review_pr`` and post an APPROVE or REQUEST_CHANGES
        review with the generated comment.

    A PR is counted as approved/rejected only when the review was actually
    posted; a failed post is logged as an error instead.
    """
    token = load_token()
    log("=" * 50)
    log("AUTO-REVIEWER — scanning open PRs")

    # Get open PRs
    prs = api_get(f"/repos/{REPO}/pulls?state=open&limit=20", token)
    if not prs:
        log("No open PRs")
        return

    approved = 0
    rejected = 0

    for pr in prs:
        pr_num = pr["number"]
        author = pr["user"]["login"]

        # Skip PRs by humans (only auto-review mimo PRs)
        head_ref = pr.get("head", {}).get("ref", "")
        body = pr.get("body", "") or ""
        is_mimo = "mimo" in head_ref.lower() or "Automated by mimo" in body

        if not is_mimo:
            log(f" SKIP #{pr_num} (human PR by {author})")
            continue

        # Check if already reviewed
        reviews = get_pr_reviews(REPO, pr_num, token)
        already_reviewed = any(r.get("user", {}).get("login") == "Rockachopa" for r in reviews)
        if already_reviewed:
            log(f" SKIP #{pr_num} (already reviewed)")
            continue

        # Review
        is_approved, comment = review_pr(pr, token)

        # Post review
        review_event = "APPROVE" if is_approved else "REQUEST_CHANGES"
        result = api_post(f"/repos/{REPO}/pulls/{pr_num}/reviews", token, {
            "event": review_event,
            "body": comment,
        })

        # api_post reports failures in-band as {"error": ...}. Do not claim a
        # review was posted when it was not; the PR stays unreviewed and will
        # be picked up again on the next run.
        if isinstance(result, dict) and "error" in result:
            log(f" ERROR #{pr_num}: could not post {review_event} review ({result['error']})")
            continue

        title = (pr.get("title") or "")[:50]
        if is_approved:
            approved += 1
            log(f" APPROVED #{pr_num}: {title}")
        else:
            rejected += 1
            log(f" REJECTED #{pr_num}: {title}")

    log(f"Review complete: {approved} approved, {rejected} rejected, {len(prs)} total")
|
|
|
|
|
|
# Run the reviewer only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|