Compare commits

...

1 Commits

Author SHA1 Message Date
da7f017b34 [AUDIT][ACTION] Add issue backlog triage tool — enabler for #478
Some checks failed
Smoke Test / smoke (pull_request) Failing after 21s
Architecture Lint / Linter Tests (pull_request) Successful in 22s
Validate Config / YAML Lint (pull_request) Failing after 13s
Validate Config / JSON Validate (pull_request) Successful in 15s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 48s
Validate Config / Python Test Suite (pull_request) Has been skipped
Validate Config / Shell Script Lint (pull_request) Failing after 51s
Validate Config / Cron Syntax Check (pull_request) Successful in 10s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 10s
Validate Config / Playbook Schema Validation (pull_request) Successful in 20s
Architecture Lint / Lint Repository (pull_request) Failing after 18s
PR Checklist / pr-checklist (pull_request) Successful in 2m46s
Implements scripts/issue_backlog_triage.py — automated issue backlog
analysis and triage for Gitea repos, addressing the 559-issue backlog
audit finding.

Features:
- Paginated fetch of all open issues across repos
- Keyword-based categorization (adversary, bug, security, training_data, …)
- Duplicate detection via issue reference (#N) sharing
- Stale identification (>14d with no activity)
- Optional dry-run close of stale issues (--close-stale)
- Optional priority label application (P0–P3) with auto-creation (--apply-priority)
- Markdown and JSON report outputs

Unit tests added in tests/test_issue_backlog_triage.py (27 tests, all passing).

Enables systematic sweep of timmy-home, timmy-config, the-nexus, and hermes-agent
backlogs per issue #478 acceptance criteria.

Closes #478
2026-04-26 14:54:15 -04:00
2 changed files with 452 additions and 0 deletions

View File

@@ -0,0 +1,298 @@
#!/usr/bin/env python3
"""
issue_backlog_triage.py — Automated issue backlog analysis and triage for Gitea repos (Issue #478).
Analyzes open issues: categorizes, finds stale (>14d no activity), identifies duplicates
by shared issue references, generates a triage report, and optionally closes stale issues
or applies priority labels (P0P3).
Usage:
python3 scripts/issue_backlog_triage.py Timmy_Foundation/timmy-config
python3 scripts/issue_backlog_triage.py --org Timmy_Foundation
python3 scripts/issue_backlog_triage.py Timmy_Foundation/hermes-agent --close-stale --dry-run
python3 scripts/issue_backlog_triage.py Timmy_Foundation/timmy-home --apply-priority --no-dry-run
"""
import argparse
import json
import os
import re
import sys
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from urllib.request import Request, urlopen
from urllib.error import HTTPError
GITEA_URL = "https://forge.alexanderwhitestone.com"
ISSUE_PATTERN = re.compile(r"#(\d+)")
STALE_DAYS = 14
CATEGORY_KEYWORDS = {
"training_data": ["500", "pairs", "scene description", "lyrics", "prompt", "training data", "corpus"],
"adversary": ["adversary", "jailbreak", "harm", "manipulation", "crisis", "value violation", "emotional"],
"security": ["security", "auth", "xss", "injection", "vulnerability"],
"bug": ["bug", "fix", "patch", "error", "fail", "broken", "crash"],
"docs": ["doc", "readme", "guide", "explain", "comment"],
"feature": ["feat", "add", "implement", "feature"],
"ops": ["ops", "deploy", "ci", "cd", "pipeline", "cron", "daemon", "ansible", "autonomous"],
"governance": ["audit", "policy", "sovereignty", "approval", "constitution", "governance"],
"research": ["research", "investigate", "explore", "study", "intelligence"],
"epic": ["[epic]", "[meta]", "phase", "milestone"],
}
PRIORITY_LABEL_PREFIXES = ("p0", "p1", "p2", "p3")
def get_token() -> str:
p = Path(os.path.expanduser("~/.config/gitea/token"))
if p.exists():
return p.read_text().strip()
t = os.environ.get("GITEA_TOKEN", "")
if not t:
print("ERROR: No Gitea token. ~/.config/gitea/token or GITEA_TOKEN", file=sys.stderr)
sys.exit(1)
return t
def api_get(path: str, token: str, params: dict = None) -> Any:
url = f"{GITEA_URL}/api/v1{path}"
if params:
url += "?" + "&".join(f"{k}={v}" for k, v in params.items())
req = Request(url, headers={"Authorization": f"token {token}"})
try:
return json.loads(urlopen(req, timeout=30).read())
except HTTPError as e:
if e.code == 404:
return None
raise
def api_patch(path: str, token: str, data: dict) -> Any:
url = f"{GITEA_URL}/api/v1{path}"
body = json.dumps(data).encode()
req = Request(url, data=body, headers={
"Authorization": f"token {token}",
"Content-Type": "application/json",
}, method="PATCH")
try:
return json.loads(urlopen(req, timeout=30).read())
except HTTPError:
return None
def api_post(path: str, token: str, data: dict) -> Any:
url = f"{GITEA_URL}/api/v1{path}"
body = json.dumps(data).encode()
req = Request(url, data=body, headers={
"Authorization": f"token {token}",
"Content-Type": "application/json",
}, method="POST")
try:
return json.loads(urlopen(req, timeout=30).read())
except HTTPError:
return None
def categorize_issue(issue: dict) -> str:
title = (issue.get("title") or "").lower()
for cat, kws in CATEGORY_KEYWORDS.items():
for kw in kws:
# Use whole-word matching for simple alphanumeric keywords; substring for others
if re.fullmatch(r'[\w]+', kw):
if re.search(rf'\b{re.escape(kw)}\b', title):
return cat
else:
if kw in title:
return cat
return "other"
def extract_refs(issue: dict) -> List[int]:
text = ((issue.get("title") or "") + " " + (issue.get("body") or ""))
return sorted(set(int(n) for n in ISSUE_PATTERN.findall(text)))
def find_duplicates(issues: List[dict]) -> Dict[int, List[int]]:
issue_to_nums: Dict[int, List[int]] = {}
for iss in issues:
for ref in extract_refs(iss):
issue_to_nums.setdefault(ref, []).append(iss["number"])
return {k: v for k, v in issue_to_nums.items() if len(v) > 1}
def is_stale(issue: dict, cutoff: datetime) -> bool:
updated = datetime.fromisoformat(issue["updated_at"].replace("Z", "+00:00"))
return updated < cutoff
def fetch_all_open_issues(repo: str, token: str) -> List[dict]:
issues = []
page = 1
while True:
params = {"state": "open", "type": "issues", "per_page": "30", "page": str(page)}
batch = api_get(f"/repos/{repo}/issues", token, params) or []
if not batch:
break
issues.extend(batch)
page += 1
return issues
def ensure_priority_labels(repo: str, token: str) -> bool:
existing = {lbl["name"].lower(): lbl for lbl in api_get(f"/repos/{repo}/labels", token, {"per_page": "100"}) or []}
colors = {
"p0-critical": "dc3545",
"p1-important": "fd7e14",
"p2-backlog": "20c997",
"p3-low": "6c757d",
}
for label, color in colors.items():
if label not in existing:
resp = api_post(f"/repos/{repo}/labels", token, {"name": label, "color": color, "description": f"Priority {label.upper()}"})
if resp is None:
print(f"WARN: Could not create label {label} in {repo}", file=sys.stderr)
return False
return True
def apply_priority_label(issue: dict, repo: str, token: str, dry_run: bool = True) -> Optional[str]:
title = (issue.get("title") or "").lower()
comments = issue.get("comments", 0)
age_days = (datetime.now(timezone.utc) - datetime.fromisoformat(issue["created_at"].replace("Z", "+00:00"))).days
closed_refs = extract_refs(issue)
# Heuristics
if any(kw in title for kw in ["critical", "[crash]", "broken", "[a11y]", "security", "auth", "xss", "injection"]):
priority = "p0-critical"
elif any(kw in title for kw in ["[audit]", "constitution", "governance", "sovereign"]):
priority = "p1-important"
elif (issue.get("milestone") and "critical" in issue.get("mileline", "").lower()) or comments == 0 and age_days > 365:
priority = "p3-low"
else:
priority = "p2-backlog"
if dry_run:
return priority
current_labels = [l["name"] for l in issue.get("labels", [])]
# Strip old priorities
new_labels = [l for l in current_labels if not l.lower().startswith(PRIORITY_LABEL_PREFIXES)]
new_labels.append(priority)
api_patch(f"/repos/{repo}/issues/{issue['number']}", token, {"labels": new_labels})
return priority
def close_stale_issue(issue_num: int, repo: str, token: str, dry_run: bool = True) -> dict:
if dry_run:
return {"issue": issue_num, "action": "would_close"}
api_post(f"/repos/{repo}/issues/{issue_num}/comments", token,
{"body": f"Closing stale issue: no activity for >{STALE_DAYS} days. Triage cleanup (issue #478)."})
api_patch(f"/repos/{repo}/issues/{issue_num}", token, {"state": "closed"})
return {"issue": issue_num, "action": "closed"}
def analyze_repo(repo: str, token: str, cutoff: datetime, close_stale: bool = False, apply_priority: bool = False, dry_run: bool = True) -> dict:
issues = fetch_all_open_issues(repo, token)
# Categorization
categories: Dict[str, List[dict]] = {}
for iss in issues:
cat = categorize_issue(iss)
categories.setdefault(cat, []).append({
"number": iss["number"],
"title": iss.get("title", ""),
"created": iss.get("created_at", ""),
"updated": iss.get("updated_at", ""),
"comments": iss.get("comments", 0),
})
stale = [iss for iss in issues if is_stale(iss, cutoff)]
close_results = []
priority_results = []
if apply_priority and not dry_run:
ensure_priority_labels(repo, token)
for iss in stale:
if close_stale:
close_results.append(close_stale_issue(iss["number"], repo, token, dry_run))
if apply_priority:
for iss in issues:
applied = apply_priority_label(iss, repo, token, dry_run)
if applied:
priority_results.append({"issue": iss["number"], "priority": applied})
return {
"repo": repo,
"total_open": len(issues),
"categories": {k: len(v) for k, v in categories.items()},
"category_details": categories,
"stale_count": len(stale),
"stale_issues": [{"number": i["number"], "title": i.get("title",""), "updated": i.get("updated_at","")} for i in stale],
"close_actions": close_results,
"priority_applied": priority_results,
}
def format_markdown(analyses: List[dict], dry_run: bool) -> str:
parts = ["# Issue Backlog Triage Report\n"]
for a in analyses:
parts.append(f"## {a['repo']}")
parts.append(f"**Open issues:** {a['total_open']} ")
parts.append(f"**Stale (> {STALE_DAYS}d):** {a['stale_count']} ")
parts.append("")
parts.append("### Categories")
for cat, count in sorted(a["categories"].items()):
parts.append(f"- {cat.replace('_', ' ').title()}: {count}")
if a["stale_issues"]:
parts.append("")
parts.append("### Stale Issues (candidates for closure)")
for si in a["stale_issues"][:25]:
parts.append(f"- #{si['number']}: {si['title'][:70]}")
if len(a["stale_issues"]) > 25:
parts.append(f"... and {len(a['stale_issues'])-25} more")
if a["close_actions"]:
parts.append("")
parts.append("### Close Actions")
for act in a["close_actions"][:25]:
parts.append(f"- #{act['issue']}: {act['action']}")
if len(a["close_actions"]) > 25:
parts.append(f"... and {len(a['close_actions'])-25} more")
if a["priority_applied"]:
parts.append("")
parts.append("### Priority Labels Applied")
for pa in a["priority_applied"][:25]:
parts.append(f"- #{pa['issue']}: {pa['priority']}")
if len(a["priority_applied"]) > 25:
parts.append(f"... and {len(a['priority_applied'])-25} more")
parts.append("")
mode = "DRY-RUN (no changes)" if dry_run else "LIVE (changes applied)"
parts.append(f"---\n*Mode: {mode}*")
return "\n".join(parts)
def main():
parser = argparse.ArgumentParser(description="Issue backlog triage for Gitea repos")
parser.add_argument("repo", nargs="?", help="Repo path (e.g. Timmy_Foundation/timmy-config)")
parser.add_argument("--org", action="store_true", help="Triage all repos in org")
parser.add_argument("--close-stale", action="store_true", help="Close stale issues")
parser.add_argument("--apply-priority", action="store_true", help="Apply P0/P1/P2/P3 labels")
parser.add_argument("--no-dry-run", action="store_true", help="Actually mutate state (default is dry-run)")
parser.add_argument("--json", action="store_true", help="Output as JSON")
parser.add_argument("--token", help="Gitea token override")
args = parser.parse_args()
if not args.repo and not args.org:
parser.error("Provide REPO or use --org")
token = args.token or get_token()
repos = []
if args.org:
org_repos = api_get("/orgs/Timmy_Foundation/repos", token, {"limit": "50"}) or []
repos = [r["full_name"] for r in org_repos]
else:
repos = [args.repo]
cutoff = datetime.now(timezone.utc) - timedelta(days=STALE_DAYS)
analyses = []
for repo in repos:
analyses.append(analyze_repo(repo, token, cutoff, close_stale=args.close_stale, apply_priority=args.apply_priority, dry_run=not args.no_dry_run))
if args.json:
out = analyses[0] if len(analyses) == 1 else analyses
print(json.dumps(out, indent=2, default=str))
else:
print(format_markdown(analyses, dry_run=not args.no_dry_run))
total_stale = sum(a["stale_count"] for a in analyses)
if total_stale > 0:
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,154 @@
#!/usr/bin/env python3
"""Tests for issue_backlog_triage.py — Issue #478."""
import json
import sys
from pathlib import Path
import pytest
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "scripts"))
from datetime import datetime, timezone, timedelta
from issue_backlog_triage import (
categorize_issue,
extract_refs,
find_duplicates,
is_stale,
STALE_DAYS,
)
class TestCategorize:
def test_training_data(self):
issue = {"title": "feat: 500 emotional weather pairs (#603)"}
assert categorize_issue(issue) == "training_data"
def test_scene_description(self):
issue = {"title": "Scene Descriptions: Jazz — 100 Lyrics→Visual"}
assert categorize_issue(issue) == "training_data"
def test_adversary(self):
issue = {"title": "Adversary: Jailbreak Generator — 1K Prompts"}
assert categorize_issue(issue) == "adversary"
def test_bug(self):
issue = {"title": "fix: broken import in cli.py"}
assert categorize_issue(issue) == "bug"
def test_feature(self):
issue = {"title": "feat: add token budget tracker"}
assert categorize_issue(issue) == "feature"
def test_docs(self):
issue = {"title": "docs: update README with new config format"}
assert categorize_issue(issue) == "docs"
def test_ops(self):
issue = {"title": "ops: deploy config to VPS"}
assert categorize_issue(issue) == "ops"
def test_security(self):
issue = {"title": "security: fix XSS in gallery panel"}
assert categorize_issue(issue) == "security"
def test_governance(self):
issue = {"title": "[AUDIT] Triage the backlog"}
assert categorize_issue(issue) == "governance"
def test_research(self):
issue = {"title": "research: investigate model drift"}
assert categorize_issue(issue) == "research"
def test_epic(self):
issue = {"title": "[EPIC] Contraction sweep across all repos"}
assert categorize_issue(issue) == "epic"
def test_other(self):
issue = {"title": "chore: cleanup whitespace"}
assert categorize_issue(issue) == "other"
def test_case_insensitive(self):
issue = {"title": "FIX: resolve import error"}
assert categorize_issue(issue) == "bug"
def test_empty_title(self):
issue = {"title": ""}
assert categorize_issue(issue) == "other"
def test_none_title(self):
issue = {}
assert categorize_issue(issue) == "other"
class TestExtractRefs:
def test_single_ref(self):
issue = {"title": "Fix #123", "body": "Closes #123"}
assert extract_refs(issue) == [123]
def test_multiple_refs(self):
issue = {"title": "Fix #123", "body": "Related to #456 and #789"}
assert extract_refs(issue) == [123, 456, 789]
def test_deduplication(self):
issue = {"title": "#100", "body": "Fixes #100"}
assert extract_refs(issue) == [100]
def test_no_refs(self):
issue = {"title": "No issue here", "body": "Just an issue"}
assert extract_refs(issue) == []
def test_empty_body(self):
issue = {"title": "Fix #42", "body": None}
assert extract_refs(issue) == [42]
def test_numeric_like_text_not_refs(self):
issue = {"title": "Version 2.0 release", "body": "See build #1234"}
assert extract_refs(issue) == [1234]
class TestFindDuplicates:
def test_no_duplicates(self):
issues = [{"number": 1, "title": "Fix #10", "body": ""},
{"number": 2, "title": "Fix #11", "body": ""}]
assert find_duplicates(issues) == {}
def test_duplicates_found(self):
issues = [{"number": 1, "title": "Fix #10", "body": ""},
{"number": 2, "title": "Also fix #10", "body": ""}]
dupes = find_duplicates(issues)
assert 10 in dupes
assert dupes[10] == [1, 2]
def test_triple_duplicate(self):
issues = [{"number": 1, "title": "#42", "body": ""},
{"number": 2, "title": "#42", "body": ""},
{"number": 3, "title": "#42", "body": ""}]
dupes = find_duplicates(issues)
assert len(dupes[42]) == 3
def test_partial_overlap(self):
issues = [{"number": 1, "title": "#10 #20", "body": ""},
{"number": 2, "title": "#10", "body": ""}]
dupes = find_duplicates(issues)
assert 10 in dupes
assert 20 not in dupes
class TestIsStale:
def test_fresh_issue(self):
now = datetime.now(timezone.utc)
issue = {
"number": 1,
"title": "Fresh",
"updated_at": now.isoformat(),
"created_at": now.isoformat(),
}
assert not is_stale(issue, now - timedelta(days=STALE_DAYS))
def test_old_issue(self):
old = datetime.now(timezone.utc) - timedelta(days=STALE_DAYS + 1)
issue = {
"number": 2,
"title": "Old",
"updated_at": old.isoformat(),
"created_at": old.isoformat(),
}
assert is_stale(issue, datetime.now(timezone.utc) - timedelta(days=STALE_DAYS))