Files
the-nexus/bin/backlog_manager.py
Alexander Whitestone 49d9d43d13
Some checks failed
CI / test (pull_request) Failing after 1m5s
CI / validate (pull_request) Failing after 1m3s
Review Approval Gate / verify-review (pull_request) Failing after 7s
fix: #1459
- Add backlog manager tool for timmy-home
- Add documentation for backlog management
- Tools for analyzing, triaging, and cleaning up issues

Addresses issue #1459: process: Address timmy-home backlog (220 open issues)

Features:
1. Analyze backlog - Get statistics and insights
2. Generate reports - Detailed markdown reports
3. Bulk operations - Add labels, assign issues, close stale issues
4. Triage support - Identify unlabeled/unassigned issues

Usage:
- python bin/backlog_manager.py --analyze
- python bin/backlog_manager.py --report
- python bin/backlog_manager.py --add-label 123 bug
- python bin/backlog_manager.py --assign 123 @username
- python bin/backlog_manager.py --bulk-close-stale 90

Files added:
- bin/backlog_manager.py: Backlog management tool
- docs/backlog-manager.md: Documentation
2026-04-15 01:21:23 -04:00

354 lines
15 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Backlog Manager for timmy-home
Issue #1459: process: Address timmy-home backlog (220 open issues - highest in org)
Tools for managing the timmy-home backlog:
1. Triage issues (assign labels, assignees)
2. Identify stale issues
3. Generate reports
4. Bulk operations
"""
import json
import os
import sys
import urllib.error
import urllib.request
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Any, Optional
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Root of the Gitea REST API; endpoint paths are appended directly to it.
GITEA_BASE = "https://forge.alexanderwhitestone.com/api/v1"

# File holding the API token; "~" is expanded once, when the module loads.
TOKEN_PATH = os.path.expanduser("~/.config/gitea/token")

# Owner and repository that every request in this tool is scoped to.
ORG, REPO = "Timmy_Foundation", "timmy-home"
class BacklogManager:
    """Small Gitea REST client for triaging the timmy-home issue backlog.

    Every request targets the repository named by the module-level ``ORG``
    and ``REPO`` constants under ``GITEA_BASE`` and is authenticated with the
    token read from ``TOKEN_PATH`` when the instance is constructed.
    """

    # Items requested per page when listing issues. Gitea caps the page size
    # server-side, so larger result sets must be fetched page by page.
    # NOTE(review): the default server cap is believed to be 50 - confirm
    # against the instance's API settings.
    _PAGE_SIZE = 50

    # Seconds a single HTTP request may take before it is abandoned, so a
    # bulk run cannot hang forever on one stalled connection.
    _TIMEOUT = 30

    def __init__(self):
        self.token = self._load_token()

    def _load_token(self) -> str:
        """Return the Gitea API token stored at ``TOKEN_PATH``.

        Prints an error and exits the process with status 1 when the token
        file does not exist.
        """
        try:
            with open(TOKEN_PATH, "r", encoding="utf-8") as f:
                return f.read().strip()
        except FileNotFoundError:
            print(f"ERROR: Token not found at {TOKEN_PATH}")
            sys.exit(1)

    @staticmethod
    def _parse_timestamp(value: str) -> datetime:
        """Parse an ISO-8601 timestamp into a timezone-aware ``datetime``.

        A trailing ``Z`` is rewritten to ``+00:00`` because older
        ``datetime.fromisoformat`` versions reject it. A timestamp without
        any offset is assumed to be UTC so that the result can always be
        compared with other aware datetimes.
        """
        parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    @staticmethod
    def _succeeded(result: Any) -> bool:
        """Return True unless *result* is the error dict built by ``_api_request``."""
        return not (isinstance(result, dict) and "error" in result)

    def _api_request(self, endpoint: str, method: str = "GET", data: Optional[Dict] = None) -> Any:
        """Make an authenticated Gitea API request.

        Args:
            endpoint: Path (including any query string) appended to ``GITEA_BASE``.
            method: HTTP method.
            data: Optional JSON-serialisable payload sent as the request body.

        Returns:
            The decoded JSON response. For a 204 or otherwise empty response,
            ``{"status": "success", "code": <status>}``. On failure, a dict
            with an ``"error"`` key (the HTTP status code, or ``"network"``
            for connection-level problems) and a ``"message"`` key; the
            failure is also printed.
        """
        url = f"{GITEA_BASE}{endpoint}"
        headers = {
            "Authorization": f"token {self.token}",
            "Content-Type": "application/json",
        }
        # ``is not None`` so that an intentionally empty payload ({}) is
        # still transmitted instead of being silently dropped.
        body = json.dumps(data).encode() if data is not None else None
        req = urllib.request.Request(url, data=body, headers=headers, method=method)
        try:
            with urllib.request.urlopen(req, timeout=self._TIMEOUT) as resp:
                status = resp.status
                raw = b"" if status == 204 else resp.read()
        except urllib.error.HTTPError as e:
            error_body = e.read().decode(errors="replace") if e.fp else "No error body"
            print(f"API Error {e.code}: {error_body}")
            return {"error": e.code, "message": error_body}
        except OSError as e:
            # URLError, timeouts and connection resets are all OSError
            # subclasses; report them the same way as HTTP errors rather
            # than aborting a bulk operation with a traceback.
            reason = str(getattr(e, "reason", e))
            print(f"Network Error: {reason}")
            return {"error": "network", "message": reason}
        if not raw.strip():
            return {"status": "success", "code": status}
        return json.loads(raw)

    def get_open_issues(self, limit: int = 100) -> List[Dict]:
        """Return up to *limit* open issues, fetched page by page.

        Pull requests are excluded via ``type=issues`` so that bulk
        operations never act on them. Paging stops at the first empty page
        or failed request; whatever was collected so far is returned.
        """
        collected: List[Dict] = []
        page = 1
        while len(collected) < limit:
            endpoint = (
                f"/repos/{ORG}/{REPO}/issues"
                f"?state=open&type=issues&limit={self._PAGE_SIZE}&page={page}"
            )
            batch = self._api_request(endpoint)
            # Stop only on an empty page (not a short one): the server may
            # cap the page size below what was requested.
            if not isinstance(batch, list) or not batch:
                break
            collected.extend(batch)
            page += 1
        return collected[:limit]

    def get_issue_details(self, issue_number: int) -> Optional[Dict]:
        """Return the API payload for one issue.

        On failure this is the error dict produced by ``_api_request``.
        """
        endpoint = f"/repos/{ORG}/{REPO}/issues/{issue_number}"
        return self._api_request(endpoint)

    def get_labels(self) -> List[Dict]:
        """Return the repository's labels, or an empty list on failure."""
        endpoint = f"/repos/{ORG}/{REPO}/labels"
        labels = self._api_request(endpoint)
        return labels if isinstance(labels, list) else []

    def add_label_to_issue(self, issue_number: int, label: str) -> bool:
        """Add *label* to an issue; return True when the API reports no error.

        NOTE(review): the label is sent by name. Older Gitea releases are
        believed to accept only numeric label IDs on this endpoint - confirm
        against the server version in use.
        """
        endpoint = f"/repos/{ORG}/{REPO}/issues/{issue_number}/labels"
        data = {"labels": [label]}
        return self._succeeded(self._api_request(endpoint, "POST", data))

    def assign_issue(self, issue_number: int, assignee: str) -> bool:
        """Assign an issue to a user; return True when the API reports no error.

        A leading ``@`` (as in ``@username``) is stripped because the API
        expects the bare login name.
        """
        endpoint = f"/repos/{ORG}/{REPO}/issues/{issue_number}"
        data = {"assignees": [assignee.lstrip("@")]}
        return self._succeeded(self._api_request(endpoint, "PATCH", data))

    def close_issue(self, issue_number: int, comment: str = "") -> bool:
        """Close an issue, optionally posting *comment* first.

        The comment is best-effort: the issue is closed even if posting the
        comment fails. The return value reflects only the close request.
        """
        if comment:
            comment_endpoint = f"/repos/{ORG}/{REPO}/issues/{issue_number}/comments"
            self._api_request(comment_endpoint, "POST", {"body": comment})
        endpoint = f"/repos/{ORG}/{REPO}/issues/{issue_number}"
        return self._succeeded(self._api_request(endpoint, "PATCH", {"state": "closed"}))

    def analyze_backlog(self) -> Dict[str, Any]:
        """Collect label, assignee and age statistics for the open issues.

        Returns:
            A dict with the counters ``total_open``, ``with_labels``,
            ``without_labels``, ``with_assignee`` and ``without_assignee``;
            the per-name counts ``by_label`` and ``by_assignee``; and the
            lists ``stale_issues`` (created more than 30 days ago),
            ``recent_issues`` (created within the last 7 days) and
            ``unlabeled_unassigned``.
        """
        print("Analyzing timmy-home backlog...")
        issues = self.get_open_issues(limit=300)  # Get more than 220 to be safe
        analysis: Dict[str, Any] = {
            "total_open": len(issues),
            "with_labels": 0,
            "without_labels": 0,
            "with_assignee": 0,
            "without_assignee": 0,
            "stale_issues": [],  # Issues older than 30 days
            "recent_issues": [],  # Issues from last 7 days
            "by_label": {},
            "by_assignee": {},
            "unlabeled_unassigned": [],
        }
        # Aware "now": issue timestamps carry a UTC offset, and comparing
        # them with a naive datetime raises TypeError.
        now = datetime.now(timezone.utc)
        thirty_days_ago = now - timedelta(days=30)
        seven_days_ago = now - timedelta(days=7)

        for issue in issues:
            # ``or []`` also covers an explicit null in the payload.
            labels = [entry["name"] for entry in (issue.get("labels") or [])]
            if labels:
                analysis["with_labels"] += 1
                for label in labels:
                    analysis["by_label"][label] = analysis["by_label"].get(label, 0) + 1
            else:
                analysis["without_labels"] += 1

            assignee = issue.get("assignee")
            assignee_name = assignee["login"] if assignee else None
            if assignee:
                analysis["with_assignee"] += 1
                analysis["by_assignee"][assignee_name] = (
                    analysis["by_assignee"].get(assignee_name, 0) + 1
                )
            else:
                analysis["without_assignee"] += 1

            created_at = self._parse_timestamp(issue["created_at"])
            if created_at < thirty_days_ago:
                analysis["stale_issues"].append({
                    "number": issue["number"],
                    "title": issue["title"],
                    "created": issue["created_at"],
                    "labels": labels,
                    "assignee": assignee_name,
                })
            if created_at > seven_days_ago:
                analysis["recent_issues"].append({
                    "number": issue["number"],
                    "title": issue["title"],
                    "created": issue["created_at"],
                })

            if not labels and not assignee:
                analysis["unlabeled_unassigned"].append({
                    "number": issue["number"],
                    "title": issue["title"],
                    "created": issue["created_at"],
                })
        return analysis

    def generate_report(self, analysis: Dict[str, Any]) -> str:
        """Render the dict produced by ``analyze_backlog`` as a Markdown report.

        The stale and unlabeled/unassigned sections list at most the first
        10 issues each. Recommendations are numbered consecutively, so the
        list always starts at 1 regardless of which ones apply.
        """
        by_count = lambda item: item[1]  # sort key: the count of a (name, count) pair
        parts = [
            "# timmy-home Backlog Analysis Report\n\n",
            f"Generated: {datetime.now().isoformat()}\n\n",
            "## Summary\n",
            f"- **Total open issues:** {analysis['total_open']}\n",
            f"- **With labels:** {analysis['with_labels']}\n",
            f"- **Without labels:** {analysis['without_labels']}\n",
            f"- **With assignee:** {analysis['with_assignee']}\n",
            f"- **Without assignee:** {analysis['without_assignee']}\n",
            f"- **Stale issues (>30 days):** {len(analysis['stale_issues'])}\n",
            f"- **Recent issues (<7 days):** {len(analysis['recent_issues'])}\n",
            f"- **Unlabeled & unassigned:** {len(analysis['unlabeled_unassigned'])}\n\n",
            "## Label Distribution\n",
        ]

        if analysis["by_label"]:
            for label, count in sorted(analysis["by_label"].items(), key=by_count, reverse=True):
                parts.append(f"- **{label}:** {count} issues\n")
        else:
            parts.append("- No labels found\n")

        parts.append("\n## Assignee Distribution\n")
        if analysis["by_assignee"]:
            for assignee, count in sorted(analysis["by_assignee"].items(), key=by_count, reverse=True):
                parts.append(f"- **@{assignee}:** {count} issues\n")
        else:
            parts.append("- No assignees found\n")

        # Sub-bullets are indented two spaces so Markdown nests them under
        # the issue line instead of rendering them as sibling items.
        if analysis["stale_issues"]:
            parts.append("\n## Stale Issues (>30 days old)\n")
            for issue in analysis["stale_issues"][:10]:  # Show first 10
                parts.append(f"- **#{issue['number']}**: {issue['title']}\n")
                parts.append(f"  - Created: {issue['created']}\n")
                parts.append(f"  - Labels: {', '.join(issue['labels']) if issue['labels'] else 'None'}\n")
                parts.append(f"  - Assignee: {issue['assignee'] or 'None'}\n")

        if analysis["unlabeled_unassigned"]:
            parts.append("\n## Unlabeled & Unassigned Issues\n")
            for issue in analysis["unlabeled_unassigned"][:10]:  # Show first 10
                parts.append(f"- **#{issue['number']}**: {issue['title']}\n")
                parts.append(f"  - Created: {issue['created']}\n")

        recommendations = []
        if analysis["without_labels"] > 0:
            recommendations.append(
                f"**Add labels to {analysis['without_labels']} issues** - Categorize for better management"
            )
        if analysis["without_assignee"] > 0:
            recommendations.append(
                f"**Assign owners to {analysis['without_assignee']} issues** - Ensure accountability"
            )
        if analysis["stale_issues"]:
            recommendations.append(
                f"**Review {len(analysis['stale_issues'])} stale issues** - Close or re-prioritize"
            )
        if analysis["unlabeled_unassigned"]:
            recommendations.append(
                f"**Triage {len(analysis['unlabeled_unassigned'])} unlabeled/unassigned issues** - Basic triage needed"
            )
        parts.append("\n## Recommendations\n")
        for position, text in enumerate(recommendations, start=1):
            parts.append(f"{position}. {text}\n")

        return "".join(parts)

    def bulk_add_labels(self, issue_numbers: List[int], label: str) -> Dict[str, Any]:
        """Add *label* to each issue; return the numbers under ``success`` / ``failed``."""
        results: Dict[str, Any] = {"success": [], "failed": []}
        for issue_number in issue_numbers:
            outcome = "success" if self.add_label_to_issue(issue_number, label) else "failed"
            results[outcome].append(issue_number)
        return results

    def bulk_assign_issues(self, issue_assignments: Dict[int, str]) -> Dict[str, Any]:
        """Assign each issue number to its mapped user; return ``success`` / ``failed`` lists."""
        results: Dict[str, Any] = {"success": [], "failed": []}
        for issue_number, assignee in issue_assignments.items():
            outcome = "success" if self.assign_issue(issue_number, assignee) else "failed"
            results[outcome].append(issue_number)
        return results

    def bulk_close_stale_issues(self, days: int = 90, comment: str = "") -> Dict[str, Any]:
        """Close every open issue created more than *days* days ago.

        Args:
            days: Minimum age in days, judged by creation date. Must be positive.
            comment: Comment posted on each issue before closing; a default
                explanation is used when empty.

        Returns:
            A dict with ``success`` and ``failed`` lists of issue numbers and
            ``total``, the number of issues selected for closing.

        Raises:
            ValueError: If *days* is not positive. A zero or negative value
                would put the cutoff at or after the present moment and
                close the entire backlog.
        """
        if days <= 0:
            raise ValueError(f"days must be a positive integer, got {days}")

        issues = self.get_open_issues(limit=300)
        # Aware cutoff so it is comparable with the parsed issue timestamps.
        cutoff_date = datetime.now(timezone.utc) - timedelta(days=days)
        stale_issues = [
            issue["number"]
            for issue in issues
            if self._parse_timestamp(issue["created_at"]) < cutoff_date
        ]

        results: Dict[str, Any] = {"success": [], "failed": [], "total": len(stale_issues)}
        if not comment:
            comment = f"Closed as stale (>{days} days old). Reopen if still relevant."
        for issue_number in stale_issues:
            outcome = "success" if self.close_issue(issue_number, comment) else "failed"
            results[outcome].append(issue_number)
        return results
