#!/usr/bin/env python3
"""
Priority Rebalancer — Re-evaluate issue priorities based on accumulated data.

Reads pipeline outputs, knowledge store, and Gitea issues to suggest priority
changes based on what the fleet has learned.

Usage:
    python3 scripts/priority_rebalancer.py --org Timmy_Foundation
    python3 scripts/priority_rebalancer.py --org Timmy_Foundation --repo compounding-intelligence
    python3 scripts/priority_rebalancer.py --org Timmy_Foundation --dry-run
    python3 scripts/priority_rebalancer.py --org Timmy_Foundation --apply

Output:
    metrics/priority_report.json — full analysis
    metrics/priority_suggestions.md — human-readable suggestions
"""

import argparse
import json
import os
import sys
import urllib.error
import urllib.parse
import urllib.request
from collections import Counter, defaultdict
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple


# ============================================================
# Data Models
# ============================================================

@dataclass
class IssueScore:
    """Scoring result for a single Gitea issue."""
    issue_id: int
    repo: str
    title: str
    current_labels: List[str]
    current_priority: Optional[str]
    suggested_priority: Optional[str]
    score: float
    reasons: List[str]
    age_days: int
    comment_count: int
    assignee: Optional[str]
    dependencies: List[str] = field(default_factory=list)
    blocking: List[str] = field(default_factory=list)


@dataclass
class PipelineSignal:
    """A signal derived from pipeline artifacts that influences issue scores."""
    source: str       # "knowledge", "metrics", "sessions", "staleness"
    signal_type: str  # "stale_knowledge", "high_error_rate", "missing_coverage", etc.
    weight: float     # 0.0 - 1.0
    detail: str
    affected_repos: List[str] = field(default_factory=list)
    affected_issues: List[int] = field(default_factory=list)


# ============================================================
# Gitea API Client
# ============================================================

class GiteaClient:
    """Minimal Gitea REST client using only the standard library."""

    def __init__(self, base_url: str, token: str):
        self.base_url = base_url.rstrip("/")
        self.token = token

    def _request(self, path: str, params: Optional[Dict] = None) -> Any:
        """GET an API path and return parsed JSON, or None on HTTP error.

        None-valued params are dropped. Values are URL-encoded (the original
        implementation joined raw strings, which broke on reserved characters).
        """
        url = f"{self.base_url}/api/v1{path}"
        if params:
            filtered = {k: v for k, v in params.items() if v is not None}
            if filtered:
                url += "?" + urllib.parse.urlencode(filtered)
        req = urllib.request.Request(url)
        req.add_header("Authorization", f"token {self.token}")
        req.add_header("Content-Type", "application/json")
        try:
            with urllib.request.urlopen(req, timeout=30) as resp:
                return json.loads(resp.read().decode())
        except urllib.error.HTTPError as e:
            print(f"API error {e.code} for {path}: {e.read().decode()[:200]}",
                  file=sys.stderr)
            return None

    def _paginate(self, path: str, extra: Optional[Dict] = None) -> List[Dict]:
        """Fetch all pages (50 items/page) of a list endpoint."""
        items: List[Dict] = []
        page = 1
        while True:
            params: Dict[str, Any] = {"limit": 50, "page": page}
            if extra:
                params.update(extra)
            batch = self._request(path, params)
            if not batch:
                break
            items.extend(batch)
            if len(batch) < 50:
                break
            page += 1
        return items

    def get_org_repos(self, org: str) -> List[Dict]:
        """Return all repositories of an organization."""
        return self._paginate(f"/orgs/{org}/repos")

    def get_issues(self, org: str, repo: str, state: str = "open") -> List[Dict]:
        """Return all issues (not PRs) of a repository in the given state."""
        return self._paginate(f"/repos/{org}/{repo}/issues",
                              {"state": state, "type": "issues"})

    def get_repo_labels(self, org: str, repo: str) -> List[Dict]:
        """Return all labels defined on a repository."""
        return self._paginate(f"/repos/{org}/{repo}/labels")

    def add_label_to_issue(self, org: str, repo: str, issue_num: int,
                           label_ids: List[int]) -> bool:
        """Attach labels (by id) to an issue. Returns True on success."""
        url = f"{self.base_url}/api/v1/repos/{org}/{repo}/issues/{issue_num}/labels"
        data = json.dumps({"labels": label_ids}).encode()
        req = urllib.request.Request(url, data=data, method="POST")
        req.add_header("Authorization", f"token {self.token}")
        req.add_header("Content-Type", "application/json")
        try:
            with urllib.request.urlopen(req, timeout=15) as resp:
                return resp.status == 200
        except Exception:
            return False

    def remove_label_from_issue(self, org: str, repo: str, issue_num: int,
                                label_id: int) -> bool:
        """Detach a label (by id) from an issue. Returns True on success."""
        url = (f"{self.base_url}/api/v1/repos/{org}/{repo}"
               f"/issues/{issue_num}/labels/{label_id}")
        req = urllib.request.Request(url, method="DELETE")
        req.add_header("Authorization", f"token {self.token}")
        try:
            with urllib.request.urlopen(req, timeout=15) as resp:
                return resp.status == 200
        except Exception:
            return False

    def add_comment(self, org: str, repo: str, issue_num: int, body: str) -> bool:
        """Post a comment on an issue. Returns True on success (201)."""
        url = f"{self.base_url}/api/v1/repos/{org}/{repo}/issues/{issue_num}/comments"
        data = json.dumps({"body": body}).encode()
        req = urllib.request.Request(url, data=data, method="POST")
        req.add_header("Authorization", f"token {self.token}")
        req.add_header("Content-Type", "application/json")
        try:
            with urllib.request.urlopen(req, timeout=15) as resp:
                return resp.status == 201
        except Exception:
            return False


