Compare commits

..

1 Commits

Author SHA1 Message Date
dc210580e2 feat: add safe stale branch cleanup to duplicate PR manager
Some checks failed
CI / test (pull_request) Failing after 56s
CI / validate (pull_request) Failing after 1m0s
Review Approval Gate / verify-review (pull_request) Successful in 6s
2026-04-15 02:48:59 +00:00
3 changed files with 59 additions and 410 deletions

View File

@@ -1,287 +0,0 @@
#!/usr/bin/env python3
"""
Issue Backlog Manager — Triage, categorize, and manage Gitea issue backlogs.
Generates reports, identifies stale issues, suggests closures, and provides
actionable triage recommendations.
Usage:
python bin/issue_backlog_manager.py timmy-home # Full report
python bin/issue_backlog_manager.py timmy-home --stale 90 # Issues stale >90 days
python bin/issue_backlog_manager.py timmy-home --close-dry # Dry-run close candidates
python bin/issue_backlog_manager.py timmy-home --json # JSON output
"""
import json
import os
import re
import sys
from collections import Counter, defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
try:
import urllib.request
except ImportError:
print("Error: urllib required")
sys.exit(1)
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
GITEA_BASE = os.environ.get("GITEA_API_BASE", "https://forge.alexanderwhitestone.com/api/v1")
TOKEN_PATH = os.environ.get("GITEA_TOKEN_PATH", str(Path.home() / ".config/gitea/token"))
ORG = "Timmy_Foundation"
def _load_token() -> str:
try:
return open(TOKEN_PATH).read().strip()
except FileNotFoundError:
print(f"Token not found at {TOKEN_PATH}", file=sys.stderr)
sys.exit(1)
def api_get(path: str, token: str) -> Any:
req = urllib.request.Request(f"{GITEA_BASE}{path}")
req.add_header("Authorization", f"token {token}")
return json.loads(urllib.request.urlopen(req, timeout=30).read())
# ---------------------------------------------------------------------------
# Issue fetching
# ---------------------------------------------------------------------------
def fetch_all_open_issues(repo: str, token: str) -> list[dict]:
"""Fetch all open issues for a repo (paginated)."""
issues = []
page = 1
while True:
batch = api_get(f"/repos/{ORG}/{repo}/issues?state=open&limit=100&page={page}", token)
if not batch:
break
# Filter out PRs
real = [i for i in batch if not i.get("pull_request")]
issues.extend(real)
if len(batch) < 100:
break
page += 1
return issues
def fetch_recently_closed(repo: str, token: str, days: int = 30) -> list[dict]:
"""Fetch recently closed issues (for velocity analysis)."""
since = (datetime.now(timezone.utc) - timedelta(days=days)).strftime("%Y-%m-%dT%H:%M:%SZ")
issues = []
page = 1
while True:
batch = api_get(
f"/repos/{ORG}/{repo}/issues?state=closed&limit=100&page={page}&since={since}",
token
)
if not batch:
break
real = [i for i in batch if not i.get("pull_request")]
issues.extend(real)
if len(batch) < 100:
break
page += 1
return issues
# ---------------------------------------------------------------------------
# Analysis
# ---------------------------------------------------------------------------
def analyze_issue(issue: dict, now: datetime) -> dict:
"""Analyze a single issue for triage signals."""
created = datetime.fromisoformat(issue["created_at"].replace("Z", "+00:00"))
updated = datetime.fromisoformat(issue["updated_at"].replace("Z", "+00:00"))
age_days = (now - created).days
stale_days = (now - updated).days
labels = [l["name"] for l in issue.get("labels", [])]
has_assignee = bool(issue.get("assignees"))
has_pr_ref = bool(re.search(r"#\d+|PR|pull", issue.get("body", ""), re.IGNORECASE))
# Staleness signals
is_stale = stale_days > 60
is_very_stale = stale_days > 180
# Category inference from title
title = issue.get("title", "").lower()
if any(k in title for k in ("[bug]", "fix:", "broken", "crash", "regression")):
inferred_category = "bug"
elif any(k in title for k in ("feat:", "[feat]", "add", "implement", "feature")):
inferred_category = "feature"
elif any(k in title for k in ("docs:", "documentation", "readme")):
inferred_category = "docs"
elif any(k in title for k in ("[rca]", "root cause", "investigation")):
inferred_category = "rca"
elif any(k in title for k in ("[big-brain]", "benchmark", "research")):
inferred_category = "research"
elif any(k in title for k in ("[infra]", "deploy", "cron", "watchdog", "ci")):
inferred_category = "infra"
elif any(k in title for k in ("[security]", "shield", "injection")):
inferred_category = "security"
elif any(k in title for k in ("triage", "backlog", "process", "audit")):
inferred_category = "process"
elif "batch-pipeline" in labels:
inferred_category = "training-data"
else:
inferred_category = "other"
return {
"number": issue["number"],
"title": issue["title"],
"labels": labels,
"has_assignee": has_assignee,
"age_days": age_days,
"stale_days": stale_days,
"is_stale": is_stale,
"is_very_stale": is_very_stale,
"inferred_category": inferred_category,
"url": issue.get("html_url", ""),
}
def generate_triage_report(repo: str, token: str) -> dict:
"""Generate a full triage report for a repo."""
now = datetime.now(timezone.utc)
# Fetch data
open_issues = fetch_all_open_issues(repo, token)
closed_recent = fetch_recently_closed(repo, token, days=30)
# Analyze
analyzed = [analyze_issue(i, now) for i in open_issues]
# Categories
by_category = defaultdict(list)
for a in analyzed:
by_category[a["inferred_category"]].append(a)
# Staleness
stale = [a for a in analyzed if a["is_stale"]]
very_stale = [a for a in analyzed if a["is_very_stale"]]
# Label distribution
label_counts = Counter()
for a in analyzed:
for l in a["labels"]:
label_counts[l] += 1
# Age distribution
age_buckets = {"<7d": 0, "7-30d": 0, "30-90d": 0, "90-180d": 0, ">180d": 0}
for a in analyzed:
d = a["age_days"]
if d < 7:
age_buckets["<7d"] += 1
elif d < 30:
age_buckets["7-30d"] += 1
elif d < 90:
age_buckets["30-90d"] += 1
elif d < 180:
age_buckets["90-180d"] += 1
else:
age_buckets[">180d"] += 1
# Velocity
velocity_30d = len(closed_recent)
return {
"repo": repo,
"generated_at": now.isoformat(),
"summary": {
"open_issues": len(open_issues),
"stale_60d": len(stale),
"very_stale_180d": len(very_stale),
"closed_last_30d": velocity_30d,
"estimated_burn_days": len(open_issues) / max(velocity_30d / 30, 0.1),
},
"by_category": {k: len(v) for k, v in by_category.items()},
"age_distribution": age_buckets,
"top_labels": dict(label_counts.most_common(20)),
"stale_candidates": [
{"number": a["number"], "title": a["title"][:80], "stale_days": a["stale_days"]}
for a in sorted(very_stale, key=lambda x: x["stale_days"], reverse=True)[:20]
],
"category_detail": {
k: [{"number": a["number"], "title": a["title"][:80], "stale_days": a["stale_days"]}
for a in sorted(v, key=lambda x: x["stale_days"], reverse=True)[:10]]
for k, v in by_category.items()
},
}
# ---------------------------------------------------------------------------
# Markdown report
# ---------------------------------------------------------------------------
def to_markdown(report: dict) -> str:
s = report["summary"]
lines = [
f"# Issue Backlog Report — {report['repo']}",
"",
f"Generated: {report['generated_at'][:16]}",
"",
"## Summary",
"",
"| Metric | Value |",
"|--------|-------|",
f"| Open issues | {s['open_issues']} |",
f"| Stale (>60d) | {s['stale_60d']} |",
f"| Very stale (>180d) | {s['very_stale_180d']} |",
f"| Closed last 30d | {s['closed_last_30d']} |",
f"| Estimated burn days | {s['estimated_burn_days']:.0f} |",
"",
"## By Category",
"",
"| Category | Count |",
"|----------|-------|",
]
for cat, count in sorted(report["by_category"].items(), key=lambda x: -x[1]):
lines.append(f"| {cat} | {count} |")
lines.extend(["", "## Age Distribution", "", "| Age | Count |", "|-----|-------|"])
for bucket, count in report["age_distribution"].items():
lines.append(f"| {bucket} | {count} |")
if report["stale_candidates"]:
lines.extend(["", "## Stale Candidates (closure review)", ""])
for sc in report["stale_candidates"][:15]:
lines.append(f"- #{sc['number']}: {sc['title']} (stale {sc['stale_days']}d)")
lines.extend(["", "## Top Labels", ""])
for label, count in list(report["top_labels"].items())[:10]:
lines.append(f"- {label}: {count}")
return "\n".join(lines)
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main():
import argparse
parser = argparse.ArgumentParser(description="Issue Backlog Manager")
parser.add_argument("repo", help="Repository name (e.g., timmy-home)")
parser.add_argument("--json", action="store_true", help="JSON output")
parser.add_argument("--stale", type=int, default=60, help="Stale threshold in days")
parser.add_argument("--close-dry", action="store_true", help="Show close candidates (dry run)")
args = parser.parse_args()
token = _load_token()
report = generate_triage_report(args.repo, token)
if args.json:
print(json.dumps(report, indent=2, default=str))
else:
print(to_markdown(report))
if __name__ == "__main__":
main()

