Migrates hardcoded ~/.timmy, ~/.config, and Path.home() references across the autonomous infrastructure stack to use environment variables with sensible defaults:

- scripts/autonomous_issue_creator.py:
  - DEFAULT_TOKEN_FILE → XDG_CONFIG_HOME fallback
  - DEFAULT_FAILOVER_STATUS → TIMMY_HOME fallback
- scripts/failover_monitor.py:
  - STATUS_FILE → TIMMY_HOME fallback
- scripts/dynamic_dispatch_optimizer.py:
  - STATUS_FILE, SPEC_FILE, OUTPUT_FILE → TIMMY_HOME fallback
- scripts/backlog_cleanup.py:
  - token path → XDG_CONFIG_HOME fallback
- scripts/backlog_triage.py:
  - TOKEN_PATH → XDG_CONFIG_HOME fallback
- scripts/burn_lane_issue_audit.py:
  - DEFAULT_TOKEN_PATH → XDG_CONFIG_HOME fallback
- scripts/cross-repo-qa.py:
  - GITEA_TOKEN_PATH → XDG_CONFIG_HOME fallback

This makes the Phase-6 buildings (self-healing fleet, autonomous issue creation, community pipeline, global mesh) portable across different user accounts and deployment environments.
254 lines
9.8 KiB
Python
Executable File
254 lines
9.8 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Weekly Backlog Triage for timmy-home
|
|
Issue #685: [OPS] timmy-home backlog reduced from 220 to 50 — triage cadence needed
|
|
|
|
Run this script weekly to maintain backlog visibility.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
import urllib.request
|
|
from datetime import datetime, timedelta
|
|
from typing import Any, Dict, List
|
|
|
|
# Configuration
# Base URL of the Gitea REST API (v1) that hosts the repository.
GITEA_BASE = "https://forge.alexanderwhitestone.com/api/v1"
# Location of the Gitea API token. Per the XDG Base Directory spec the default
# (~/.config) applies when XDG_CONFIG_HOME is unset *or empty*; `or` covers the
# empty-string case that a plain `.get(name, default)` would let through,
# producing a relative "gitea/token" path.
TOKEN_PATH = os.path.join(
    os.environ.get("XDG_CONFIG_HOME") or os.path.expanduser("~/.config"),
    "gitea",
    "token",
)
# Organisation and repository whose backlog is triaged.
ORG = "Timmy_Foundation"
REPO = "timmy-home"
|
|
|
|
|
|
class BacklogTriage:
    """Weekly backlog triage for timmy-home.

    Fetches the open issues of ``ORG/REPO`` from the Gitea API, computes
    summary statistics (assignment, labels, age buckets, stale issues) and
    renders them as a Markdown report.
    """

    # Seconds to wait on a single HTTP request before giving up.
    _REQUEST_TIMEOUT = 30
    # Page size requested from the API; the server may cap it lower.
    _PAGE_SIZE = 50
    # Hard upper bound on pages fetched, so a misbehaving server cannot
    # keep the pagination loop running forever.
    _MAX_PAGES = 100

    def __init__(self):
        self.token = self._load_token()

    def _load_token(self) -> str:
        """Load the Gitea API token from ``TOKEN_PATH``.

        Returns:
            The token with surrounding whitespace stripped.

        Exits the process with status 1 when the token file does not exist.
        """
        try:
            with open(TOKEN_PATH, "r") as f:
                return f.read().strip()
        except FileNotFoundError:
            # Diagnostics go to stderr so they never mix with report/JSON output.
            print(f"ERROR: Token not found at {TOKEN_PATH}", file=sys.stderr)
            sys.exit(1)

    def _api_request(self, endpoint: str) -> Any:
        """Make an authenticated GET request against the Gitea API.

        Args:
            endpoint: Path (including query string) appended to ``GITEA_BASE``.

        Returns:
            The decoded JSON payload, or ``None`` on a 404, any other HTTP
            error, a network failure/timeout, or an undecodable response body.
        """
        url = f"{GITEA_BASE}{endpoint}"
        headers = {"Authorization": f"token {self.token}"}
        req = urllib.request.Request(url, headers=headers)

        try:
            with urllib.request.urlopen(req, timeout=self._REQUEST_TIMEOUT) as resp:
                return json.loads(resp.read())
        except urllib.error.HTTPError as e:
            if e.code == 404:
                return None
            error_body = e.read().decode(errors="replace") if e.fp else "No error body"
            print(f"API Error {e.code}: {error_body}", file=sys.stderr)
            return None
        except (OSError, ValueError) as e:
            # OSError covers URLError and socket timeouts; ValueError covers
            # a body that is not valid JSON. Degrade to "no data" like the
            # HTTP-error path instead of crashing the weekly run.
            print(f"Request to {url} failed: {e}", file=sys.stderr)
            return None

    def get_open_issues(self) -> List[Dict]:
        """Get all open issues, following pagination.

        Returns:
            Every open issue the API reports, concatenated across pages.
            Returns an empty list when the first request fails.
        """
        issues: List[Dict] = []
        seen_numbers = set()

        for page in range(1, self._MAX_PAGES + 1):
            endpoint = (
                f"/repos/{ORG}/{REPO}/issues"
                f"?state=open&limit={self._PAGE_SIZE}&page={page}"
            )
            batch = self._api_request(endpoint)
            if not isinstance(batch, list) or not batch:
                break

            # Stop if a page brings nothing new (e.g. the server ignored
            # the `page` parameter and keeps returning the same items).
            fresh = [item for item in batch if item.get("number") not in seen_numbers]
            if not fresh:
                break
            seen_numbers.update(item.get("number") for item in fresh)
            issues.extend(fresh)

        return issues

    @staticmethod
    def _age_bucket(age_days: int) -> str:
        """Return the ``by_age`` key that an issue of ``age_days`` falls into."""
        if age_days <= 7:
            return "0-7_days"
        if age_days <= 30:
            return "8-30_days"
        if age_days <= 90:
            return "31-90_days"
        return "90+_days"

    def analyze_backlog(self, issues: List[Dict]) -> Dict[str, Any]:
        """Analyze the backlog.

        Args:
            issues: Issue dicts as returned by the API. Each non-PR entry
                must provide ``number``, ``title`` and an ISO-8601
                ``created_at``; ``labels`` and ``assignee`` are optional.

        Returns:
            A dict with the counters ``total_open``, ``unassigned``,
            ``unlabeled`` and ``batch_pipeline``; the histograms
            ``by_label``, ``by_assignee`` and ``by_age``; and the lists
            ``stale_issues`` and ``unassigned_unlabeled``. Pull requests are
            excluded from every figure, including ``total_open``.
        """
        analysis = {
            "total_open": 0,
            "unassigned": 0,
            "unlabeled": 0,
            "batch_pipeline": 0,
            "by_label": {},
            "by_assignee": {},
            "by_age": {
                "0-7_days": 0,
                "8-30_days": 0,
                "31-90_days": 0,
                "90+_days": 0,
            },
            "stale_issues": [],
            "unassigned_unlabeled": [],
        }

        for issue in issues:
            # Skip PRs. Test the value, not key presence: the key may be
            # present with a null value on ordinary issues.
            if issue.get('pull_request'):
                continue
            analysis["total_open"] += 1

            assignee = issue.get('assignee')
            labels = [label['name'] for label in issue.get('labels') or []]

            # Assignment
            if assignee:
                assignee_name = assignee['login']
                analysis["by_assignee"][assignee_name] = (
                    analysis["by_assignee"].get(assignee_name, 0) + 1
                )
            else:
                analysis["unassigned"] += 1

            # Labels
            if not labels:
                analysis["unlabeled"] += 1
            for label in labels:
                analysis["by_label"][label] = analysis["by_label"].get(label, 0) + 1

            if 'batch-pipeline' in labels:
                analysis["batch_pipeline"] += 1

            # Age. `created_at` is timezone-aware when the timestamp carries
            # "Z" or an offset, so take "now" in the same tzinfo; subtracting
            # an aware value from a naive `datetime.now()` raises TypeError.
            created_at = datetime.fromisoformat(issue['created_at'].replace('Z', '+00:00'))
            age_days = (datetime.now(created_at.tzinfo) - created_at).days
            analysis["by_age"][self._age_bucket(age_days)] += 1

            if not assignee and not labels:
                # Stale = untriaged (no labels, no assignee) for over 30 days.
                if age_days > 30:
                    analysis["stale_issues"].append({
                        "number": issue['number'],
                        "title": issue['title'],
                        "created": issue['created_at'],
                        "age_days": age_days,
                    })
                analysis["unassigned_unlabeled"].append({
                    "number": issue['number'],
                    "title": issue['title'],
                    "created": issue['created_at'],
                })

        return analysis

    def generate_report(self, analysis: Dict[str, Any]) -> str:
        """Generate a Markdown triage report.

        Args:
            analysis: The dict produced by ``analyze_backlog``.

        Returns:
            The report text: summary, age/label/assignee distributions, up
            to ten stale and ten untriaged issues, and recommendations.
        """
        stale = analysis['stale_issues']
        untriaged = analysis['unassigned_unlabeled']

        parts = [
            "# timmy-home Weekly Backlog Triage\n\n",
            f"Generated: {datetime.now().isoformat()}\n\n",
            "## Summary\n",
            f"- **Total open issues:** {analysis['total_open']}\n",
            f"- **Unassigned:** {analysis['unassigned']}\n",
            f"- **Unlabeled:** {analysis['unlabeled']}\n",
            f"- **Batch-pipeline issues:** {analysis['batch_pipeline']}\n",
            f"- **Stale issues (>30 days, no labels/assignee):** {len(stale)}\n",
            f"- **Unassigned + Unlabeled:** {len(untriaged)}\n\n",
            "## Age Distribution\n",
        ]
        parts.extend(
            f"- **{age_range}:** {count} issues\n"
            for age_range, count in analysis['by_age'].items()
        )

        parts.append("\n## Label Distribution\n")
        if analysis['by_label']:
            ranked_labels = sorted(
                analysis['by_label'].items(), key=lambda kv: kv[1], reverse=True
            )
            parts.extend(f"- **{label}:** {count} issues\n" for label, count in ranked_labels)
        else:
            parts.append("- No labels found\n")

        parts.append("\n## Assignee Distribution\n")
        if analysis['by_assignee']:
            ranked_assignees = sorted(
                analysis['by_assignee'].items(), key=lambda kv: kv[1], reverse=True
            )
            parts.extend(
                f"- **@{assignee}:** {count} issues\n" for assignee, count in ranked_assignees
            )
        else:
            parts.append("- No assignees found\n")

        if stale:
            parts.append("\n## Stale Issues (>30 days, no labels/assignee)\n")
            parts.append("These issues should be triaged or closed:\n")
            for issue in stale[:10]:  # Show first 10
                parts.append(f"- **#{issue['number']}**: {issue['title']}\n")
                # Two-space indent so the details nest under the issue bullet.
                parts.append(f"  - Age: {issue['age_days']} days\n")
                parts.append(f"  - Created: {issue['created']}\n")

        if untriaged:
            parts.append("\n## Unassigned + Unlabeled Issues\n")
            parts.append("These issues need labels and/or assignees:\n")
            for issue in untriaged[:10]:  # Show first 10
                parts.append(f"- **#{issue['number']}**: {issue['title']}\n")
                parts.append(f"  - Created: {issue['created']}\n")

        # Collect applicable recommendations first and number them afterwards,
        # so the list always runs 1..n without gaps.
        recommendations = []
        if analysis['unassigned'] > 0:
            recommendations.append(
                f"**Assign owners to {analysis['unassigned']} issues** - Ensure accountability"
            )
        if analysis['unlabeled'] > 0:
            recommendations.append(
                f"**Add labels to {analysis['unlabeled']} issues** - Categorize for management"
            )
        if stale:
            recommendations.append(
                f"**Triage {len(stale)} stale issues** - Close or re-prioritize"
            )
        if untriaged:
            recommendations.append(
                f"**Address {len(untriaged)} unassigned/unlabeled issues** - Basic triage needed"
            )

        parts.append("\n## Recommendations\n")
        if recommendations:
            parts.extend(
                f"{index}. {text}\n" for index, text in enumerate(recommendations, start=1)
            )
        else:
            parts.append("- No action needed\n")

        return "".join(parts)

    def generate_cron_entry(self) -> str:
        """Generate a crontab snippet that runs the triage report weekly.

        Returns:
            Crontab text scheduling the report for Mondays at 09:00, followed
            by a commented hint for running the script by hand.
        """
        return "\n".join((
            "# Weekly timmy-home backlog triage",
            "# Run every Monday at 9:00 AM",
            # '%' is special in crontab command fields, hence the '\%' escapes.
            "0 9 * * 1 cd /path/to/timmy-home && python3 scripts/backlog_triage.py --report"
            " > /var/log/timmy-home-triage-$(date +\\%Y\\%m\\%d).log 2>&1",
            "",
            "# Or run directly:",
            "# python3 scripts/backlog_triage.py --report",
        ))