# ============================================================
# Pipeline Signal Collectors
# ============================================================

def collect_knowledge_signals(knowledge_dir: str) -> List[PipelineSignal]:
    """Analyze knowledge store for coverage gaps and staleness."""
    signals: List[PipelineSignal] = []
    index_path = os.path.join(knowledge_dir, "index.json")

    if not os.path.exists(index_path):
        signals.append(PipelineSignal(
            source="knowledge", signal_type="missing_index", weight=0.8,
            detail="knowledge/index.json not found — no knowledge base exists"
        ))
        return signals

    try:
        with open(index_path) as f:
            data = json.load(f)
    except (json.JSONDecodeError, IOError) as e:
        signals.append(PipelineSignal(
            source="knowledge", signal_type="corrupt_index", weight=0.9,
            detail=f"knowledge/index.json is corrupt: {e}"
        ))
        return signals

    facts = data.get("facts", [])
    total = len(facts)
    if total == 0:
        signals.append(PipelineSignal(
            source="knowledge", signal_type="empty_knowledge", weight=0.7,
            detail="Knowledge store has 0 facts — harvester not running or not finding sessions"
        ))
        return signals

    # Check staleness
    stale_count = 0
    missing_source = 0
    for fact in facts:
        status = fact.get("status", "unknown")
        if status == "stale":
            stale_count += 1
        elif status in ("missing_source", "no_source"):
            missing_source += 1

    if stale_count > 0:
        signals.append(PipelineSignal(
            source="knowledge", signal_type="stale_knowledge",
            weight=min(1.0, stale_count / max(1, total)),
            detail=f"{stale_count}/{total} facts are stale (source files changed)"
        ))
    if missing_source > 0:
        signals.append(PipelineSignal(
            source="knowledge", signal_type="missing_sources",
            weight=min(1.0, missing_source / max(1, total)),
            detail=f"{missing_source}/{total} facts have missing source files"
        ))

    # Coverage by repo
    repo_counts = Counter(f.get("repo", "unknown") for f in facts)
    if len(repo_counts) < 3:
        signals.append(PipelineSignal(
            source="knowledge", signal_type="low_coverage", weight=0.5,
            detail=f"Knowledge covers only {len(repo_counts)} repos — expand harvester scope",
            affected_repos=list(repo_counts.keys())
        ))

    return signals


def collect_staleness_signals(scripts_dir: str, knowledge_dir: str) -> List[PipelineSignal]:
    """Run staleness checker if available."""
    signals: List[PipelineSignal] = []
    checker = os.path.join(scripts_dir, "knowledge_staleness_check.py")
    index_path = os.path.join(knowledge_dir, "index.json")
    if not os.path.exists(checker) or not os.path.exists(index_path):
        return signals
    try:
        import subprocess
        result = subprocess.run(
            ["python3", checker, "--index", index_path, "--json"],
            capture_output=True, text=True, timeout=30
        )
        if result.returncode == 0:
            data = json.loads(result.stdout)
            stale = data.get("stale_count", 0)
            total = data.get("total", 0)
            if stale > 0:
                signals.append(PipelineSignal(
                    source="staleness", signal_type="stale_knowledge",
                    weight=min(1.0, stale / max(1, total)),
                    detail=f"Staleness checker found {stale}/{total} stale entries"
                ))
    except Exception:
        # Best-effort: the checker is optional tooling; failures are non-fatal.
        pass
    return signals