View File

@@ -168,3 +168,62 @@ else
fi
log "Script complete"
# ─── Stale Branch Cleanup ─────────────────────────────────
# Clean up branches from closed (unmerged) PRs and merged PRs
log "Checking for stale branches from closed/merged PRs..."
# Get all open PRs to avoid deleting active branches
OPEN_BRANCHES=$(curl -s -H "$AUTH" "$API/repos/$REPO/pulls?state=open&limit=100" | jq -r '.[] | .head.ref' | sort -u)
# Get all closed PRs (last 100)
CLOSED_PRS=$(curl -s -H "$AUTH" "$API/repos/$REPO/pulls?state=closed&limit=100")
if [ -n "$CLOSED_PRS" ] && [ "$CLOSED_PRS" != "null" ]; then
STALE_BRANCHES=$(echo "$CLOSED_PRS" | jq -r '.[] | select(.merged == false) | .head.ref' | sort -u)
MERGED_BRANCHES=$(echo "$CLOSED_PRS" | jq -r '.[] | select(.merged == true) | .head.ref' | sort -u)
STALE_COUNT=0
for branch in $STALE_BRANCHES; do
# Skip main/master/develop
case "$branch" in main|master|develop|HEAD) continue ;; esac
# SAFETY CHECK: Skip if branch is still used by an open PR
if echo "$OPEN_BRANCHES" | grep -q "^$branch$"; then
log "Skipping branch '$branch' - still has an open PR"
continue
fi
if [ "$DRY_RUN" = "true" ]; then
log "DRY RUN: Would delete stale branch '$branch' (from closed unmerged PR)"
else
curl -s -X DELETE -H "$AUTH" "$API/repos/$REPO/branches/$branch" > /dev/null 2>&1 || true
log "Deleted stale branch: $branch"
fi
STALE_COUNT=$((STALE_COUNT + 1))
done
MERGED_COUNT=0
for branch in $MERGED_BRANCHES; do
case "$branch" in main|master|develop|HEAD) continue ;; esac
if echo "$OPEN_BRANCHES" | grep -q "^$branch$"; then
log "Skipping branch '$branch' - still has an open PR"
continue
fi
if [ "$DRY_RUN" = "true" ]; then
log "DRY RUN: Would delete merged branch '$branch'"
else
curl -s -X DELETE -H "$AUTH" "$API/repos/$REPO/branches/$branch" > /dev/null 2>&1 || true
log "Deleted merged branch: $branch"
fi
MERGED_COUNT=$((MERGED_COUNT + 1))
done
log "Stale branch cleanup:"
log " Closed (unmerged) branches: $STALE_COUNT"
log " Merged branches: $MERGED_COUNT"
else
log "Could not fetch closed PRs for branch cleanup"
fi