|
|
|
|
|
|
def main():
    """Main entry point: parse CLI flags and run the requested action.

    ``--analyze``, ``--report`` and ``--json`` fetch and analyze the open
    issues (``--json`` takes precedence over ``--report``; ``--analyze``
    alone prints a short summary). ``--cron`` prints a crontab snippet.
    With no flags the usage help is printed.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Weekly Backlog Triage for timmy-home")
    parser.add_argument("--analyze", action="store_true", help="Analyze backlog")
    parser.add_argument("--report", action="store_true", help="Generate report")
    parser.add_argument("--cron", action="store_true", help="Generate cron entry")
    parser.add_argument("--json", action="store_true", help="Output JSON")

    args = parser.parse_args()
    wants_analysis = args.analyze or args.report or args.json

    # Show help before touching the token: constructing BacklogTriage exits
    # the process when the token file is missing, which would otherwise hide
    # the usage text from a first-time user.
    if not (wants_analysis or args.cron):
        parser.print_help()
        return

    triage = BacklogTriage()

    if wants_analysis:
        issues = triage.get_open_issues()
        analysis = triage.analyze_backlog(issues)

        if args.json:
            print(json.dumps(analysis, indent=2))
        elif args.report:
            print(triage.generate_report(analysis))
        else:
            # Default: show summary
            print("timmy-home Backlog Analysis:")
            print(f" Total open issues: {analysis['total_open']}")
            print(f" Unassigned: {analysis['unassigned']}")
            print(f" Unlabeled: {analysis['unlabeled']}")
            print(f" Batch-pipeline: {analysis['batch_pipeline']}")
            print(f" Stale issues: {len(analysis['stale_issues'])}")
            print(f" Unassigned + Unlabeled: {len(analysis['unassigned_unlabeled'])}")
    else:
        # Only --cron was requested: generate cron entry
        print(triage.generate_cron_entry())
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()