#!/usr/bin/env python3
"""
issue_backlog_triage.py — Automated issue backlog analysis and triage for
Gitea repos (Issue #478).

Analyzes open issues: categorizes, finds stale (>14d no activity), identifies
duplicates by shared issue references, generates a triage report, and
optionally closes stale issues or applies priority labels (P0–P3).

Usage:
    python3 scripts/issue_backlog_triage.py Timmy_Foundation/timmy-config
    python3 scripts/issue_backlog_triage.py --org Timmy_Foundation
    python3 scripts/issue_backlog_triage.py Timmy_Foundation/hermes-agent --close-stale
    python3 scripts/issue_backlog_triage.py Timmy_Foundation/timmy-home --apply-priority --no-dry-run

Dry-run is the default; pass --no-dry-run to actually mutate state.
"""
import argparse
import json
import os
import re
import sys
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from urllib.error import HTTPError
from urllib.parse import urlencode
from urllib.request import Request, urlopen

GITEA_URL = "https://forge.alexanderwhitestone.com"
ISSUE_PATTERN = re.compile(r"#(\d+)")
STALE_DAYS = 14

# Ordered: the first category whose keyword matches the title wins.
CATEGORY_KEYWORDS = {
    "training_data": ["500", "pairs", "scene description", "lyrics", "prompt", "training data", "corpus"],
    "adversary": ["adversary", "jailbreak", "harm", "manipulation", "crisis", "value violation", "emotional"],
    "security": ["security", "auth", "xss", "injection", "vulnerability"],
    "bug": ["bug", "fix", "patch", "error", "fail", "broken", "crash"],
    "docs": ["doc", "readme", "guide", "explain", "comment"],
    "feature": ["feat", "add", "implement", "feature"],
    "ops": ["ops", "deploy", "ci", "cd", "pipeline", "cron", "daemon", "ansible", "autonomous"],
    "governance": ["audit", "policy", "sovereignty", "approval", "constitution", "governance"],
    "research": ["research", "investigate", "explore", "study", "intelligence"],
    "epic": ["[epic]", "[meta]", "phase", "milestone"],
}

# Label prefixes recognized (case-insensitively) as priority labels.
PRIORITY_LABEL_PREFIXES = ("p0", "p1", "p2", "p3")


def get_token() -> str:
    """Return the Gitea API token from ~/.config/gitea/token or $GITEA_TOKEN.

    Exits with status 1 (after printing to stderr) if neither source has one.
    """
    p = Path(os.path.expanduser("~/.config/gitea/token"))
    if p.exists():
        return p.read_text().strip()
    t = os.environ.get("GITEA_TOKEN", "")
    if not t:
        print("ERROR: No Gitea token. ~/.config/gitea/token or GITEA_TOKEN", file=sys.stderr)
        sys.exit(1)
    return t


def api_get(path: str, token: str, params: Optional[dict] = None) -> Any:
    """GET a Gitea API path and return the parsed JSON body.

    Returns None on 404; re-raises any other HTTPError.
    """
    url = f"{GITEA_URL}/api/v1{path}"
    if params:
        # Bugfix: values are now percent-encoded (the original joined raw
        # f-strings, which breaks on spaces or special characters).
        url += "?" + urlencode(params)
    req = Request(url, headers={"Authorization": f"token {token}"})
    try:
        return json.loads(urlopen(req, timeout=30).read())
    except HTTPError as e:
        if e.code == 404:
            return None
        raise


def _api_send(method: str, path: str, token: str, data: dict) -> Any:
    """Send a JSON body to a Gitea API path; return parsed JSON or None on HTTP error."""
    url = f"{GITEA_URL}/api/v1{path}"
    body = json.dumps(data).encode()
    req = Request(url, data=body, headers={
        "Authorization": f"token {token}",
        "Content-Type": "application/json",
    }, method=method)
    try:
        return json.loads(urlopen(req, timeout=30).read())
    except HTTPError:
        # Best-effort: callers treat None as "mutation failed".
        return None


def api_patch(path: str, token: str, data: dict) -> Any:
    """PATCH `data` (JSON) to a Gitea API path; None on HTTP error."""
    return _api_send("PATCH", path, token, data)


def api_post(path: str, token: str, data: dict) -> Any:
    """POST `data` (JSON) to a Gitea API path; None on HTTP error."""
    return _api_send("POST", path, token, data)


def categorize_issue(issue: dict) -> str:
    """Return the first CATEGORY_KEYWORDS category whose keyword matches the title.

    Simple alphanumeric keywords are matched as whole words; keywords with
    spaces/brackets (e.g. "[epic]", "training data") as plain substrings.
    Falls back to "other" when nothing matches.
    """
    title = (issue.get("title") or "").lower()
    for cat, kws in CATEGORY_KEYWORDS.items():
        for kw in kws:
            # Use whole-word matching for simple alphanumeric keywords; substring for others
            if re.fullmatch(r'[\w]+', kw):
                if re.search(rf'\b{re.escape(kw)}\b', title):
                    return cat
            else:
                if kw in title:
                    return cat
    return "other"


