Compare commits
1 Commits
fix/issue-
...
fix/1480-d
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
25b26c536d |
@@ -1,277 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
backlog_triage.py — Triage open issues in a Gitea repository.
|
||||
|
||||
Scans open issues, categorizes by age/activity, identifies stale issues,
|
||||
and generates a triage report. Optionally auto-closes stale issues.
|
||||
|
||||
Usage:
|
||||
python3 bin/backlog_triage.py --repo Timmy_Foundation/the-nexus
|
||||
python3 bin/backlog_triage.py --repo Timmy_Foundation/the-nexus --stale-days 60 --report out.json
|
||||
python3 bin/backlog_triage.py --repo Timmy_Foundation/the-nexus --auto-close-stale --dry-run
|
||||
"""
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Configuration
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
GITEA_URL = os.environ.get("GITEA_URL", "https://forge.alexanderwhitestone.com")
|
||||
DEFAULT_STALE_DAYS = 30
|
||||
DEFAULT_IDLE_DAYS = 60
|
||||
BATCH_SIZE = 50 # Gitea API page size
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# API helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _api(token: str, method: str, path: str, data: dict = None) -> dict:
|
||||
"""Make a Gitea API call."""
|
||||
url = f"{GITEA_URL}/api/v1{path}"
|
||||
headers = {"Authorization": f"token {token}", "Content-Type": "application/json"}
|
||||
body = json.dumps(data).encode() if data else None
|
||||
req = urllib.request.Request(url, data=body, headers=headers, method=method)
|
||||
with urllib.request.urlopen(req, timeout=30) as resp:
|
||||
return json.loads(resp.read()) if resp.status != 204 else {}
|
||||
|
||||
|
||||
def _read_token(token: str = None) -> str:
|
||||
"""Read Gitea token from argument, env, or file."""
|
||||
if token:
|
||||
return token
|
||||
token = os.environ.get("GITEA_TOKEN", "")
|
||||
if token:
|
||||
return token
|
||||
token_path = Path.home() / ".config" / "gitea" / "token"
|
||||
if token_path.exists():
|
||||
return token_path.read_text().strip()
|
||||
raise ValueError("No Gitea token found. Pass --token, set GITEA_TOKEN, or create ~/.config/gitea/token")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Issue data model
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def fetch_all_issues(token: str, repo: str, state: str = "open") -> list[dict]:
|
||||
"""Fetch all open issues with pagination."""
|
||||
issues = []
|
||||
page = 1
|
||||
while True:
|
||||
data = _api(token, "GET", f"/repos/{repo}/issues?state={state}&limit={BATCH_SIZE}&page={page}")
|
||||
if not data:
|
||||
break
|
||||
issues.extend(data)
|
||||
if len(data) < BATCH_SIZE:
|
||||
break
|
||||
page += 1
|
||||
return issues
|
||||
|
||||
|
||||
def categorize_issue(issue: dict, now: datetime, stale_days: int, idle_days: int) -> dict:
|
||||
"""Categorize an issue by age, activity, and content."""
|
||||
created = datetime.fromisoformat(issue["created_at"].replace("Z", "+00:00"))
|
||||
updated = datetime.fromisoformat(issue["updated_at"].replace("Z", "+00:00"))
|
||||
age_days = (now - created).days
|
||||
idle_days_actual = (now - updated).days
|
||||
|
||||
labels = [l["name"] for l in issue.get("labels", [])]
|
||||
assignees = [a["login"] for a in issue.get("assignees", [])]
|
||||
comments = issue.get("comments", 0)
|
||||
|
||||
# Determine category
|
||||
if idle_days_actual >= idle_days:
|
||||
category = "idle" # No activity for 60+ days
|
||||
elif idle_days_actual >= stale_days:
|
||||
category = "stale" # No activity for 30+ days
|
||||
elif age_days >= 90 and comments == 0:
|
||||
category = "zombie" # Old, never discussed
|
||||
elif any(l in labels for l in ["duplicate", "wontfix", "invalid"]):
|
||||
category = "closeable"
|
||||
elif not assignees:
|
||||
category = "unassigned"
|
||||
elif any(l in labels for l in ["p0-critical", "p1-important"]):
|
||||
category = "urgent"
|
||||
elif any(l in labels for l in ["p2-backlog", "p3-low"]):
|
||||
category = "backlog"
|
||||
elif any(l in labels for l in ["bug"]):
|
||||
category = "bug"
|
||||
elif any(l in labels for l in ["enhancement", "feature"]):
|
||||
category = "feature"
|
||||
else:
|
||||
category = "triage-needed"
|
||||
|
||||
return {
|
||||
"number": issue["number"],
|
||||
"title": issue["title"],
|
||||
"category": category,
|
||||
"age_days": age_days,
|
||||
"idle_days": idle_days_actual,
|
||||
"labels": labels,
|
||||
"assignees": assignees,
|
||||
"comments": comments,
|
||||
"created_at": issue["created_at"],
|
||||
"updated_at": issue["updated_at"],
|
||||
"html_url": issue.get("html_url", ""),
|
||||
}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Triage report
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def generate_report(categorized: list[dict]) -> dict:
|
||||
"""Generate a triage summary report."""
|
||||
by_category = {}
|
||||
for issue in categorized:
|
||||
cat = issue["category"]
|
||||
by_category.setdefault(cat, []).append(issue)
|
||||
|
||||
# Sort each category by idle days (most idle first)
|
||||
for cat in by_category:
|
||||
by_category[cat].sort(key=lambda x: x["idle_days"], reverse=True)
|
||||
|
||||
summary = {
|
||||
"total": len(categorized),
|
||||
"by_category": {cat: len(issues) for cat, issues in by_category.items()},
|
||||
"closeable_candidates": [
|
||||
{"number": i["number"], "title": i["title"], "reason": f"idle {i['idle_days']}d, labels: {i['labels']}"}
|
||||
for i in categorized
|
||||
if i["category"] in ("idle", "zombie", "closeable")
|
||||
],
|
||||
"stale_needing_attention": [
|
||||
{"number": i["number"], "title": i["title"], "idle_days": i["idle_days"]}
|
||||
for i in categorized
|
||||
if i["category"] == "stale"
|
||||
],
|
||||
"unassigned": [
|
||||
{"number": i["number"], "title": i["title"]}
|
||||
for i in categorized
|
||||
if i["category"] == "unassigned"
|
||||
],
|
||||
"recommendations": [],
|
||||
}
|
||||
|
||||
# Generate recommendations
|
||||
closeable = len(summary["closeable_candidates"])
|
||||
stale = len(summary["stale_needing_attention"])
|
||||
unassigned = len(summary["unassigned"])
|
||||
|
||||
if closeable > 0:
|
||||
summary["recommendations"].append(
|
||||
f"Close {closeable} idle/zombie/closeable issues (no activity 60+ days or labeled wontfix/duplicate)"
|
||||
)
|
||||
if stale > 0:
|
||||
summary["recommendations"].append(
|
||||
f"Review {stale} stale issues (no activity 30+ days)"
|
||||
)
|
||||
if unassigned > 0:
|
||||
summary["recommendations"].append(
|
||||
f"Assign owners to {unassigned} unassigned issues or close if no longer relevant"
|
||||
)
|
||||
|
||||
summary["issues"] = categorized
|
||||
return summary
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Auto-close (optional)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def auto_close_stale(token: str, repo: str, issues: list[dict], dry_run: bool = True) -> list[int]:
|
||||
"""Close idle/zombie issues that are clearly stale."""
|
||||
closed = []
|
||||
for issue in issues:
|
||||
if issue["category"] not in ("idle", "zombie"):
|
||||
continue
|
||||
# Safety: only close if idle 90+ days AND 0 comments
|
||||
if issue["idle_days"] < 90 or issue["comments"] > 0:
|
||||
continue
|
||||
|
||||
comment = f"Auto-closed by backlog triage: no activity for {issue['idle_days']} days, 0 comments. Reopen if still relevant."
|
||||
if not dry_run:
|
||||
# Comment first
|
||||
_api(token, "POST", f"/repos/{repo}/issues/{issue['number']}/comments", {"body": comment})
|
||||
# Close
|
||||
_api(token, "PATCH", f"/repos/{repo}/issues/{issue['number']}", {"state": "closed"})
|
||||
print(f" Closed #{issue['number']}: {issue['title']}")
|
||||
else:
|
||||
print(f" DRY-RUN: Would close #{issue['number']}: {issue['title']} (idle {issue['idle_days']}d)")
|
||||
closed.append(issue["number"])
|
||||
|
||||
return closed
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CLI
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Backlog triage tool for Gitea repositories")
|
||||
parser.add_argument("--repo", required=True, help="Repository (e.g. Timmy_Foundation/the-nexus)")
|
||||
parser.add_argument("--token", default=None, help="Gitea API token")
|
||||
parser.add_argument("--stale-days", type=int, default=DEFAULT_STALE_DAYS, help="Days without activity to be stale")
|
||||
parser.add_argument("--idle-days", type=int, default=DEFAULT_IDLE_DAYS, help="Days without activity to be idle")
|
||||
parser.add_argument("--report", default=None, help="Output report JSON path")
|
||||
parser.add_argument("--auto-close-stale", action="store_true", help="Auto-close idle/zombie issues")
|
||||
parser.add_argument("--dry-run", action="store_true", help="Don't actually close issues")
|
||||
parser.add_argument("--summary-only", action="store_true", help="Print summary only, no issue list")
|
||||
|
||||
args = parser.parse_args()
|
||||
token = _read_token(args.token)
|
||||
|
||||
print(f"Fetching issues from {args.repo}...")
|
||||
issues = fetch_all_issues(token, args.repo)
|
||||
print(f"Found {len(issues)} open issues")
|
||||
|
||||
now = datetime.now(timezone.utc)
|
||||
categorized = [categorize_issue(i, now, args.stale_days, args.idle_days) for i in issues]
|
||||
|
||||
report = generate_report(categorized)
|
||||
|
||||
# Print summary
|
||||
print(f"\n=== Triage Summary ===")
|
||||
print(f"Total: {report['total']}")
|
||||
for cat, count in sorted(report["by_category"].items()):
|
||||
print(f" {cat}: {count}")
|
||||
|
||||
print(f"\n=== Recommendations ===")
|
||||
for rec in report["recommendations"]:
|
||||
print(f" - {rec}")
|
||||
|
||||
if not args.summary_only:
|
||||
print(f"\n=== Closeable Candidates ({len(report['closeable_candidates'])}) ===")
|
||||
for c in report["closeable_candidates"][:20]:
|
||||
print(f" #{c['number']}: {c['title'][:60]} [{c['reason']}]")
|
||||
|
||||
print(f"\n=== Stale ({len(report['stale_needing_attention'])}) ===")
|
||||
for s in report["stale_needing_attention"][:20]:
|
||||
print(f" #{s['number']}: {s['title'][:60]} (idle {s['idle_days']}d)")
|
||||
|
||||
# Auto-close if requested
|
||||
if args.auto_close_stale:
|
||||
print(f"\n=== Auto-close {'(DRY RUN)' if args.dry_run else '(LIVE)'} ===")
|
||||
closed = auto_close_stale(token, args.repo, categorized, dry_run=args.dry_run)
|
||||
print(f"{'Would close' if args.dry_run else 'Closed'} {len(closed)} issues")
|
||||
|
||||
# Write report
|
||||
if args.report:
|
||||
with open(args.report, "w") as f:
|
||||
json.dump(report, f, indent=2)
|
||||
print(f"\nReport written to {args.report}")
|
||||
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
50
docs/DUPLICATE_PR_PREVENTION.md
Normal file
50
docs/DUPLICATE_PR_PREVENTION.md
Normal file
@@ -0,0 +1,50 @@
|
||||
# Duplicate PR Prevention
|
||||
|
||||
## The Problem
|
||||
|
||||
Issue #1128 documented a cleanup of duplicate PRs. Agents then created
|
||||
4+ duplicate PRs *for issue #1128 itself*. The irony was not lost on anyone.
|
||||
|
||||
See: #1449, #1460, #1474, #1480.
|
||||
|
||||
## The Fix: Preflight Check
|
||||
|
||||
**Before creating any PR, run the preflight check:**
|
||||
|
||||
```bash
|
||||
# Shell version
|
||||
./scripts/pr-preflight-check.sh <issue_number>
|
||||
|
||||
# Python version
|
||||
python3 scripts/pr_preflight_check.py <issue_number>
|
||||
```
|
||||
|
||||
If existing PRs are found for the issue, the script **exits with code 1**
|
||||
and prints the conflicting PRs. DO NOT proceed to create a new PR.
|
||||
|
||||
## Agent Workflow
|
||||
|
||||
```
|
||||
1. Read issue
|
||||
2. Clone repo
|
||||
3. Implement fix
|
||||
4. Commit
|
||||
5. >>> RUN pr_preflight_check.py <issue_number> <<<
|
||||
6. If exit 0: safe to push and create PR
|
||||
7. If exit 1: STOP — review existing PRs first
|
||||
8. Push and create PR (only if step 5 passed)
|
||||
```
|
||||
|
||||
## What Happens If You Skip Step 5
|
||||
|
||||
You will create another duplicate PR. The cleanup script will find it.
|
||||
Someone will close it. You will have wasted compute and created noise.
|
||||
|
||||
## Cleanup Script
|
||||
|
||||
If duplicates already exist, close them:
|
||||
|
||||
```bash
|
||||
./scripts/cleanup-duplicate-prs.sh --dry-run # preview
|
||||
./scripts/cleanup-duplicate-prs.sh --close # actually close
|
||||
```
|
||||
70
scripts/pr-preflight-check.sh
Executable file
70
scripts/pr-preflight-check.sh
Executable file
@@ -0,0 +1,70 @@
|
||||
#!/usr/bin/env bash
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
# pr-preflight-check.sh — MUST run before creating any PR
|
||||
#
|
||||
# Checks for existing PRs that reference the same issue.
|
||||
# Refuses to proceed if duplicates exist.
|
||||
#
|
||||
# Usage:
|
||||
# ./scripts/pr-preflight-check.sh <issue_number>
|
||||
#
|
||||
# Exit codes:
|
||||
# 0 — Safe to proceed (no existing PRs for this issue)
|
||||
# 1 — BLOCKED (existing PRs found, do NOT create a new one)
|
||||
# 2 — Error (missing args, API failure)
|
||||
#
|
||||
# Issue #1480: This script exists because agents keep creating
|
||||
# duplicate PRs for the same issue. Running this before `git push`
|
||||
# or `curl ... /pulls` prevents the problem.
|
||||
# ═══════════════════════════════════════════════════════════════
|
||||
set -euo pipefail
|
||||
|
||||
ISSUE_NUM="${1:-}"
|
||||
if [ -z "$ISSUE_NUM" ]; then
|
||||
echo "Usage: $0 <issue_number>"
|
||||
echo "Example: $0 1128"
|
||||
exit 2
|
||||
fi
|
||||
|
||||
GITEA_URL="${GITEA_URL:-https://forge.alexanderwhitestone.com}"
|
||||
GITEA_TOKEN="${GITEA_TOKEN:?Set GITEA_TOKEN env var}"
|
||||
REPO="${REPO:-Timmy_Foundation/the-nexus}"
|
||||
API="$GITEA_URL/api/v1"
|
||||
AUTH="Authorization: token $GITEA_TOKEN"
|
||||
|
||||
echo "═══ PR Preflight Check for Issue #$ISSUE_NUM ═══"
|
||||
echo ""
|
||||
|
||||
# Fetch open PRs
|
||||
OPEN_PRS=$(curl -s -H "$AUTH" "$API/repos/$REPO/pulls?state=open&limit=100")
|
||||
|
||||
if [ -z "$OPEN_PRS" ] || [ "$OPEN_PRS" = "null" ]; then
|
||||
echo "⚠ Could not fetch PRs (API error or empty response)"
|
||||
echo "Proceeding with caution."
|
||||
exit 0
|
||||
fi
|
||||
|
||||
# Find PRs referencing this issue
|
||||
MATCHES=$(echo "$OPEN_PRS" | jq -r ".[] | select(.title | test(\"#$ISSUE_NUM\"; \"i\") or .body // \"\" | test(\"#$ISSUE_NUM\"; \"i\")) | \" PR #\\(.number): \\(.title) [\\(.head.ref)] (\\(.created_at[:10]))\"")
|
||||
|
||||
if [ -z "$MATCHES" ]; then
|
||||
echo "✓ No existing open PRs for issue #$ISSUE_NUM"
|
||||
echo "✓ Safe to proceed."
|
||||
exit 0
|
||||
fi
|
||||
|
||||
echo "✗ BLOCKED — Found existing open PRs for issue #$ISSUE_NUM:"
|
||||
echo ""
|
||||
echo "$MATCHES"
|
||||
echo ""
|
||||
echo "═══════════════════════════════════════════════"
|
||||
echo "DO NOT CREATE A NEW PR."
|
||||
echo ""
|
||||
echo "Options:"
|
||||
echo " 1. Review and merge an existing PR"
|
||||
echo " 2. Close duplicates first: ./scripts/cleanup-duplicate-prs.sh --close"
|
||||
echo " 3. Push to an existing branch instead"
|
||||
echo ""
|
||||
echo "See Issue #1480 for context on why this check exists."
|
||||
echo "═══════════════════════════════════════════════"
|
||||
exit 1
|
||||
92
scripts/pr_preflight_check.py
Normal file
92
scripts/pr_preflight_check.py
Normal file
@@ -0,0 +1,92 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
pr_preflight_check.py — Prevent duplicate PR creation.
|
||||
|
||||
Call before creating any PR:
|
||||
python3 scripts/pr_preflight_check.py 1128
|
||||
|
||||
Returns exit code 0 if safe, 1 if blocked.
|
||||
Designed for agent workflows — agents MUST call this before `curl ... /pulls`.
|
||||
|
||||
Issue #1480: The duplicate PR problem.
|
||||
"""
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import urllib.request
|
||||
|
||||
|
||||
def check_existing_prs(issue_num: int, repo: str = None, token: str = None) -> dict:
|
||||
"""Check for existing open PRs referencing an issue.
|
||||
|
||||
Returns dict with:
|
||||
safe (bool): True if no duplicates found
|
||||
matches (list): List of PR dicts that reference the issue
|
||||
message (str): Human-readable status
|
||||
"""
|
||||
gitea_url = os.environ.get("GITEA_URL", "https://forge.alexanderwhitestone.com")
|
||||
token = token or os.environ.get("GITEA_TOKEN", "")
|
||||
repo = repo or os.environ.get("REPO", "Timmy_Foundation/the-nexus")
|
||||
|
||||
if not token:
|
||||
token_path = os.path.expanduser("~/.config/gitea/token")
|
||||
if os.path.exists(token_path):
|
||||
token = open(token_path).read().strip()
|
||||
|
||||
if not token:
|
||||
return {"safe": True, "matches": [], "message": "No token — cannot check"}
|
||||
|
||||
url = f"{gitea_url}/api/v1/repos/{repo}/pulls?state=open&limit=100"
|
||||
req = urllib.request.Request(url, headers={"Authorization": f"token {token}"})
|
||||
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=10) as resp:
|
||||
prs = json.loads(resp.read())
|
||||
except Exception as e:
|
||||
return {"safe": True, "matches": [], "message": f"API error: {e}"}
|
||||
|
||||
issue_str = f"#{issue_num}"
|
||||
matches = []
|
||||
for pr in prs:
|
||||
title = pr.get("title", "")
|
||||
body = pr.get("body") or ""
|
||||
if issue_str in title or issue_str in body:
|
||||
matches.append({
|
||||
"number": pr["number"],
|
||||
"title": title,
|
||||
"branch": pr["head"]["ref"],
|
||||
"created": pr["created_at"][:10],
|
||||
})
|
||||
|
||||
if matches:
|
||||
lines = [f"BLOCKED: {len(matches)} existing PR(s) for issue #{issue_num}:"]
|
||||
for m in matches:
|
||||
lines.append(f" PR #{m['number']}: {m['title']} [{m['branch']}] ({m['created']})")
|
||||
lines.append("")
|
||||
lines.append("DO NOT CREATE A NEW PR. Review existing ones first.")
|
||||
return {"safe": False, "matches": matches, "message": "\n".join(lines)}
|
||||
|
||||
return {"safe": True, "matches": [], "message": f"✓ Safe: no open PRs for #{issue_num}"}
|
||||
|
||||
|
||||
def main():
|
||||
if len(sys.argv) < 2:
|
||||
print("Usage: pr_preflight_check.py <issue_number> [repo]")
|
||||
print("Example: pr_preflight_check.py 1128")
|
||||
print(" pr_preflight_check.py 1339 Timmy_Foundation/the-nexus")
|
||||
sys.exit(2)
|
||||
|
||||
issue_num = int(sys.argv[1])
|
||||
repo = sys.argv[2] if len(sys.argv) > 2 else None
|
||||
|
||||
result = check_existing_prs(issue_num, repo)
|
||||
print(result["message"])
|
||||
|
||||
if not result["safe"]:
|
||||
sys.exit(1)
|
||||
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,115 +0,0 @@
|
||||
"""Tests for backlog_triage — issue categorization and report generation."""
|
||||
import json
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
import sys
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
||||
|
||||
from bin.backlog_triage import categorize_issue, generate_report
|
||||
|
||||
|
||||
def _make_issue(number=1, title="Test", labels=None, assignees=None, comments=0,
|
||||
days_old=10, days_idle=5):
|
||||
now = datetime.now(timezone.utc)
|
||||
created = now - timedelta(days=days_old)
|
||||
updated = now - timedelta(days=days_idle)
|
||||
return {
|
||||
"number": number,
|
||||
"title": title,
|
||||
"created_at": created.isoformat().replace("+00:00", "Z"),
|
||||
"updated_at": updated.isoformat().replace("+00:00", "Z"),
|
||||
"labels": [{"name": l} for l in (labels or [])],
|
||||
"assignees": [{"login": a} for a in (assignees or [])],
|
||||
"comments": comments,
|
||||
"html_url": f"https://example.com/{number}",
|
||||
}
|
||||
|
||||
|
||||
class TestCategorizeIssue:
|
||||
def test_idle_issue(self):
|
||||
issue = _make_issue(days_idle=70)
|
||||
result = categorize_issue(issue, datetime.now(timezone.utc), 30, 60)
|
||||
assert result["category"] == "idle"
|
||||
|
||||
def test_stale_issue(self):
|
||||
issue = _make_issue(days_idle=45)
|
||||
result = categorize_issue(issue, datetime.now(timezone.utc), 30, 60)
|
||||
assert result["category"] == "stale"
|
||||
|
||||
def test_zombie_issue(self):
|
||||
issue = _make_issue(days_old=100, days_idle=10, comments=0)
|
||||
result = categorize_issue(issue, datetime.now(timezone.utc), 30, 60)
|
||||
assert result["category"] == "zombie"
|
||||
|
||||
def test_unassigned_issue(self):
|
||||
issue = _make_issue(assignees=[], days_old=5, days_idle=1)
|
||||
result = categorize_issue(issue, datetime.now(timezone.utc), 30, 60)
|
||||
assert result["category"] == "unassigned"
|
||||
|
||||
def test_assigned_issue(self):
|
||||
issue = _make_issue(assignees=["alice"], days_old=5, days_idle=1)
|
||||
result = categorize_issue(issue, datetime.now(timezone.utc), 30, 60)
|
||||
assert result["category"] == "triage-needed"
|
||||
|
||||
def test_closeable_duplicate(self):
|
||||
issue = _make_issue(labels=["duplicate"], days_old=5, days_idle=1)
|
||||
result = categorize_issue(issue, datetime.now(timezone.utc), 30, 60)
|
||||
assert result["category"] == "closeable"
|
||||
|
||||
def test_urgent_issue(self):
|
||||
issue = _make_issue(labels=["p0-critical"], assignees=["bob"])
|
||||
result = categorize_issue(issue, datetime.now(timezone.utc), 30, 60)
|
||||
assert result["category"] == "urgent"
|
||||
|
||||
def test_backlog_issue(self):
|
||||
issue = _make_issue(labels=["p2-backlog"], assignees=["bob"])
|
||||
result = categorize_issue(issue, datetime.now(timezone.utc), 30, 60)
|
||||
assert result["category"] == "backlog"
|
||||
|
||||
def test_bug_category(self):
|
||||
issue = _make_issue(labels=["bug"], assignees=["bob"])
|
||||
result = categorize_issue(issue, datetime.now(timezone.utc), 30, 60)
|
||||
assert result["category"] == "bug"
|
||||
|
||||
def test_age_tracking(self):
|
||||
issue = _make_issue(days_old=42, days_idle=7)
|
||||
result = categorize_issue(issue, datetime.now(timezone.utc), 30, 60)
|
||||
assert result["age_days"] >= 41
|
||||
assert result["idle_days"] >= 6
|
||||
|
||||
|
||||
class TestGenerateReport:
|
||||
def test_empty_report(self):
|
||||
report = generate_report([])
|
||||
assert report["total"] == 0
|
||||
assert report["by_category"] == {}
|
||||
|
||||
def test_report_categorization(self):
|
||||
issues = [
|
||||
_make_issue(1, "idle", days_idle=70),
|
||||
_make_issue(2, "stale", days_idle=40),
|
||||
_make_issue(3, "recent", assignees=["alice"]),
|
||||
]
|
||||
categorized = [categorize_issue(i, datetime.now(timezone.utc), 30, 60) for i in issues]
|
||||
report = generate_report(categorized)
|
||||
assert report["total"] == 3
|
||||
assert "idle" in report["by_category"]
|
||||
assert "stale" in report["by_category"]
|
||||
|
||||
def test_closeable_candidates(self):
|
||||
issues = [
|
||||
_make_issue(1, "old zombie", days_old=100, days_idle=100, comments=0),
|
||||
_make_issue(2, "recent", assignees=["alice"]),
|
||||
]
|
||||
categorized = [categorize_issue(i, datetime.now(timezone.utc), 30, 60) for i in issues]
|
||||
report = generate_report(categorized)
|
||||
assert len(report["closeable_candidates"]) >= 1
|
||||
assert report["closeable_candidates"][0]["number"] == 1
|
||||
|
||||
def test_recommendations_generated(self):
|
||||
issues = [_make_issue(1, days_idle=70)]
|
||||
categorized = [categorize_issue(i, datetime.now(timezone.utc), 30, 60) for i in issues]
|
||||
report = generate_report(categorized)
|
||||
assert len(report["recommendations"]) > 0
|
||||
Reference in New Issue
Block a user