Compare commits

..

1 Commits

Author SHA1 Message Date
Timmy
25b26c536d fix(#1480): Add duplicate PR prevention mechanism
Some checks failed
CI / test (pull_request) Failing after 51s
CI / validate (pull_request) Failing after 40s
Review Approval Gate / verify-review (pull_request) Failing after 6s
Agents keep creating duplicate PRs for the same issue (#1128
had 4+ duplicate PRs). This adds a mandatory preflight check.

Files:
  - scripts/pr-preflight-check.sh: bash preflight (exit 1 if duplicates)
  - scripts/pr_preflight_check.py: Python preflight (for agent workflows)
  - docs/DUPLICATE_PR_PREVENTION.md: usage documentation

Closes #1480, #1460, #1474
2026-04-14 18:59:02 -04:00
5 changed files with 212 additions and 392 deletions

View File

@@ -1,277 +0,0 @@
#!/usr/bin/env python3
"""
backlog_triage.py — Triage open issues in a Gitea repository.
Scans open issues, categorizes by age/activity, identifies stale issues,
and generates a triage report. Optionally auto-closes stale issues.
Usage:
python3 bin/backlog_triage.py --repo Timmy_Foundation/the-nexus
python3 bin/backlog_triage.py --repo Timmy_Foundation/the-nexus --stale-days 60 --report out.json
python3 bin/backlog_triage.py --repo Timmy_Foundation/the-nexus --auto-close-stale --dry-run
"""
import argparse
import json
import os
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional
import urllib.request
import urllib.error
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Base URL of the Gitea instance; override via the GITEA_URL environment variable.
GITEA_URL = os.environ.get("GITEA_URL", "https://forge.alexanderwhitestone.com")
# Days without activity before an issue counts as "stale" (see categorize_issue).
DEFAULT_STALE_DAYS = 30
# Days without activity before an issue counts as "idle" (a stronger signal than stale).
DEFAULT_IDLE_DAYS = 60
BATCH_SIZE = 50  # Gitea API page size
# ---------------------------------------------------------------------------
# API helpers
# ---------------------------------------------------------------------------
def _api(token: str, method: str, path: str, data: Optional[dict] = None) -> dict:
    """Perform one Gitea REST API call and return the decoded JSON response.

    Args:
        token: Gitea API token (sent as ``Authorization: token ...``).
        method: HTTP method ("GET", "POST", "PATCH", ...).
        path: API path relative to ``/api/v1`` (e.g. ``/repos/{repo}/issues``).
        data: Optional JSON body; sent only when not None.

    Returns:
        Parsed JSON body, or {} for a 204 No Content response.

    Raises:
        urllib.error.URLError / HTTPError on network or HTTP failures.
    """
    url = f"{GITEA_URL}/api/v1{path}"
    headers = {"Authorization": f"token {token}", "Content-Type": "application/json"}
    # `is not None` (not truthiness): an explicit empty payload {} must still be sent.
    body = json.dumps(data).encode() if data is not None else None
    req = urllib.request.Request(url, data=body, headers=headers, method=method)
    with urllib.request.urlopen(req, timeout=30) as resp:
        # A 204 has no body; return {} rather than failing to parse empty bytes.
        return json.loads(resp.read()) if resp.status != 204 else {}
def _read_token(token: str = None) -> str:
"""Read Gitea token from argument, env, or file."""
if token:
return token
token = os.environ.get("GITEA_TOKEN", "")
if token:
return token
token_path = Path.home() / ".config" / "gitea" / "token"
if token_path.exists():
return token_path.read_text().strip()
raise ValueError("No Gitea token found. Pass --token, set GITEA_TOKEN, or create ~/.config/gitea/token")
# ---------------------------------------------------------------------------
# Issue data model
# ---------------------------------------------------------------------------
def fetch_all_issues(token: str, repo: str, state: str = "open") -> list[dict]:
    """Fetch every issue in ``repo`` with the given state, following pagination.

    Pages through the Gitea issues endpoint BATCH_SIZE at a time until an
    empty or short page signals the end of the result set.
    """
    collected: list[dict] = []
    page = 1
    while True:
        batch = _api(token, "GET", f"/repos/{repo}/issues?state={state}&limit={BATCH_SIZE}&page={page}")
        if not batch:
            break
        collected.extend(batch)
        # A short page means the server has no more results.
        if len(batch) < BATCH_SIZE:
            break
        page += 1
    return collected
def categorize_issue(issue: dict, now: datetime, stale_days: int, idle_days: int) -> dict:
    """Classify one issue into a triage category and flatten its key fields.

    Categories are evaluated in priority order: idle, stale, zombie,
    closeable, unassigned, urgent, backlog, bug, feature, triage-needed.

    Args:
        issue: Raw issue dict from the Gitea API.
        now: Timezone-aware reference time for age/idle calculations.
        stale_days: Inactivity threshold (days) for "stale".
        idle_days: Inactivity threshold (days) for "idle".

    Returns:
        Flat dict with number, title, category, age/idle days, labels,
        assignees, comment count, timestamps, and URL.
    """
    created_ts = datetime.fromisoformat(issue["created_at"].replace("Z", "+00:00"))
    updated_ts = datetime.fromisoformat(issue["updated_at"].replace("Z", "+00:00"))
    days_since_created = (now - created_ts).days
    days_since_updated = (now - updated_ts).days
    label_names = [lbl["name"] for lbl in issue.get("labels", [])]
    assignee_logins = [person["login"] for person in issue.get("assignees", [])]
    comment_count = issue.get("comments", 0)

    def _pick_category() -> str:
        # Checks are ordered: inactivity signals first, then labels/assignment.
        if days_since_updated >= idle_days:
            return "idle"  # No activity for 60+ days
        if days_since_updated >= stale_days:
            return "stale"  # No activity for 30+ days
        if days_since_created >= 90 and comment_count == 0:
            return "zombie"  # Old, never discussed
        label_set = set(label_names)
        if label_set & {"duplicate", "wontfix", "invalid"}:
            return "closeable"
        if not assignee_logins:
            return "unassigned"
        if label_set & {"p0-critical", "p1-important"}:
            return "urgent"
        if label_set & {"p2-backlog", "p3-low"}:
            return "backlog"
        if "bug" in label_set:
            return "bug"
        if label_set & {"enhancement", "feature"}:
            return "feature"
        return "triage-needed"

    return {
        "number": issue["number"],
        "title": issue["title"],
        "category": _pick_category(),
        "age_days": days_since_created,
        "idle_days": days_since_updated,
        "labels": label_names,
        "assignees": assignee_logins,
        "comments": comment_count,
        "created_at": issue["created_at"],
        "updated_at": issue["updated_at"],
        "html_url": issue.get("html_url", ""),
    }
# ---------------------------------------------------------------------------
# Triage report
# ---------------------------------------------------------------------------
def generate_report(categorized: list[dict]) -> dict:
    """Build a triage summary from a list of categorized issues.

    Returns a dict with totals, per-category counts (each category's bucket
    sorted most-idle first), candidate lists for closing/review/assignment,
    plain-English recommendations, and the full categorized issue list.
    """
    grouped: dict[str, list[dict]] = {}
    for entry in categorized:
        grouped.setdefault(entry["category"], []).append(entry)
    # Within each category bucket, surface the most-idle issues first.
    for bucket in grouped.values():
        bucket.sort(key=lambda item: item["idle_days"], reverse=True)

    closeable_candidates = [
        {"number": c["number"], "title": c["title"],
         "reason": f"idle {c['idle_days']}d, labels: {c['labels']}"}
        for c in categorized
        if c["category"] in ("idle", "zombie", "closeable")
    ]
    stale_needing_attention = [
        {"number": s["number"], "title": s["title"], "idle_days": s["idle_days"]}
        for s in categorized
        if s["category"] == "stale"
    ]
    unassigned = [
        {"number": u["number"], "title": u["title"]}
        for u in categorized
        if u["category"] == "unassigned"
    ]

    recommendations: list[str] = []
    if closeable_candidates:
        recommendations.append(
            f"Close {len(closeable_candidates)} idle/zombie/closeable issues (no activity 60+ days or labeled wontfix/duplicate)"
        )
    if stale_needing_attention:
        recommendations.append(
            f"Review {len(stale_needing_attention)} stale issues (no activity 30+ days)"
        )
    if unassigned:
        recommendations.append(
            f"Assign owners to {len(unassigned)} unassigned issues or close if no longer relevant"
        )

    return {
        "total": len(categorized),
        "by_category": {cat: len(items) for cat, items in grouped.items()},
        "closeable_candidates": closeable_candidates,
        "stale_needing_attention": stale_needing_attention,
        "unassigned": unassigned,
        "recommendations": recommendations,
        "issues": categorized,
    }
# ---------------------------------------------------------------------------
# Auto-close (optional)
# ---------------------------------------------------------------------------
def auto_close_stale(token: str, repo: str, issues: list[dict], dry_run: bool = True) -> list[int]:
    """Close idle/zombie issues that are unambiguously dead.

    Safety gate: only issues that are (a) categorized idle or zombie,
    (b) idle for 90+ days, and (c) have zero comments are touched.
    In dry-run mode nothing is changed; the would-be closures are printed.

    Returns:
        The issue numbers closed (or that would be closed in dry-run).
    """
    closed_numbers: list[int] = []
    for item in issues:
        if item["category"] not in ("idle", "zombie"):
            continue
        # Guard clause mirrors the safety gate: skip anything recent or discussed.
        if item["idle_days"] < 90 or item["comments"] > 0:
            continue
        if dry_run:
            print(f" DRY-RUN: Would close #{item['number']}: {item['title']} (idle {item['idle_days']}d)")
        else:
            note = f"Auto-closed by backlog triage: no activity for {item['idle_days']} days, 0 comments. Reopen if still relevant."
            # Leave an explanatory comment before closing so the trail is visible.
            _api(token, "POST", f"/repos/{repo}/issues/{item['number']}/comments", {"body": note})
            _api(token, "PATCH", f"/repos/{repo}/issues/{item['number']}", {"state": "closed"})
            print(f" Closed #{item['number']}: {item['title']}")
        closed_numbers.append(item["number"])
    return closed_numbers
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main():
    """CLI entry point: fetch, categorize, summarize, and optionally auto-close issues.

    Exits 0 on success; _read_token raises ValueError if no token is available.
    """
    parser = argparse.ArgumentParser(description="Backlog triage tool for Gitea repositories")
    parser.add_argument("--repo", required=True, help="Repository (e.g. Timmy_Foundation/the-nexus)")
    parser.add_argument("--token", default=None, help="Gitea API token")
    parser.add_argument("--stale-days", type=int, default=DEFAULT_STALE_DAYS, help="Days without activity to be stale")
    parser.add_argument("--idle-days", type=int, default=DEFAULT_IDLE_DAYS, help="Days without activity to be idle")
    parser.add_argument("--report", default=None, help="Output report JSON path")
    parser.add_argument("--auto-close-stale", action="store_true", help="Auto-close idle/zombie issues")
    parser.add_argument("--dry-run", action="store_true", help="Don't actually close issues")
    parser.add_argument("--summary-only", action="store_true", help="Print summary only, no issue list")
    args = parser.parse_args()
    # Token resolution: --token arg, then GITEA_TOKEN env, then ~/.config/gitea/token.
    token = _read_token(args.token)
    print(f"Fetching issues from {args.repo}...")
    issues = fetch_all_issues(token, args.repo)
    print(f"Found {len(issues)} open issues")
    now = datetime.now(timezone.utc)
    categorized = [categorize_issue(i, now, args.stale_days, args.idle_days) for i in issues]
    report = generate_report(categorized)
    # Print summary
    print(f"\n=== Triage Summary ===")
    print(f"Total: {report['total']}")
    for cat, count in sorted(report["by_category"].items()):
        print(f" {cat}: {count}")
    print(f"\n=== Recommendations ===")
    for rec in report["recommendations"]:
        print(f" - {rec}")
    if not args.summary_only:
        # Cap the printed lists at 20 entries; the full data goes in the JSON report.
        print(f"\n=== Closeable Candidates ({len(report['closeable_candidates'])}) ===")
        for c in report["closeable_candidates"][:20]:
            print(f" #{c['number']}: {c['title'][:60]} [{c['reason']}]")
        print(f"\n=== Stale ({len(report['stale_needing_attention'])}) ===")
        for s in report["stale_needing_attention"][:20]:
            print(f" #{s['number']}: {s['title'][:60]} (idle {s['idle_days']}d)")
    # Auto-close if requested
    if args.auto_close_stale:
        print(f"\n=== Auto-close {'(DRY RUN)' if args.dry_run else '(LIVE)'} ===")
        closed = auto_close_stale(token, args.repo, categorized, dry_run=args.dry_run)
        print(f"{'Would close' if args.dry_run else 'Closed'} {len(closed)} issues")
    # Write report
    if args.report:
        with open(args.report, "w") as f:
            json.dump(report, f, indent=2)
        print(f"\nReport written to {args.report}")
    sys.exit(0)
if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,50 @@
# Duplicate PR Prevention
## The Problem
Issue #1128 documented a cleanup of duplicate PRs. Agents then created
4+ duplicate PRs *for issue #1128 itself*. The irony was not lost on anyone.
See: #1449, #1460, #1474, #1480.
## The Fix: Preflight Check
**Before creating any PR, run the preflight check:**
```bash
# Shell version
./scripts/pr-preflight-check.sh <issue_number>
# Python version
python3 scripts/pr_preflight_check.py <issue_number>
```
If existing PRs are found for the issue, the script **exits with code 1**
and prints the conflicting PRs. DO NOT proceed to create a new PR.
## Agent Workflow
```
1. Read issue
2. Clone repo
3. Implement fix
4. Commit
5. >>> RUN pr_preflight_check.py <issue_number> <<<
6. If exit 0: safe to push and create PR
7. If exit 1: STOP — review existing PRs first (exit 2 means the check itself failed, e.g. bad arguments; fix and re-run)
8. Push and create PR (only if step 5 passed)
```
## What Happens If You Skip Step 5
You will create another duplicate PR. The cleanup script will find it.
Someone will close it. You will have wasted compute and created noise.
## Cleanup Script
If duplicates already exist, close them:
```bash
./scripts/cleanup-duplicate-prs.sh --dry-run # preview
./scripts/cleanup-duplicate-prs.sh --close # actually close
```

70
scripts/pr-preflight-check.sh Executable file
View File

@@ -0,0 +1,70 @@
#!/usr/bin/env bash
# ═══════════════════════════════════════════════════════════════
# pr-preflight-check.sh — MUST run before creating any PR
#
# Checks for existing open PRs that reference the same issue.
# Refuses to proceed if duplicates exist.
#
# Usage:
#   ./scripts/pr-preflight-check.sh <issue_number>
#
# Environment:
#   GITEA_TOKEN  (required)  API token
#   GITEA_URL    (optional)  Gitea base URL
#   REPO         (optional)  owner/name repository
#
# Exit codes:
#   0 — Safe to proceed (no existing PRs for this issue)
#   1 — BLOCKED (existing PRs found, do NOT create a new one)
#   2 — Error (missing/invalid args, missing token)
#
# Issue #1480: This script exists because agents keep creating
# duplicate PRs for the same issue. Running this before `git push`
# or `curl ... /pulls` prevents the problem.
# ═══════════════════════════════════════════════════════════════
set -euo pipefail

ISSUE_NUM="${1:-}"
if [ -z "$ISSUE_NUM" ]; then
  echo "Usage: $0 <issue_number>"
  echo "Example: $0 1128"
  exit 2
fi
# Reject non-numeric arguments early: they would silently match nothing.
case "$ISSUE_NUM" in
  *[!0-9]*) echo "Error: issue_number must be numeric, got '$ISSUE_NUM'"; exit 2 ;;