def extract_refs(issue: dict) -> List[int]:
    """Return the sorted, de-duplicated issue numbers referenced (#NNN) in title+body."""
    text = ((issue.get("title") or "") + " " + (issue.get("body") or ""))
    return sorted(set(int(n) for n in ISSUE_PATTERN.findall(text)))


def find_duplicates(issues: List[dict]) -> Dict[int, List[int]]:
    """Map each referenced issue number to the (2+) open issues that mention it.

    Issues sharing a reference are duplicate *candidates*, not confirmed dupes.
    """
    issue_to_nums: Dict[int, List[int]] = {}
    for iss in issues:
        for ref in extract_refs(iss):
            issue_to_nums.setdefault(ref, []).append(iss["number"])
    return {k: v for k, v in issue_to_nums.items() if len(v) > 1}


def is_stale(issue: dict, cutoff: datetime) -> bool:
    """True if the issue's updated_at (ISO-8601, 'Z' suffix) is older than cutoff."""
    updated = datetime.fromisoformat(issue["updated_at"].replace("Z", "+00:00"))
    return updated < cutoff


def fetch_all_open_issues(repo: str, token: str) -> List[dict]:
    """Fetch every open issue in `repo`, following pagination until an empty page."""
    issues = []
    page = 1
    while True:
        params = {"state": "open", "type": "issues", "per_page": "30", "page": str(page)}
        batch = api_get(f"/repos/{repo}/issues", token, params) or []
        if not batch:
            break
        issues.extend(batch)
        page += 1
    return issues


def ensure_priority_labels(repo: str, token: str) -> bool:
    """Create any missing p0–p3 labels in `repo`; False on the first creation failure."""
    existing = {lbl["name"].lower(): lbl
                for lbl in api_get(f"/repos/{repo}/labels", token, {"per_page": "100"}) or []}
    colors = {
        "p0-critical": "dc3545",
        "p1-important": "fd7e14",
        "p2-backlog": "20c997",
        "p3-low": "6c757d",
    }
    for label, color in colors.items():
        if label not in existing:
            resp = api_post(f"/repos/{repo}/labels", token,
                            {"name": label, "color": color,
                             "description": f"Priority {label.upper()}"})
            if resp is None:
                print(f"WARN: Could not create label {label} in {repo}", file=sys.stderr)
                return False
    return True


def apply_priority_label(issue: dict, repo: str, token: str, dry_run: bool = True) -> Optional[str]:
    """Choose a priority label for `issue` via title/age heuristics and apply it.

    In dry_run mode only the chosen label name is returned; otherwise any
    existing p* labels are stripped and the new one PATCHed onto the issue.
    """
    title = (issue.get("title") or "").lower()
    comments = issue.get("comments", 0)
    created = datetime.fromisoformat(issue["created_at"].replace("Z", "+00:00"))
    age_days = (datetime.now(timezone.utc) - created).days
    # Bugfix: the original read issue.get("mileline", "") — a typo, and the
    # milestone field is a dict — so this branch of the heuristic never fired.
    milestone_title = ((issue.get("milestone") or {}).get("title") or "").lower()

    # Heuristics (substring match on the lowercased title).
    if any(kw in title for kw in ["critical", "[crash]", "broken", "[a11y]",
                                  "security", "auth", "xss", "injection"]):
        priority = "p0-critical"
    elif any(kw in title for kw in ["[audit]", "constitution", "governance", "sovereign"]):
        priority = "p1-important"
    elif "critical" in milestone_title or (comments == 0 and age_days > 365):
        # NOTE(review): mapping a "critical" milestone to p3-low looks
        # inverted, but it reproduces the original branch structure — confirm intent.
        priority = "p3-low"
    else:
        priority = "p2-backlog"

    if dry_run:
        return priority
    current_labels = [l["name"] for l in issue.get("labels", [])]
    # Strip old priorities before appending the freshly chosen one.
    new_labels = [l for l in current_labels
                  if not l.lower().startswith(PRIORITY_LABEL_PREFIXES)]
    new_labels.append(priority)
    api_patch(f"/repos/{repo}/issues/{issue['number']}", token, {"labels": new_labels})
    return priority


def close_stale_issue(issue_num: int, repo: str, token: str, dry_run: bool = True) -> dict:
    """Comment on and close a stale issue (or report "would_close" in dry-run)."""
    if dry_run:
        return {"issue": issue_num, "action": "would_close"}
    api_post(f"/repos/{repo}/issues/{issue_num}/comments", token,
             {"body": f"Closing stale issue: no activity for >{STALE_DAYS} days. Triage cleanup (issue #478)."})
    api_patch(f"/repos/{repo}/issues/{issue_num}", token, {"state": "closed"})
    return {"issue": issue_num, "action": "closed"}


