Files
timmy-config/scripts/issue_backlog_triage.py
Timmy 6b387af87f [AUDIT][ACTION] Add issue backlog triage tool — enabler for #478
Implements scripts/issue_backlog_triage.py — automated issue backlog
analysis and triage for Gitea repos, addressing the 559-issue backlog
audit finding.

Features:
- Paginated fetch of all open issues across repos
- Keyword-based categorization (adversary, bug, security, training_data, …)
- Duplicate detection via issue reference (#N) sharing
- Stale identification (>14d with no activity)
- Optional dry-run close of stale issues (--close-stale)
- Optional priority label application (P0–P3) with auto-creation (--apply-priority)
- Markdown and JSON report outputs

Unit tests added in tests/test_issue_backlog_triage.py (27 tests, all passing).

Enables systematic sweep of timmy-home, timmy-config, the-nexus, and hermes-agent
backlogs per issue #478 acceptance criteria.

Closes #478
2026-04-29 01:25:10 -04:00

299 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
issue_backlog_triage.py — Automated issue backlog analysis and triage for Gitea repos (Issue #478).
Analyzes open issues: categorizes, finds stale (>14d no activity), identifies duplicates
by shared issue references, generates a triage report, and optionally closes stale issues
or applies priority labels (P0–P3).
Usage:
python3 scripts/issue_backlog_triage.py Timmy_Foundation/timmy-config
python3 scripts/issue_backlog_triage.py --org Timmy_Foundation
python3 scripts/issue_backlog_triage.py Timmy_Foundation/hermes-agent --close-stale --dry-run
python3 scripts/issue_backlog_triage.py Timmy_Foundation/timmy-home --apply-priority --no-dry-run
"""
import argparse
import json
import os
import re
import sys
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from urllib.error import HTTPError
from urllib.parse import urlencode
from urllib.request import Request, urlopen
# Base URL of the Gitea instance every API call targets.
GITEA_URL = "https://forge.alexanderwhitestone.com"
# Matches "#123"-style issue references in titles and bodies.
ISSUE_PATTERN = re.compile(r"#(\d+)")
# Days without activity before an open issue is considered stale.
STALE_DAYS = 14
# Ordered keyword lists for title-based categorization; earlier categories win.
# Purely alphanumeric keywords match as whole words, others (e.g. "[epic]")
# match as plain substrings — see categorize_issue.
CATEGORY_KEYWORDS = {
    "training_data": ["500", "pairs", "scene description", "lyrics", "prompt", "training data", "corpus"],
    "adversary": ["adversary", "jailbreak", "harm", "manipulation", "crisis", "value violation", "emotional"],
    "security": ["security", "auth", "xss", "injection", "vulnerability"],
    "bug": ["bug", "fix", "patch", "error", "fail", "broken", "crash"],
    "docs": ["doc", "readme", "guide", "explain", "comment"],
    "feature": ["feat", "add", "implement", "feature"],
    "ops": ["ops", "deploy", "ci", "cd", "pipeline", "cron", "daemon", "ansible", "autonomous"],
    "governance": ["audit", "policy", "sovereignty", "approval", "constitution", "governance"],
    "research": ["research", "investigate", "explore", "study", "intelligence"],
    "epic": ["[epic]", "[meta]", "phase", "milestone"],
}
# Case-insensitive prefixes identifying existing priority labels to strip
# before a new one is applied (see apply_priority_label).
PRIORITY_LABEL_PREFIXES = ("p0", "p1", "p2", "p3")
def get_token() -> str:
    """Resolve the Gitea API token.

    Resolution order: ~/.config/gitea/token file, then the GITEA_TOKEN
    environment variable. Exits with status 1 when neither yields a
    non-empty token.
    """
    token_file = Path(os.path.expanduser("~/.config/gitea/token"))
    if token_file.exists():
        token = token_file.read_text().strip()
        # Fix: an empty/whitespace-only file previously returned "" here,
        # which silently broke every subsequent authenticated call.
        if token:
            return token
    token = os.environ.get("GITEA_TOKEN", "")
    if not token:
        print("ERROR: No Gitea token. ~/.config/gitea/token or GITEA_TOKEN", file=sys.stderr)
        sys.exit(1)
    return token
def api_get(path: str, token: str, params: Optional[Dict[str, str]] = None) -> Any:
    """GET a Gitea API endpoint and return the parsed JSON body.

    Args:
        path: API path under /api/v1 (e.g. "/repos/org/repo/issues").
        token: Gitea access token, sent as an Authorization header.
        params: optional query parameters; values are URL-encoded.

    Returns None on HTTP 404 (missing repo/resource); re-raises any
    other HTTPError.
    """
    url = f"{GITEA_URL}/api/v1{path}"
    if params:
        # Fix: the original raw "&".join produced broken URLs for values
        # containing spaces or reserved characters; urlencode escapes them.
        url += "?" + urlencode(params)
    req = Request(url, headers={"Authorization": f"token {token}"})
    try:
        return json.loads(urlopen(req, timeout=30).read())
    except HTTPError as e:
        if e.code == 404:
            return None
        raise
def api_patch(path: str, token: str, data: dict) -> Any:
    """PATCH `data` as JSON to a Gitea API endpoint.

    Returns the parsed JSON response, or None on any HTTP error
    (best-effort mutation; callers treat None as failure).
    """
    payload = json.dumps(data).encode()
    request = Request(
        f"{GITEA_URL}/api/v1{path}",
        data=payload,
        headers={
            "Authorization": f"token {token}",
            "Content-Type": "application/json",
        },
        method="PATCH",
    )
    try:
        return json.loads(urlopen(request, timeout=30).read())
    except HTTPError:
        return None
def api_post(path: str, token: str, data: dict) -> Any:
    """POST `data` as JSON to a Gitea API endpoint.

    Returns the parsed JSON response, or None on any HTTP error
    (best-effort mutation; callers treat None as failure).
    """
    payload = json.dumps(data).encode()
    request = Request(
        f"{GITEA_URL}/api/v1{path}",
        data=payload,
        headers={
            "Authorization": f"token {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
    try:
        return json.loads(urlopen(request, timeout=30).read())
    except HTTPError:
        return None
def categorize_issue(issue: dict) -> str:
    """Return the first category whose keywords match the issue title.

    Categories are tried in CATEGORY_KEYWORDS insertion order, so earlier
    categories take precedence. Purely alphanumeric keywords must match as
    whole words; any other keyword (e.g. "[epic]") matches as a substring.
    Falls back to "other" when nothing matches.
    """
    title = (issue.get("title") or "").lower()
    for category, keywords in CATEGORY_KEYWORDS.items():
        for keyword in keywords:
            if re.fullmatch(r"[\w]+", keyword):
                hit = re.search(rf"\b{re.escape(keyword)}\b", title) is not None
            else:
                hit = keyword in title
            if hit:
                return category
    return "other"
def extract_refs(issue: dict) -> List[int]:
    """Collect every "#N" issue reference from the title and body.

    Returns the referenced numbers sorted and de-duplicated; missing
    title/body fields are treated as empty strings.
    """
    blob = "{} {}".format(issue.get("title") or "", issue.get("body") or "")
    return sorted({int(num) for num in re.findall(r"#(\d+)", blob)})
def find_duplicates(issues: List[dict]) -> Dict[int, List[int]]:
    """Group open issues by the issue numbers they reference.

    Returns a map of referenced-number -> list of issue numbers that
    mention it, keeping only references shared by more than one issue
    (a heuristic for potential duplicates).
    """
    mentions: Dict[int, List[int]] = {}
    for issue in issues:
        number = issue["number"]
        for ref in extract_refs(issue):
            mentions.setdefault(ref, []).append(number)
    return {ref: nums for ref, nums in mentions.items() if len(nums) > 1}
def is_stale(issue: dict, cutoff: datetime) -> bool:
    """Return True when the issue's last update predates `cutoff`.

    Gitea timestamps end in "Z"; that suffix is rewritten to "+00:00" so
    fromisoformat yields a timezone-aware datetime comparable to `cutoff`.
    """
    last_touched = issue["updated_at"].replace("Z", "+00:00")
    return datetime.fromisoformat(last_touched) < cutoff
def fetch_all_open_issues(repo: str, token: str) -> List[dict]:
    """Page through all open issues of `repo` (30 per page).

    Stops at the first empty/missing page; "type=issues" excludes pull
    requests from the listing.
    """
    collected: List[dict] = []
    page = 1
    while True:
        batch = api_get(
            f"/repos/{repo}/issues",
            token,
            {"state": "open", "type": "issues", "per_page": "30", "page": str(page)},
        ) or []
        if not batch:
            return collected
        collected.extend(batch)
        page += 1
def ensure_priority_labels(repo: str, token: str) -> bool:
    """Create any missing P0-P3 priority labels in `repo`.

    Existing labels are matched case-insensitively by name. Returns False
    as soon as one creation fails (after printing a warning), True when
    all four labels exist or were created.
    """
    wanted = {
        "p0-critical": "dc3545",
        "p1-important": "fd7e14",
        "p2-backlog": "20c997",
        "p3-low": "6c757d",
    }
    current = api_get(f"/repos/{repo}/labels", token, {"per_page": "100"}) or []
    present = {entry["name"].lower() for entry in current}
    for name, color in wanted.items():
        if name in present:
            continue
        created = api_post(
            f"/repos/{repo}/labels",
            token,
            {"name": name, "color": color, "description": f"Priority {name.upper()}"},
        )
        if created is None:
            print(f"WARN: Could not create label {name} in {repo}", file=sys.stderr)
            return False
    return True
def apply_priority_label(issue: dict, repo: str, token: str, dry_run: bool = True) -> Optional[str]:
    """Choose a P0-P3 priority for `issue` and apply it unless dry_run.

    Heuristics (first match wins):
      * p0-critical: crash/accessibility/security keywords in the title
      * p1-important: audit/governance keywords in the title
      * p3-low: milestone title contains "critical", or the issue is over
        a year old with zero comments
      * p2-backlog: everything else

    Returns the chosen label name. When dry_run is False the issue's
    labels are PATCHed, with any previous p0-p3 label stripped first.
    """
    title = (issue.get("title") or "").lower()
    comments = issue.get("comments", 0)
    created = datetime.fromisoformat(issue["created_at"].replace("Z", "+00:00"))
    age_days = (datetime.now(timezone.utc) - created).days
    # Fix: the original probed a non-existent "mileline" key, so the
    # milestone branch was dead code. Gitea returns the milestone as an
    # object; read its "title" field for keyword matching.
    milestone_title = ((issue.get("milestone") or {}).get("title") or "").lower()
    if any(kw in title for kw in ["critical", "[crash]", "broken", "[a11y]", "security", "auth", "xss", "injection"]):
        priority = "p0-critical"
    elif any(kw in title for kw in ["[audit]", "constitution", "governance", "sovereign"]):
        priority = "p1-important"
    elif "critical" in milestone_title or (comments == 0 and age_days > 365):
        # NOTE(review): mapping a "critical" milestone to p3-low looks
        # inverted; preserved from the original expression -- confirm intent.
        priority = "p3-low"
    else:
        priority = "p2-backlog"
    if dry_run:
        return priority
    current_labels = [lbl["name"] for lbl in issue.get("labels", [])]
    # Strip any previous priority label before attaching the new one.
    new_labels = [lbl for lbl in current_labels if not lbl.lower().startswith(PRIORITY_LABEL_PREFIXES)]
    new_labels.append(priority)
    api_patch(f"/repos/{repo}/issues/{issue['number']}", token, {"labels": new_labels})
    return priority
def close_stale_issue(issue_num: int, repo: str, token: str, dry_run: bool = True) -> dict:
    """Close a stale issue with an explanatory comment.

    In dry-run mode only reports what would happen; otherwise posts the
    closure comment and PATCHes the issue state to closed. Returns an
    action record for the triage report.
    """
    if dry_run:
        return {"issue": issue_num, "action": "would_close"}
    comment = f"Closing stale issue: no activity for >{STALE_DAYS} days. Triage cleanup (issue #478)."
    api_post(f"/repos/{repo}/issues/{issue_num}/comments", token, {"body": comment})
    api_patch(f"/repos/{repo}/issues/{issue_num}", token, {"state": "closed"})
    return {"issue": issue_num, "action": "closed"}
def analyze_repo(repo: str, token: str, cutoff: datetime, close_stale: bool = False, apply_priority: bool = False, dry_run: bool = True) -> dict:
    """Fetch, categorize, and triage one repo's open issues.

    Args:
        repo: full repo path ("org/name").
        token: Gitea API token.
        cutoff: issues not updated since this datetime count as stale.
        close_stale: close (or dry-run close) stale issues.
        apply_priority: compute (and optionally apply) P0-P3 labels.
        dry_run: when True, no mutations are performed.

    Returns a summary dict with category counts/details, stale issues,
    and any close/priority actions taken or simulated.
    """
    issues = fetch_all_open_issues(repo, token)
    categories: Dict[str, List[dict]] = {}
    for issue in issues:
        summary = {
            "number": issue["number"],
            "title": issue.get("title", ""),
            "created": issue.get("created_at", ""),
            "updated": issue.get("updated_at", ""),
            "comments": issue.get("comments", 0),
        }
        categories.setdefault(categorize_issue(issue), []).append(summary)
    stale = [issue for issue in issues if is_stale(issue, cutoff)]
    # Labels only need to exist when we are actually going to apply them.
    if apply_priority and not dry_run:
        ensure_priority_labels(repo, token)
    close_results = []
    if close_stale:
        close_results = [close_stale_issue(issue["number"], repo, token, dry_run) for issue in stale]
    priority_results = []
    if apply_priority:
        for issue in issues:
            chosen = apply_priority_label(issue, repo, token, dry_run)
            if chosen:
                priority_results.append({"issue": issue["number"], "priority": chosen})
    return {
        "repo": repo,
        "total_open": len(issues),
        "categories": {name: len(members) for name, members in categories.items()},
        "category_details": categories,
        "stale_count": len(stale),
        "stale_issues": [{"number": i["number"], "title": i.get("title", ""), "updated": i.get("updated_at", "")} for i in stale],
        "close_actions": close_results,
        "priority_applied": priority_results,
    }
def format_markdown(analyses: List[dict], dry_run: bool) -> str:
    """Render per-repo triage results as a Markdown report string."""
    lines = ["# Issue Backlog Triage Report\n"]

    def emit_capped(header: str, items: List[dict], render) -> None:
        # Optional section showing at most 25 rows plus an overflow marker.
        if not items:
            return
        lines.append("")
        lines.append(header)
        for item in items[:25]:
            lines.append(render(item))
        if len(items) > 25:
            lines.append(f"... and {len(items)-25} more")

    for report in analyses:
        lines.append(f"## {report['repo']}")
        lines.append(f"**Open issues:** {report['total_open']} ")
        lines.append(f"**Stale (> {STALE_DAYS}d):** {report['stale_count']} ")
        lines.append("")
        lines.append("### Categories")
        for name, count in sorted(report["categories"].items()):
            lines.append(f"- {name.replace('_', ' ').title()}: {count}")
        emit_capped("### Stale Issues (candidates for closure)", report["stale_issues"],
                    lambda si: f"- #{si['number']}: {si['title'][:70]}")
        emit_capped("### Close Actions", report["close_actions"],
                    lambda act: f"- #{act['issue']}: {act['action']}")
        emit_capped("### Priority Labels Applied", report["priority_applied"],
                    lambda pa: f"- #{pa['issue']}: {pa['priority']}")
        lines.append("")
    mode = "DRY-RUN (no changes)" if dry_run else "LIVE (changes applied)"
    lines.append(f"---\n*Mode: {mode}*")
    return "\n".join(lines)
def main():
    """CLI entry point.

    Parses arguments, triages the requested repo(s), prints a Markdown or
    JSON report, and exits non-zero when stale issues remain (useful as a
    cron/CI signal).
    """
    parser = argparse.ArgumentParser(description="Issue backlog triage for Gitea repos")
    parser.add_argument("repo", nargs="?", help="Repo path (e.g. Timmy_Foundation/timmy-config)")
    parser.add_argument("--org", action="store_true", help="Triage all repos in org")
    parser.add_argument("--close-stale", action="store_true", help="Close stale issues")
    parser.add_argument("--apply-priority", action="store_true", help="Apply P0/P1/P2/P3 labels")
    parser.add_argument("--no-dry-run", action="store_true", help="Actually mutate state (default is dry-run)")
    parser.add_argument("--json", action="store_true", help="Output as JSON")
    parser.add_argument("--token", help="Gitea token override")
    args = parser.parse_args()
    if not args.repo and not args.org:
        parser.error("Provide REPO or use --org")
    token = args.token or get_token()
    if args.org:
        fetched = api_get("/orgs/Timmy_Foundation/repos", token, {"limit": "50"}) or []
        repos = [entry["full_name"] for entry in fetched]
    else:
        repos = [args.repo]
    dry_run = not args.no_dry_run
    cutoff = datetime.now(timezone.utc) - timedelta(days=STALE_DAYS)
    analyses = [
        analyze_repo(name, token, cutoff, close_stale=args.close_stale,
                     apply_priority=args.apply_priority, dry_run=dry_run)
        for name in repos
    ]
    if args.json:
        payload = analyses[0] if len(analyses) == 1 else analyses
        print(json.dumps(payload, indent=2, default=str))
    else:
        print(format_markdown(analyses, dry_run=dry_run))
    # Non-zero exit tells automated callers the backlog still has stale issues.
    if sum(a["stale_count"] for a in analyses) > 0:
        sys.exit(1)
if __name__ == "__main__":
    main()