def collect_metrics_signals(metrics_dir: str) -> List[PipelineSignal]:
    """Analyze metrics directory for pipeline health."""
    signals: List[PipelineSignal] = []
    if not os.path.isdir(metrics_dir):
        return signals
    files = os.listdir(metrics_dir)
    if len(files) <= 1:  # Only .gitkeep
        signals.append(PipelineSignal(
            source="metrics", signal_type="no_metrics", weight=0.6,
            detail="Metrics directory is empty — measurer pipeline not producing output"
        ))
    return signals


# ============================================================
# Priority Scoring Engine
# ============================================================

# Ordered highest-to-lowest; index comparisons rely on this ordering.
PRIORITY_LEVELS = ["P0", "P1", "P2", "P3", "none"]
PRIORITY_LABELS = {"p0": "P0", "p1": "P1", "p2": "P2", "p3": "P3",
                   "priority:critical": "P0", "priority:high": "P1",
                   "priority:medium": "P2", "priority:low": "P3"}


def extract_priority(labels: List[str]) -> Optional[str]:
    """Extract priority level from issue labels."""
    for label in labels:
        lower = label.lower().strip()
        if lower in PRIORITY_LABELS:
            return PRIORITY_LABELS[lower]
    return None


def compute_issue_score(
    issue: Dict,
    repo: str,
    signals: List[PipelineSignal],
    now: datetime
) -> IssueScore:
    """Compute priority score for a single issue.

    `now` may be naive (treated as UTC) or timezone-aware; the original
    implementation relied on deprecated naive utcnow() arithmetic.
    """
    labels = [l.get("name", "") if isinstance(l, dict) else l
              for l in issue.get("labels", [])]
    title = issue.get("title", "")
    issue_id = issue.get("number", 0)
    current_priority = extract_priority(labels)

    # Parse dates using timezone-aware arithmetic throughout.
    if now.tzinfo is None:
        now = now.replace(tzinfo=timezone.utc)
    created_str = issue.get("created_at", "")
    try:
        created = datetime.fromisoformat(created_str.replace("Z", "+00:00"))
        if created.tzinfo is None:
            created = created.replace(tzinfo=timezone.utc)
        age_days = (now - created).days
    except (ValueError, AttributeError):
        age_days = 0

    comment_count = issue.get("comments", 0)
    assignee = None
    assignees = issue.get("assignees") or []
    if assignees:
        assignee = (assignees[0].get("login")
                    if isinstance(assignees[0], dict) else str(assignees[0]))

    # Base score
    score = 0.0
    reasons: List[str] = []

    # Age factor: older issues drift down unless actively discussed
    if age_days > 90 and comment_count < 2:
        score -= 15
        reasons.append(f"Dormant: {age_days} days old with only {comment_count} comments")
    elif age_days > 30:
        score -= 5
        reasons.append(f"Aging: {age_days} days old")

    # Activity factor: recent discussion suggests urgency
    if comment_count > 5:
        score += 10
        reasons.append(f"Active discussion: {comment_count} comments")
    elif comment_count > 0:
        score += 3

    # Assignment factor: unassigned issues need triage
    if not assignee:
        score += 5
        reasons.append("Unassigned — needs triage")

    # Pipeline signal alignment
    title_lower = title.lower()
    for signal in signals:
        if signal.signal_type == "stale_knowledge" and "stale" in title_lower:
            score += signal.weight * 20
            reasons.append(f"Matches signal: {signal.detail}")
        elif signal.signal_type == "empty_knowledge" and (
                "harvester" in title_lower or "knowledge" in title_lower):
            score += signal.weight * 25
            reasons.append(f"Critical gap: {signal.detail}")
        elif signal.signal_type == "no_metrics" and "measur" in title_lower:
            score += signal.weight * 15
            reasons.append(f"Pipeline gap: {signal.detail}")
        elif signal.signal_type == "low_coverage" and any(
                r.lower() in title_lower for r in signal.affected_repos):
            score += signal.weight * 10
            reasons.append(f"Coverage gap: {signal.detail}")

    # Keyword boosts
    keyword_scores = {
        "broken": 20, "bug": 15, "fix": 10, "error": 12, "fail": 15,
        "security": 25, "auth": 20, "data loss": 30, "crash": 25,
        "blocker": 20, "urgent": 15, "critical": 15, "epic": 8,
        "feature": -3, "nice to have": -10, "someday": -15
    }
    for keyword, boost in keyword_scores.items():
        if keyword in title_lower:
            score += boost
            if boost > 0:
                reasons.append(f"Keyword match: '{keyword}' (+{boost})")

    # Label-based adjustments
    for label in labels:
        lower = label.lower()
        if lower == "pipeline":
            score += 5  # Pipeline issues are infrastructure
        elif lower == "bug":
            score += 12
        elif lower == "enhancement":
            score -= 2
        elif lower == "documentation":
            score -= 5
        elif "epic" in lower:
            score += 3

    # Determine suggested priority
    if score >= 40:
        suggested: Optional[str] = "P0"
    elif score >= 25:
        suggested = "P1"
    elif score >= 10:
        suggested = "P2"
    elif score >= 0:
        suggested = "P3"
    else:
        suggested = None  # Consider closing or deprioritizing

    return IssueScore(
        issue_id=issue_id,
        repo=repo,
        title=title,
        current_labels=labels,
        current_priority=current_priority,
        suggested_priority=suggested,
        score=round(score, 1),
        reasons=reasons if reasons else ["No strong signals"],
        age_days=age_days,
        comment_count=comment_count,
        assignee=assignee
    )