def main(argv=None):
    """Command-line entry point for the backlog manager.

    Args:
        argv: Argument list to parse; ``None`` makes argparse read
            ``sys.argv[1:]``.

    Exactly one action runs per invocation, chosen in this order:
    ``--analyze``/``--report``, ``--add-label``, ``--assign``, ``--close``,
    ``--bulk-label``, ``--bulk-close-stale``. With no action the help text is
    printed. Malformed numeric arguments are rejected through the parser
    (exit status 2), and a failed action exits with status 1.
    """
    import argparse

    parser = argparse.ArgumentParser(description="timmy-home Backlog Manager")
    parser.add_argument("--analyze", action="store_true", help="Analyze backlog")
    parser.add_argument("--report", action="store_true", help="Generate report")
    parser.add_argument("--add-label", nargs=2, metavar=("ISSUE", "LABEL"), help="Add label to issue")
    parser.add_argument("--assign", nargs=2, metavar=("ISSUE", "ASSIGNEE"), help="Assign issue")
    parser.add_argument("--close", nargs=1, metavar="ISSUE", help="Close issue")
    parser.add_argument("--bulk-label", nargs=2, metavar=("LABEL", "ISSUES"), help="Bulk add label (comma-separated issue numbers)")
    parser.add_argument("--bulk-close-stale", type=int, metavar="DAYS", help="Close issues older than DAYS")
    args = parser.parse_args(argv)

    def to_issue_number(raw):
        """Convert a CLI token to an int, or exit with a usage error."""
        try:
            return int(raw.strip())
        except ValueError:
            parser.error(f"invalid issue number: {raw!r}")

    wants_analysis = args.analyze or args.report
    selected = (args.add_label, args.assign, args.close, args.bulk_label, args.bulk_close_stale)
    if not wants_analysis and all(value is None for value in selected):
        # Showing help needs no API token, so return before a manager is built.
        parser.print_help()
        return

    # Arguments are validated before BacklogManager() is constructed in each
    # branch, so usage errors are reported without touching the token file.
    succeeded = True
    if wants_analysis:
        manager = BacklogManager()
        analysis = manager.analyze_backlog()
        if args.report:
            print(manager.generate_report(analysis))
        else:
            print(f"Backlog Analysis:")
            print(f" Total open issues: {analysis['total_open']}")
            print(f" With labels: {analysis['with_labels']}")
            print(f" Without labels: {analysis['without_labels']}")
            print(f" With assignee: {analysis['with_assignee']}")
            print(f" Without assignee: {analysis['without_assignee']}")
            print(f" Stale issues (>30 days): {len(analysis['stale_issues'])}")
            print(f" Unlabeled & unassigned: {len(analysis['unlabeled_unassigned'])}")
    elif args.add_label is not None:
        raw_issue, label = args.add_label
        issue_number = to_issue_number(raw_issue)
        succeeded = BacklogManager().add_label_to_issue(issue_number, label)
        if succeeded:
            print(f"✅ Added label '{label}' to issue #{issue_number}")
        else:
            print(f"❌ Failed to add label to issue #{issue_number}")
    elif args.assign is not None:
        raw_issue, assignee = args.assign
        issue_number = to_issue_number(raw_issue)
        # Accept both "username" and "@username"; the API wants the bare
        # login and the message below adds its own "@".
        assignee = assignee.lstrip("@")
        succeeded = BacklogManager().assign_issue(issue_number, assignee)
        if succeeded:
            print(f"✅ Assigned issue #{issue_number} to @{assignee}")
        else:
            print(f"❌ Failed to assign issue #{issue_number}")
    elif args.close is not None:
        issue_number = to_issue_number(args.close[0])
        succeeded = BacklogManager().close_issue(issue_number)
        if succeeded:
            print(f"✅ Closed issue #{issue_number}")
        else:
            print(f"❌ Failed to close issue #{issue_number}")
    elif args.bulk_label is not None:
        label, issues_str = args.bulk_label
        # Skip empty pieces so a trailing comma is tolerated.
        issue_numbers = [to_issue_number(piece) for piece in issues_str.split(",") if piece.strip()]
        if not issue_numbers:
            parser.error("--bulk-label requires at least one issue number")
        results = BacklogManager().bulk_add_labels(issue_numbers, label)
        print(f"Bulk label results:")
        print(f" Success: {len(results['success'])} issues")
        print(f" Failed: {len(results['failed'])} issues")
        succeeded = not results["failed"]
    else:
        days = args.bulk_close_stale
        if days <= 0:
            # A non-positive age would select every open issue for closing.
            parser.error("--bulk-close-stale DAYS must be a positive integer")
        results = BacklogManager().bulk_close_stale_issues(days)
        print(f"Bulk close stale issues (>{days} days):")
        print(f" Total: {results['total']}")
        print(f" Success: {len(results['success'])}")
        print(f" Failed: {len(results['failed'])}")
        succeeded = not results["failed"]

    if not succeeded:
        # Signal failure to shells and CI instead of always exiting 0.
        sys.exit(1)


if __name__ == "__main__":
    main()