View File

@@ -1,123 +0,0 @@
"""Tests for issue backlog manager."""
import json
from datetime import datetime, timezone, timedelta
from unittest.mock import patch, MagicMock
import pytest
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent / "bin"))
from issue_backlog_manager import analyze_issue, to_markdown
@pytest.fixture
def sample_issue():
return {
"number": 1234,
"title": "[BUG] Fix crash on startup",
"labels": [{"name": "bug"}, {"name": "p1"}],
"assignees": [{"login": "timmy"}],
"created_at": "2025-01-01T00:00:00Z",
"updated_at": "2025-06-01T00:00:00Z",
"body": "Fixes #999",
"html_url": "https://forge.example.com/...",
}
class TestAnalyzeIssue:
def test_categorizes_bug(self, sample_issue):
now = datetime(2026, 4, 14, tzinfo=timezone.utc)
result = analyze_issue(sample_issue, now)
assert result["inferred_category"] == "bug"
def test_categorizes_feature(self, sample_issue):
sample_issue["title"] = "feat: Add new widget"
now = datetime(2026, 4, 14, tzinfo=timezone.utc)
result = analyze_issue(sample_issue, now)
assert result["inferred_category"] == "feature"
def test_categorizes_docs(self, sample_issue):
sample_issue["title"] = "docs: Update README"
now = datetime(2026, 4, 14, tzinfo=timezone.utc)
result = analyze_issue(sample_issue, now)
assert result["inferred_category"] == "docs"
def test_categorizes_training_data(self, sample_issue):
sample_issue["title"] = "Some issue"
sample_issue["labels"] = [{"name": "batch-pipeline"}]
now = datetime(2026, 4, 14, tzinfo=timezone.utc)
result = analyze_issue(sample_issue, now)
assert result["inferred_category"] == "training-data"
def test_detects_staleness(self, sample_issue):
# Updated 300 days ago
sample_issue["updated_at"] = "2025-06-01T00:00:00Z"
now = datetime(2026, 4, 14, tzinfo=timezone.utc)
result = analyze_issue(sample_issue, now)
assert result["is_stale"] is True
assert result["stale_days"] > 200
def test_detects_not_stale(self, sample_issue):
sample_issue["updated_at"] = "2026-04-10T00:00:00Z"
now = datetime(2026, 4, 14, tzinfo=timezone.utc)
result = analyze_issue(sample_issue, now)
assert result["is_stale"] is False
def test_age_days(self, sample_issue):
sample_issue["created_at"] = "2026-01-01T00:00:00Z"
now = datetime(2026, 4, 14, tzinfo=timezone.utc)
result = analyze_issue(sample_issue, now)
assert result["age_days"] > 100
def test_has_assignee(self, sample_issue):
now = datetime(2026, 4, 14, tzinfo=timezone.utc)
result = analyze_issue(sample_issue, now)
assert result["has_assignee"] is True
def test_no_assignee(self, sample_issue):
sample_issue["assignees"] = []
now = datetime(2026, 4, 14, tzinfo=timezone.utc)
result = analyze_issue(sample_issue, now)
assert result["has_assignee"] is False
def test_extracts_number(self, sample_issue):
now = datetime(2026, 4, 14, tzinfo=timezone.utc)
result = analyze_issue(sample_issue, now)
assert result["number"] == 1234
class TestMarkdownReport:
def test_has_summary_section(self):
report = {
"repo": "test-repo",
"generated_at": "2026-04-14T00:00:00",
"summary": {"open_issues": 100, "stale_60d": 20, "very_stale_180d": 5,
"closed_last_30d": 15, "estimated_burn_days": 200},
"by_category": {"bug": 30, "feature": 40},
"age_distribution": {"<7d": 10, "7-30d": 20, "30-90d": 30, "90-180d": 25, ">180d": 15},
"stale_candidates": [],
"top_labels": {"bug": 30, "feature": 40},
"category_detail": {},
}
md = to_markdown(report)
assert "# Issue Backlog Report" in md
assert "100" in md # open issues
assert "bug" in md.lower()
def test_shows_stale_candidates(self):
report = {
"repo": "test",
"generated_at": "2026-04-14",
"summary": {"open_issues": 1, "stale_60d": 1, "very_stale_180d": 1,
"closed_last_30d": 0, "estimated_burn_days": 999},
"by_category": {},
"age_distribution": {},
"stale_candidates": [{"number": 99, "title": "Old issue", "stale_days": 500}],
"top_labels": {},
"category_detail": {},
}
md = to_markdown(report)
assert "#99" in md
assert "500" in md