# ============================================================
# Report Generation
# ============================================================

def generate_report(
    scores: List[IssueScore],
    signals: List[PipelineSignal],
    org: str,
    repos_scanned: List[str]
) -> Dict[str, Any]:
    """Generate the full priority report."""
    now = datetime.now(timezone.utc).isoformat()

    # Categorize changes (lower PRIORITY_LEVELS index == higher priority)
    upgrades = [s for s in scores
                if s.suggested_priority and s.current_priority
                and PRIORITY_LEVELS.index(s.suggested_priority)
                < PRIORITY_LEVELS.index(s.current_priority)]
    downgrades = [s for s in scores
                  if s.suggested_priority and s.current_priority
                  and PRIORITY_LEVELS.index(s.suggested_priority)
                  > PRIORITY_LEVELS.index(s.current_priority)]
    new_assignments = [s for s in scores
                       if s.suggested_priority and not s.current_priority]
    no_change = [s for s in scores
                 if s.suggested_priority == s.current_priority]

    return {
        "generated_at": now,
        "org": org,
        "repos_scanned": repos_scanned,
        "total_issues": len(scores),
        "signals": [asdict(s) for s in signals],
        "summary": {
            "suggested_upgrades": len(upgrades),
            "suggested_downgrades": len(downgrades),
            "suggested_new_priorities": len(new_assignments),
            "unchanged": len(no_change)
        },
        "top_priority": [asdict(s) for s in
                         sorted(scores, key=lambda x: x.score, reverse=True)[:10]],
        "upgrades": [asdict(s) for s in upgrades],
        "downgrades": [asdict(s) for s in downgrades],
        "new_assignments": [asdict(s) for s in new_assignments],
        "all_scores": [asdict(s) for s in
                       sorted(scores, key=lambda x: x.score, reverse=True)]
    }