def analyze_repo(repo: str, token: str, cutoff: datetime,
                 close_stale: bool = False, apply_priority: bool = False,
                 dry_run: bool = True) -> dict:
    """Run the full triage pass for one repo and return a summary dict.

    Fetches open issues, buckets them by category, flags stale ones, and —
    depending on flags — closes stale issues and/or applies priority labels.
    """
    issues = fetch_all_open_issues(repo, token)

    # Categorization
    categories: Dict[str, List[dict]] = {}
    for iss in issues:
        cat = categorize_issue(iss)
        categories.setdefault(cat, []).append({
            "number": iss["number"],
            "title": iss.get("title", ""),
            "created": iss.get("created_at", ""),
            "updated": iss.get("updated_at", ""),
            "comments": iss.get("comments", 0),
        })

    stale = [iss for iss in issues if is_stale(iss, cutoff)]

    close_results = []
    priority_results = []
    if apply_priority and not dry_run:
        # Labels must exist before we can attach them.
        ensure_priority_labels(repo, token)
    for iss in stale:
        if close_stale:
            close_results.append(close_stale_issue(iss["number"], repo, token, dry_run))
    if apply_priority:
        for iss in issues:
            applied = apply_priority_label(iss, repo, token, dry_run)
            if applied:
                priority_results.append({"issue": iss["number"], "priority": applied})

    return {
        "repo": repo,
        "total_open": len(issues),
        "categories": {k: len(v) for k, v in categories.items()},
        "category_details": categories,
        "stale_count": len(stale),
        "stale_issues": [{"number": i["number"], "title": i.get("title", ""),
                          "updated": i.get("updated_at", "")} for i in stale],
        "close_actions": close_results,
        "priority_applied": priority_results,
    }


def format_markdown(analyses: List[dict], dry_run: bool) -> str:
    """Render the per-repo triage summaries as a Markdown report string."""
    parts = ["# Issue Backlog Triage Report\n"]
    for a in analyses:
        parts.append(f"## {a['repo']}")
        parts.append(f"**Open issues:** {a['total_open']} ")
        parts.append(f"**Stale (> {STALE_DAYS}d):** {a['stale_count']} ")
        parts.append("")
        parts.append("### Categories")
        for cat, count in sorted(a["categories"].items()):
            parts.append(f"- {cat.replace('_', ' ').title()}: {count}")
        if a["stale_issues"]:
            parts.append("")
            parts.append("### Stale Issues (candidates for closure)")
            for si in a["stale_issues"][:25]:
                parts.append(f"- #{si['number']}: {si['title'][:70]}")
            if len(a["stale_issues"]) > 25:
                parts.append(f"... and {len(a['stale_issues'])-25} more")
        if a["close_actions"]:
            parts.append("")
            parts.append("### Close Actions")
            for act in a["close_actions"][:25]:
                parts.append(f"- #{act['issue']}: {act['action']}")
            if len(a["close_actions"]) > 25:
                parts.append(f"... and {len(a['close_actions'])-25} more")
        if a["priority_applied"]:
            parts.append("")
            parts.append("### Priority Labels Applied")
            for pa in a["priority_applied"][:25]:
                parts.append(f"- #{pa['issue']}: {pa['priority']}")
            if len(a["priority_applied"]) > 25:
                parts.append(f"... and {len(a['priority_applied'])-25} more")
        parts.append("")
    mode = "DRY-RUN (no changes)" if dry_run else "LIVE (changes applied)"
    parts.append(f"---\n*Mode: {mode}*")
    return "\n".join(parts)


def main():
    """CLI entry point. Exits 1 when any stale issues were found (for CI use)."""
    parser = argparse.ArgumentParser(description="Issue backlog triage for Gitea repos")
    parser.add_argument("repo", nargs="?", help="Repo path (e.g. Timmy_Foundation/timmy-config)")
    parser.add_argument("--org", action="store_true", help="Triage all repos in org")
    parser.add_argument("--close-stale", action="store_true", help="Close stale issues")
    parser.add_argument("--apply-priority", action="store_true", help="Apply P0/P1/P2/P3 labels")
    parser.add_argument("--no-dry-run", action="store_true",
                        help="Actually mutate state (default is dry-run)")
    parser.add_argument("--json", action="store_true", help="Output as JSON")
    parser.add_argument("--token", help="Gitea token override")
    args = parser.parse_args()

    if not args.repo and not args.org:
        parser.error("Provide REPO or use --org")

    token = args.token or get_token()
    if args.org:
        org_repos = api_get("/orgs/Timmy_Foundation/repos", token, {"limit": "50"}) or []
        repos = [r["full_name"] for r in org_repos]
    else:
        repos = [args.repo]

    cutoff = datetime.now(timezone.utc) - timedelta(days=STALE_DAYS)
    analyses = []
    for repo in repos:
        analyses.append(analyze_repo(repo, token, cutoff,
                                     close_stale=args.close_stale,
                                     apply_priority=args.apply_priority,
                                     dry_run=not args.no_dry_run))

    if args.json:
        out = analyses[0] if len(analyses) == 1 else analyses
        print(json.dumps(out, indent=2, default=str))
    else:
        print(format_markdown(analyses, dry_run=not args.no_dry_run))

    # Non-zero exit signals "stale backlog present" to callers/cron.
    total_stale = sum(a["stale_count"] for a in analyses)
    if total_stale > 0:
        sys.exit(1)


if __name__ == "__main__":
    main()