esac

GITEA_URL="${GITEA_URL:-https://forge.alexanderwhitestone.com}"
GITEA_TOKEN="${GITEA_TOKEN:?Set GITEA_TOKEN env var}"
REPO="${REPO:-Timmy_Foundation/the-nexus}"
API="$GITEA_URL/api/v1"
AUTH="Authorization: token $GITEA_TOKEN"

echo "═══ PR Preflight Check for Issue #$ISSUE_NUM ═══"
echo ""

# Fetch open PRs. `|| true` so a curl failure reaches the fail-open branch
# below instead of being killed by `set -e` mid-assignment.
OPEN_PRS=$(curl -s -H "$AUTH" "$API/repos/$REPO/pulls?state=open&limit=100" || true)
if [ -z "$OPEN_PRS" ] || [ "$OPEN_PRS" = "null" ]; then
  echo "⚠ Could not fetch PRs (API error or empty response)"
  echo "Proceeding with caution."
  exit 0
fi

# Find PRs whose title or body references "#<issue>".
# jq notes:
#   - --arg passes the issue number as data, not interpolated jq code
#   - parentheses are required: jq's `|` binds looser than `or`, so the
#     unparenthesized form `.title | test(..) or .body // "" | test(..)`
#     does not evaluate as the intended disjunction
#   - the trailing \\b word boundary keeps "#148" from matching "#1480"
MATCHES=$(echo "$OPEN_PRS" | jq -r --arg n "$ISSUE_NUM" '
  .[]
  | select(
      (.title | test("#" + $n + "\\b"; "i"))
      or ((.body // "") | test("#" + $n + "\\b"; "i"))
    )
  | " PR #\(.number): \(.title) [\(.head.ref)] (\(.created_at[:10]))"' || true)
if [ -z "$MATCHES" ]; then
  echo "✓ No existing open PRs for issue #$ISSUE_NUM"
  echo "✓ Safe to proceed."
  exit 0
fi

echo "✗ BLOCKED — Found existing open PRs for issue #$ISSUE_NUM:"
echo ""
echo "$MATCHES"
echo ""
echo "═══════════════════════════════════════════════"
echo "DO NOT CREATE A NEW PR."
echo ""
echo "Options:"
echo " 1. Review and merge an existing PR"
echo " 2. Close duplicates first: ./scripts/cleanup-duplicate-prs.sh --close"
echo " 3. Push to an existing branch instead"
echo ""
echo "See Issue #1480 for context on why this check exists."
echo "═══════════════════════════════════════════════"
exit 1

View File

@@ -0,0 +1,92 @@
#!/usr/bin/env python3
"""
pr_preflight_check.py — Prevent duplicate PR creation.
Call before creating any PR:
python3 scripts/pr_preflight_check.py 1128
Returns exit code 0 if safe, 1 if blocked.
Designed for agent workflows — agents MUST call this before `curl ... /pulls`.
Issue #1480: The duplicate PR problem.
"""
import json
import os
import re
import sys
import urllib.request
def check_existing_prs(issue_num: int, repo: str = None, token: str = None) -> dict:
    """Check for existing open PRs referencing issue #issue_num.

    Args:
        issue_num: Issue number to look for in PR titles and bodies.
        repo: "owner/name" repository; falls back to the REPO env var, then default.
        token: API token; falls back to GITEA_TOKEN, then ~/.config/gitea/token.

    Returns dict with:
        safe (bool): True if no duplicates found (fails OPEN on missing token
            or API error — by design, a broken API must not hard-block agents)
        matches (list): PR dicts ({number, title, branch, created}) referencing the issue
        message (str): Human-readable status
    """
    gitea_url = os.environ.get("GITEA_URL", "https://forge.alexanderwhitestone.com")
    token = token or os.environ.get("GITEA_TOKEN", "")
    repo = repo or os.environ.get("REPO", "Timmy_Foundation/the-nexus")
    if not token:
        token_path = os.path.expanduser("~/.config/gitea/token")
        if os.path.exists(token_path):
            # Context manager: the original leaked the file handle via open().read().
            with open(token_path) as fh:
                token = fh.read().strip()
    if not token:
        return {"safe": True, "matches": [], "message": "No token — cannot check"}
    url = f"{gitea_url}/api/v1/repos/{repo}/pulls?state=open&limit=100"
    req = urllib.request.Request(url, headers={"Authorization": f"token {token}"})
    try:
        with urllib.request.urlopen(req, timeout=10) as resp:
            prs = json.loads(resp.read())
    except Exception as e:
        # Deliberate fail-open: report the error but do not block PR creation.
        return {"safe": True, "matches": [], "message": f"API error: {e}"}
    # Word-boundary match so "#148" does not also match "#1480", "#1481", etc.
    # (the original plain substring test produced false BLOCKED results).
    ref_pattern = re.compile(rf"#{issue_num}\b")
    matches = []
    for pr in prs:
        title = pr.get("title", "")
        body = pr.get("body") or ""  # body can be null in the API response
        if ref_pattern.search(title) or ref_pattern.search(body):
            matches.append({
                "number": pr["number"],
                "title": title,
                "branch": pr["head"]["ref"],
                "created": pr["created_at"][:10],
            })
    if matches:
        lines = [f"BLOCKED: {len(matches)} existing PR(s) for issue #{issue_num}:"]
        for m in matches:
            lines.append(f" PR #{m['number']}: {m['title']} [{m['branch']}] ({m['created']})")
        lines.append("")
        lines.append("DO NOT CREATE A NEW PR. Review existing ones first.")
        return {"safe": False, "matches": matches, "message": "\n".join(lines)}
    return {"safe": True, "matches": [], "message": f"✓ Safe: no open PRs for #{issue_num}"}
def main():
    """CLI wrapper: exit 0 if safe, 1 if blocked, 2 on usage errors."""
    if len(sys.argv) < 2:
        print("Usage: pr_preflight_check.py <issue_number> [repo]")
        print("Example: pr_preflight_check.py 1128")
        print(" pr_preflight_check.py 1339 Timmy_Foundation/the-nexus")
        sys.exit(2)
    try:
        issue_num = int(sys.argv[1])
    except ValueError:
        # The original crashed with a traceback on a non-numeric argument;
        # treat it as a usage error (exit 2) like the missing-argument case.
        print(f"Error: <issue_number> must be an integer, got {sys.argv[1]!r}")
        sys.exit(2)
    repo = sys.argv[2] if len(sys.argv) > 2 else None
    result = check_existing_prs(issue_num, repo)
    print(result["message"])
    # Exit 1 blocks the caller's PR-creation step; exit 0 means proceed.
    sys.exit(1 if not result["safe"] else 0)
if __name__ == "__main__":
    main()

View File

@@ -1,115 +0,0 @@
"""Tests for backlog_triage — issue categorization and report generation."""
import json
from datetime import datetime, timedelta, timezone
from pathlib import Path
import pytest
import sys
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from bin.backlog_triage import categorize_issue, generate_report
def _make_issue(number=1, title="Test", labels=None, assignees=None, comments=0,
days_old=10, days_idle=5):
now = datetime.now(timezone.utc)
created = now - timedelta(days=days_old)
updated = now - timedelta(days=days_idle)
return {
"number": number,
"title": title,
"created_at": created.isoformat().replace("+00:00", "Z"),
"updated_at": updated.isoformat().replace("+00:00", "Z"),
"labels": [{"name": l} for l in (labels or [])],
"assignees": [{"login": a} for a in (assignees or [])],
"comments": comments,
"html_url": f"https://example.com/{number}",
}
class TestCategorizeIssue:
    """Behavioral tests for categorize_issue with stale=30 / idle=60 thresholds."""

    @staticmethod
    def _run(issue):
        # Every test categorizes against "now" with the same 30/60-day thresholds.
        return categorize_issue(issue, datetime.now(timezone.utc), 30, 60)

    def test_idle_issue(self):
        assert self._run(_make_issue(days_idle=70))["category"] == "idle"

    def test_stale_issue(self):
        assert self._run(_make_issue(days_idle=45))["category"] == "stale"

    def test_zombie_issue(self):
        assert self._run(_make_issue(days_old=100, days_idle=10, comments=0))["category"] == "zombie"

    def test_unassigned_issue(self):
        assert self._run(_make_issue(assignees=[], days_old=5, days_idle=1))["category"] == "unassigned"

    def test_assigned_issue(self):
        # Assigned, unlabelled, recent: nothing matches, so it needs manual triage.
        assert self._run(_make_issue(assignees=["alice"], days_old=5, days_idle=1))["category"] == "triage-needed"

    def test_closeable_duplicate(self):
        assert self._run(_make_issue(labels=["duplicate"], days_old=5, days_idle=1))["category"] == "closeable"

    def test_urgent_issue(self):
        assert self._run(_make_issue(labels=["p0-critical"], assignees=["bob"]))["category"] == "urgent"

    def test_backlog_issue(self):
        assert self._run(_make_issue(labels=["p2-backlog"], assignees=["bob"]))["category"] == "backlog"

    def test_bug_category(self):
        assert self._run(_make_issue(labels=["bug"], assignees=["bob"]))["category"] == "bug"

    def test_age_tracking(self):
        # >= rather than == guards against a day boundary crossing mid-test.
        result = self._run(_make_issue(days_old=42, days_idle=7))
        assert result["age_days"] >= 41
        assert result["idle_days"] >= 6
class TestGenerateReport:
    """Tests for generate_report: totals, grouping, candidate lists, recommendations."""

    def test_empty_report(self):
        # An empty input must still yield a well-formed, zeroed report.
        report = generate_report([])
        assert report["total"] == 0
        assert report["by_category"] == {}

    def test_report_categorization(self):
        issues = [
            _make_issue(1, "idle", days_idle=70),
            _make_issue(2, "stale", days_idle=40),
            _make_issue(3, "recent", assignees=["alice"]),
        ]
        categorized = [categorize_issue(i, datetime.now(timezone.utc), 30, 60) for i in issues]
        report = generate_report(categorized)
        assert report["total"] == 3
        assert "idle" in report["by_category"]
        assert "stale" in report["by_category"]

    def test_closeable_candidates(self):
        # 100 days idle with 0 comments categorizes as idle → a closeable candidate.
        issues = [
            _make_issue(1, "old zombie", days_old=100, days_idle=100, comments=0),
            _make_issue(2, "recent", assignees=["alice"]),
        ]
        categorized = [categorize_issue(i, datetime.now(timezone.utc), 30, 60) for i in issues]
        report = generate_report(categorized)
        assert len(report["closeable_candidates"]) >= 1
        assert report["closeable_candidates"][0]["number"] == 1

    def test_recommendations_generated(self):
        # Any idle issue should trigger at least the "close idle" recommendation.
        issues = [_make_issue(1, days_idle=70)]
        categorized = [categorize_issue(i, datetime.now(timezone.utc), 30, 60) for i in issues]
        report = generate_report(categorized)
        assert len(report["recommendations"]) > 0