def generate_markdown_report(report: Dict[str, Any]) -> str:
    """Generate human-readable markdown report."""
    lines: List[str] = []
    lines.append("# Priority Rebalancer Report")
    lines.append(f"**Generated:** {report['generated_at']}")
    lines.append(f"**Org:** {report['org']}")
    lines.append(f"**Repos scanned:** {', '.join(report['repos_scanned'])}")
    lines.append(f"**Issues analyzed:** {report['total_issues']}")
    lines.append("")

    # Signals
    if report["signals"]:
        lines.append("## Pipeline Signals")
        for sig in report["signals"]:
            weight_bar = ("█" * int(sig["weight"] * 10)
                          + "░" * (10 - int(sig["weight"] * 10)))
            lines.append(f"- [{weight_bar}] **{sig['source']}/{sig['signal_type']}** — {sig['detail']}")
        lines.append("")

    # Summary
    s = report["summary"]
    lines.append("## Summary")
    lines.append(f"- Suggested upgrades: **{s['suggested_upgrades']}**")
    lines.append(f"- Suggested downgrades: **{s['suggested_downgrades']}**")
    lines.append(f"- New priority assignments: **{s['suggested_new_priorities']}**")
    lines.append(f"- Unchanged: **{s['unchanged']}**")
    lines.append("")

    # Top 10
    lines.append("## Top 10 by Score")
    for i, item in enumerate(report["top_priority"][:10], 1):
        cur = item["current_priority"] or "none"
        sug = item["suggested_priority"] or "none"
        ci = PRIORITY_LEVELS.index(cur)
        si = PRIORITY_LEVELS.index(sug)
        arrow = "↑" if si < ci else "↓" if si > ci else "→"
        lines.append(f"{i}. **[{item['repo']}#{item['issue_id']}]** {item['title']}")
        lines.append(f"   Score: {item['score']} | Current: {cur} {arrow} Suggested: {sug}")
        lines.append(f"   Reasons: {'; '.join(item['reasons'][:3])}")
    lines.append("")

    # Upgrades
    if report["upgrades"]:
        lines.append("## Suggested Upgrades")
        for item in report["upgrades"]:
            lines.append(f"- **[{item['repo']}#{item['issue_id']}]** {item['title']}")
            lines.append(f"  {item['current_priority']} → {item['suggested_priority']} (score: {item['score']})")
            lines.append(f"  {'; '.join(item['reasons'][:2])}")
        lines.append("")

    # Downgrades
    if report["downgrades"]:
        lines.append("## Suggested Downgrades")
        for item in report["downgrades"]:
            lines.append(f"- **[{item['repo']}#{item['issue_id']}]** {item['title']}")
            lines.append(f"  {item['current_priority']} → {item['suggested_priority']} (score: {item['score']})")
            lines.append(f"  {'; '.join(item['reasons'][:2])}")
        lines.append("")

    # New assignments
    if report["new_assignments"]:
        lines.append("## New Priority Suggestions (currently unlabelled)")
        for item in report["new_assignments"][:20]:
            lines.append(f"- **[{item['repo']}#{item['issue_id']}]** {item['title']}")
            lines.append(f"  Suggested: {item['suggested_priority']} (score: {item['score']})")
            lines.append(f"  {'; '.join(item['reasons'][:2])}")
        lines.append("")

    return "\n".join(lines)


# ============================================================
# Main
# ============================================================

def main():
    parser = argparse.ArgumentParser(
        description="Priority Rebalancer — re-score issues based on pipeline data")
    parser.add_argument("--org", default="Timmy_Foundation", help="Gitea org name")
    parser.add_argument("--repo", help="Single repo to analyze (default: all)")
    parser.add_argument("--base-url", default="https://forge.alexanderwhitestone.com",
                        help="Gitea base URL")
    parser.add_argument("--knowledge-dir", default=None, help="Path to knowledge directory")
    parser.add_argument("--metrics-dir", default=None, help="Path to metrics directory")
    parser.add_argument("--scripts-dir", default=None, help="Path to scripts directory")
    parser.add_argument("--output-dir", default=None, help="Path to output directory")
    parser.add_argument("--dry-run", action="store_true",
                        help="Show what would change without applying")
    parser.add_argument("--apply", action="store_true",
                        help="Apply priority changes via API")
    parser.add_argument("--json", action="store_true",
                        help="Output JSON instead of markdown")
    args = parser.parse_args()

    # Resolve paths relative to script location
    script_dir = Path(__file__).parent
    repo_root = script_dir.parent
    knowledge_dir = args.knowledge_dir or str(repo_root / "knowledge")
    metrics_dir = args.metrics_dir or str(repo_root / "metrics")
    scripts_dir = args.scripts_dir or str(repo_root / "scripts")
    output_dir = args.output_dir or str(repo_root / "metrics")

    # Get token: env var first, then the conventional token file
    token = os.environ.get("GITEA_TOKEN")
    if not token:
        token_path = os.path.expanduser("~/.config/gitea/token")
        if os.path.exists(token_path):
            with open(token_path) as f:
                token = f.read().strip()
    if not token:
        print("Error: No Gitea token found. Set GITEA_TOKEN or create ~/.config/gitea/token",
              file=sys.stderr)
        sys.exit(1)

    client = GiteaClient(args.base_url, token)
    # Timezone-aware "now" (utcnow() is deprecated and was naive)
    now = datetime.now(timezone.utc)

    # Collect pipeline signals
    print("Collecting pipeline signals...", file=sys.stderr)
    signals: List[PipelineSignal] = []
    signals.extend(collect_knowledge_signals(knowledge_dir))
    signals.extend(collect_staleness_signals(scripts_dir, knowledge_dir))
    signals.extend(collect_metrics_signals(metrics_dir))
    print(f"  Found {len(signals)} signals", file=sys.stderr)

    # Get repos
    if args.repo:
        repos = [{"name": args.repo}]
    else:
        repos = client.get_org_repos(args.org)
    repo_names = [r["name"] for r in repos]
    print(f"Scanning {len(repo_names)} repos: {', '.join(repo_names[:5])}"
          f"{'...' if len(repo_names) > 5 else ''}", file=sys.stderr)

    # Score all issues
    all_scores: List[IssueScore] = []
    for repo in repos:
        repo_name = repo["name"]
        issues = client.get_issues(args.org, repo_name)
        print(f"  {repo_name}: {len(issues)} open issues", file=sys.stderr)
        for issue in issues:
            if issue.get("pull_request"):
                continue
            score = compute_issue_score(issue, repo_name, signals, now)
            all_scores.append(score)

    # Generate report
    report = generate_report(all_scores, signals, args.org, repo_names)

    # Output
    os.makedirs(output_dir, exist_ok=True)
    if args.json:
        print(json.dumps(report, indent=2))
    else:
        md = generate_markdown_report(report)
        print(md)

    # Write files
    report_path = os.path.join(output_dir, "priority_report.json")
    with open(report_path, "w") as f:
        json.dump(report, f, indent=2)
    print(f"\nFull report: {report_path}", file=sys.stderr)

    md_path = os.path.join(output_dir, "priority_suggestions.md")
    with open(md_path, "w") as f:
        f.write(generate_markdown_report(report))
    print(f"Suggestions: {md_path}", file=sys.stderr)

    # Apply changes if requested
    if args.apply:
        print("\nApplying priority changes...", file=sys.stderr)
        applied = 0
        # Get label IDs for priority labels
        priority_label_map: Dict[Tuple[str, str], int] = {}
        for repo_name in repo_names:
            labels = client.get_repo_labels(args.org, repo_name)
            for label in labels:
                name = label.get("name", "").lower()
                if name in ("p0", "p1", "p2", "p3"):
                    priority_label_map[(repo_name, name)] = label["id"]

        for score in all_scores:
            if not score.suggested_priority or score.suggested_priority == score.current_priority:
                continue
            sug_lower = score.suggested_priority.lower()
            label_id = priority_label_map.get((score.repo, sug_lower))
            if not label_id:
                continue
            # Bug fix: remove the previous priority label first so an issue
            # never carries two priority labels at once. (The original only
            # added the new label; remove_label_from_issue was never called.)
            if score.current_priority:
                old_id = priority_label_map.get(
                    (score.repo, score.current_priority.lower()))
                if old_id:
                    client.remove_label_from_issue(
                        args.org, score.repo, score.issue_id, old_id)
            ok = client.add_label_to_issue(
                args.org, score.repo, score.issue_id, [label_id])
            if ok:
                applied += 1
                print(f"  Applied: [{score.repo}#{score.issue_id}] → {score.suggested_priority}",
                      file=sys.stderr)
                # Add comment explaining the change
                comment = (f"**Priority Rebalancer** suggested: **{score.suggested_priority}** "
                           f"(was: {score.current_priority or 'none'})\n\n")
                comment += f"Score: {score.score}\n"
                comment += f"Reasons:\n"
                for r in score.reasons[:5]:
                    comment += f"- {r}\n"
                client.add_comment(args.org, score.repo, score.issue_id, comment)

        print(f"Applied {applied} priority changes", file=sys.stderr)
    elif args.dry_run:
        total_changes = (report['summary']['suggested_upgrades']
                         + report['summary']['suggested_downgrades']
                         + report['summary']['suggested_new_priorities'])
        print(f"\nDry run — {total_changes} changes would be applied", file=sys.stderr)


if __name__ == "__main__":
    main()