Monthly pipeline tool that: - Reads knowledge store, metrics, and staleness data - Scores all open issues across the org - Suggests priority upgrades/downgrades based on accumulated signals - Generates JSON + markdown reports - Optional --apply mode to push changes via Gitea API Signals detected: - Stale/missing knowledge entries - Empty knowledge store - Missing metrics output - Low repo coverage - Issue age, activity, assignment status - Keyword/label analysis Usage: python3 scripts/priority_rebalancer.py --org Timmy_Foundation python3 scripts/priority_rebalancer.py --org Timmy_Foundation --apply python3 scripts/priority_rebalancer.py --org Timmy_Foundation --json 23 tests, all passing.
683 lines · 25 KiB · Python
#!/usr/bin/env python3
|
|
"""
|
|
Priority Rebalancer — Re-evaluate issue priorities based on accumulated data.
|
|
|
|
Reads pipeline outputs, knowledge store, and Gitea issues to suggest
|
|
priority changes based on what the fleet has learned.
|
|
|
|
Usage:
|
|
python3 scripts/priority_rebalancer.py --org Timmy_Foundation
|
|
python3 scripts/priority_rebalancer.py --org Timmy_Foundation --repo compounding-intelligence
|
|
python3 scripts/priority_rebalancer.py --org Timmy_Foundation --dry-run
|
|
python3 scripts/priority_rebalancer.py --org Timmy_Foundation --apply
|
|
|
|
Output:
|
|
metrics/priority_report.json — full analysis
|
|
metrics/priority_suggestions.md — human-readable suggestions
|
|
"""
|
|
|
|
import argparse
import json
import os
import sys
import urllib.error
import urllib.parse
import urllib.request
from collections import Counter, defaultdict
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
|
|
|
|
|
|
# ============================================================
|
|
# Data Models
|
|
# ============================================================
|
|
|
|
@dataclass
class IssueScore:
    """Scoring result for a single open issue, including the suggested priority change."""

    issue_id: int                      # Issue number within its repo
    repo: str                          # Repository name (without org prefix)
    title: str                         # Issue title as returned by the API
    current_labels: List[str]          # All label names currently on the issue
    current_priority: Optional[str]    # "P0".."P3" parsed from labels, or None if unlabelled
    suggested_priority: Optional[str]  # "P0".."P3" from score thresholds; None => deprioritize/close
    score: float                       # Accumulated heuristic score (higher = more urgent)
    reasons: List[str]                 # Human-readable justifications for the score
    age_days: int                      # Days since the issue was created
    comment_count: int                 # Number of comments on the issue
    assignee: Optional[str]            # First assignee's login, or None if unassigned
    dependencies: List[str] = field(default_factory=list)  # reserved: issues this one depends on
    blocking: List[str] = field(default_factory=list)      # reserved: issues this one blocks
|
|
|
|
|
|
@dataclass
class PipelineSignal:
    """A health signal extracted from pipeline artifacts, used to bias issue scores."""

    source: str       # "knowledge", "metrics", "sessions", "staleness"
    signal_type: str  # "stale_knowledge", "high_error_rate", "missing_coverage", etc.
    weight: float     # 0.0 - 1.0, multiplied into score boosts during matching
    detail: str       # Human-readable explanation, surfaced in reports/reasons
    affected_repos: List[str] = field(default_factory=list)   # repos this signal implicates
    affected_issues: List[int] = field(default_factory=list)  # specific issues, if known
|
|
|
|
|
|
# ============================================================
|
|
# Gitea API Client
|
|
# ============================================================
|
|
|
|
class GiteaClient:
    """Thin Gitea REST API client (stdlib-only: urllib + json).

    List endpoints are paginated at 50 items/page and fully drained.
    Read failures are logged to stderr and surface as None/empty lists;
    write failures surface as False — callers decide how to react.
    """

    def __init__(self, base_url: str, token: str):
        self.base_url = base_url.rstrip("/")
        self.token = token  # personal access token, sent as "token <...>"

    # ---- low-level helpers --------------------------------------------

    def _request(self, path: str, params: Optional[Dict] = None) -> Any:
        """GET /api/v1<path> and return parsed JSON, or None on HTTP error.

        BUGFIX: query values are now URL-encoded via urlencode; the previous
        manual "&".join produced invalid URLs for values containing spaces
        or reserved characters.
        """
        url = f"{self.base_url}/api/v1{path}"
        if params:
            qs = urllib.parse.urlencode({k: v for k, v in params.items() if v is not None})
            url += f"?{qs}"

        req = urllib.request.Request(url)
        req.add_header("Authorization", f"token {self.token}")
        req.add_header("Content-Type", "application/json")

        try:
            with urllib.request.urlopen(req, timeout=30) as resp:
                return json.loads(resp.read().decode())
        except urllib.error.HTTPError as e:
            print(f"API error {e.code} for {path}: {e.read().decode()[:200]}", file=sys.stderr)
            return None

    def _paginate(self, path: str, params: Optional[Dict] = None) -> List[Dict]:
        """Drain every page of a list endpoint (limit=50 per page)."""
        items: List[Dict] = []
        page = 1
        while True:
            query = dict(params or {})
            query["limit"] = 50
            query["page"] = page
            batch = self._request(path, query)
            if not batch:
                break
            items.extend(batch)
            if len(batch) < 50:  # short page => last page
                break
            page += 1
        return items

    def _write(self, url: str, method: str, payload: Optional[Dict] = None,
               ok_status: int = 200) -> bool:
        """Issue a mutating request; True iff the expected status came back.

        Best-effort by design: any network/HTTP failure returns False.
        """
        data = json.dumps(payload).encode() if payload is not None else None
        req = urllib.request.Request(url, data=data, method=method)
        req.add_header("Authorization", f"token {self.token}")
        if data is not None:
            req.add_header("Content-Type", "application/json")
        try:
            with urllib.request.urlopen(req, timeout=15) as resp:
                return resp.status == ok_status
        except Exception:
            return False

    # ---- public API ---------------------------------------------------

    def get_org_repos(self, org: str) -> List[Dict]:
        """All repositories of an organization."""
        return self._paginate(f"/orgs/{org}/repos")

    def get_issues(self, org: str, repo: str, state: str = "open") -> List[Dict]:
        """All issues of a repo; type=issues asks the API to exclude PRs."""
        return self._paginate(f"/repos/{org}/{repo}/issues",
                              {"state": state, "type": "issues"})

    def get_repo_labels(self, org: str, repo: str) -> List[Dict]:
        """All labels defined in a repo."""
        return self._paginate(f"/repos/{org}/{repo}/labels")

    def add_label_to_issue(self, org: str, repo: str, issue_num: int, label_ids: List[int]) -> bool:
        """Attach labels (by numeric id) to an issue."""
        url = f"{self.base_url}/api/v1/repos/{org}/{repo}/issues/{issue_num}/labels"
        return self._write(url, "POST", {"labels": label_ids}, ok_status=200)

    def remove_label_from_issue(self, org: str, repo: str, issue_num: int, label_id: int) -> bool:
        """Detach a single label (by numeric id) from an issue."""
        url = f"{self.base_url}/api/v1/repos/{org}/{repo}/issues/{issue_num}/labels/{label_id}"
        return self._write(url, "DELETE", ok_status=200)

    def add_comment(self, org: str, repo: str, issue_num: int, body: str) -> bool:
        """Post a comment on an issue (Gitea answers 201 Created)."""
        url = f"{self.base_url}/api/v1/repos/{org}/{repo}/issues/{issue_num}/comments"
        return self._write(url, "POST", {"body": body}, ok_status=201)
|
|
|
|
|
|
# ============================================================
|
|
# Pipeline Signal Collectors
|
|
# ============================================================
|
|
|
|
def collect_knowledge_signals(knowledge_dir: str) -> List[PipelineSignal]:
    """Analyze knowledge store for coverage gaps and staleness."""
    index_path = os.path.join(knowledge_dir, "index.json")

    # No index at all: the knowledge base does not exist yet.
    if not os.path.exists(index_path):
        return [PipelineSignal(
            source="knowledge",
            signal_type="missing_index",
            weight=0.8,
            detail="knowledge/index.json not found — no knowledge base exists",
        )]

    # Unreadable/invalid JSON: treat as corrupt and stop here.
    try:
        with open(index_path) as fh:
            data = json.load(fh)
    except (json.JSONDecodeError, IOError) as e:
        return [PipelineSignal(
            source="knowledge",
            signal_type="corrupt_index",
            weight=0.9,
            detail=f"knowledge/index.json is corrupt: {e}",
        )]

    facts = data.get("facts", [])
    total = len(facts)

    # An index with zero facts means the harvester is not producing output.
    if total == 0:
        return [PipelineSignal(
            source="knowledge",
            signal_type="empty_knowledge",
            weight=0.7,
            detail="Knowledge store has 0 facts — harvester not running or not finding sessions",
        )]

    collected: List[PipelineSignal] = []

    # Tally fact statuses in one pass instead of incrementing per-status counters.
    status_tally = Counter(fact.get("status", "unknown") for fact in facts)
    stale_count = status_tally["stale"]
    missing_source = status_tally["missing_source"] + status_tally["no_source"]

    if stale_count > 0:
        collected.append(PipelineSignal(
            source="knowledge",
            signal_type="stale_knowledge",
            weight=min(1.0, stale_count / max(1, total)),
            detail=f"{stale_count}/{total} facts are stale (source files changed)",
        ))

    if missing_source > 0:
        collected.append(PipelineSignal(
            source="knowledge",
            signal_type="missing_sources",
            weight=min(1.0, missing_source / max(1, total)),
            detail=f"{missing_source}/{total} facts have missing source files",
        ))

    # Coverage by repo: fewer than 3 repos represented suggests a narrow harvester.
    repo_counts = Counter(f.get("repo", "unknown") for f in facts)
    if len(repo_counts) < 3:
        collected.append(PipelineSignal(
            source="knowledge",
            signal_type="low_coverage",
            weight=0.5,
            detail=f"Knowledge covers only {len(repo_counts)} repos — expand harvester scope",
            affected_repos=list(repo_counts.keys()),
        ))

    return collected
|
|
|
|
|
|
def collect_staleness_signals(scripts_dir: str, knowledge_dir: str) -> List[PipelineSignal]:
    """Run staleness checker if available."""
    checker = os.path.join(scripts_dir, "knowledge_staleness_check.py")
    index_path = os.path.join(knowledge_dir, "index.json")

    # Both the checker script and the knowledge index must exist to proceed.
    if not (os.path.exists(checker) and os.path.exists(index_path)):
        return []

    try:
        import subprocess
        proc = subprocess.run(
            ["python3", checker, "--index", index_path, "--json"],
            capture_output=True, text=True, timeout=30,
        )
        if proc.returncode == 0:
            payload = json.loads(proc.stdout)
            stale = payload.get("stale_count", 0)
            total = payload.get("total", 0)
            if stale > 0:
                return [PipelineSignal(
                    source="staleness",
                    signal_type="stale_knowledge",
                    weight=min(1.0, stale / max(1, total)),
                    detail=f"Staleness checker found {stale}/{total} stale entries",
                )]
    except Exception:
        # Best-effort: any failure of the external checker is silently ignored.
        pass

    return []
|
|
|
|
|
|
def collect_metrics_signals(metrics_dir: str) -> List[PipelineSignal]:
    """Analyze metrics directory for pipeline health."""
    if not os.path.isdir(metrics_dir):
        return []

    # A directory holding at most one entry (e.g. just .gitkeep) means the
    # measurer pipeline has produced no output.
    if len(os.listdir(metrics_dir)) <= 1:
        return [PipelineSignal(
            source="metrics",
            signal_type="no_metrics",
            weight=0.6,
            detail="Metrics directory is empty — measurer pipeline not producing output",
        )]

    return []
|
|
|
|
|
|
# ============================================================
|
|
# Priority Scoring Engine
|
|
# ============================================================
|
|
|
|
# Ordered highest-to-lowest; a lower index means a more urgent priority.
PRIORITY_LEVELS = ["P0", "P1", "P2", "P3", "none"]
# Recognized (lowercased) label names mapped to canonical priority levels.
PRIORITY_LABELS = {"p0": "P0", "p1": "P1", "p2": "P2", "p3": "P3",
                   "priority:critical": "P0", "priority:high": "P1",
                   "priority:medium": "P2", "priority:low": "P3"}


def extract_priority(labels: List[str]) -> Optional[str]:
    """Extract priority level from issue labels.

    Labels are matched case-insensitively (whitespace-trimmed); the first
    recognized priority label wins. Returns None when no label matches.
    """
    return next(
        (PRIORITY_LABELS[name]
         for name in (label.lower().strip() for label in labels)
         if name in PRIORITY_LABELS),
        None,
    )
|
|
|
|
|
|
def compute_issue_score(
    issue: Dict,
    repo: str,
    signals: List[PipelineSignal],
    now: datetime
) -> IssueScore:
    """Compute priority score for a single issue.

    Combines several heuristics into one score:
      - age/activity (dormant issues drift down, discussed ones up)
      - assignment status (unassigned issues get a triage bump)
      - pipeline-signal alignment (title matched against collected signals)
      - title keywords and label-based adjustments
    The score is then mapped onto a suggested priority band (P0..P3); a
    negative score yields None (candidate for closing/deprioritizing).

    Args:
        issue: Raw Gitea issue dict.
        repo: Repository name the issue belongs to.
        signals: Signals from the knowledge/metrics/staleness collectors.
        now: Current time as a *naive UTC* datetime.
    """
    labels = [l.get("name", "") if isinstance(l, dict) else l for l in issue.get("labels", [])]
    title = issue.get("title", "")
    issue_id = issue.get("number", 0)
    current_priority = extract_priority(labels)
    # Hoisted: previously recomputed inside the signal loop and again later.
    title_lower = title.lower()

    # Parse creation date. BUGFIX: convert aware timestamps to UTC before
    # dropping tzinfo — previously a non-UTC offset was discarded outright,
    # skewing age_days by up to a day for non-UTC servers.
    created_str = issue.get("created_at", "")
    try:
        created = datetime.fromisoformat(created_str.replace("Z", "+00:00"))
        if created.tzinfo is not None:
            created = created.astimezone(timezone.utc).replace(tzinfo=None)
        age_days = (now - created).days
    except (ValueError, AttributeError):
        age_days = 0  # unparsable/missing timestamp: treat as brand new

    comment_count = issue.get("comments", 0)
    assignee = None
    assignees = issue.get("assignees") or []
    if assignees:
        assignee = assignees[0].get("login") if isinstance(assignees[0], dict) else str(assignees[0])

    # Base score
    score = 0.0
    reasons = []

    # Age factor: older issues drift down unless actively discussed
    if age_days > 90 and comment_count < 2:
        score -= 15
        reasons.append(f"Dormant: {age_days} days old with only {comment_count} comments")
    elif age_days > 30:
        score -= 5
        reasons.append(f"Aging: {age_days} days old")

    # Activity factor: recent discussion suggests urgency
    if comment_count > 5:
        score += 10
        reasons.append(f"Active discussion: {comment_count} comments")
    elif comment_count > 0:
        score += 3

    # Assignment factor: unassigned issues need triage
    if not assignee:
        score += 5
        reasons.append("Unassigned — needs triage")

    # Pipeline signal alignment: boost issues whose titles relate to a signal
    for signal in signals:
        if signal.signal_type == "stale_knowledge" and "stale" in title_lower:
            score += signal.weight * 20
            reasons.append(f"Matches signal: {signal.detail}")
        elif signal.signal_type == "empty_knowledge" and ("harvester" in title_lower or "knowledge" in title_lower):
            score += signal.weight * 25
            reasons.append(f"Critical gap: {signal.detail}")
        elif signal.signal_type == "no_metrics" and "measur" in title_lower:
            score += signal.weight * 15
            reasons.append(f"Pipeline gap: {signal.detail}")
        elif signal.signal_type == "low_coverage" and any(r.lower() in title_lower for r in signal.affected_repos):
            score += signal.weight * 10
            reasons.append(f"Coverage gap: {signal.detail}")

    # Keyword boosts (negative values deprioritize wishlist-style titles)
    keyword_scores = {
        "broken": 20, "bug": 15, "fix": 10, "error": 12, "fail": 15,
        "security": 25, "auth": 20, "data loss": 30, "crash": 25,
        "blocker": 20, "urgent": 15, "critical": 15,
        "epic": 8, "feature": -3, "nice to have": -10, "someday": -15
    }
    for keyword, boost in keyword_scores.items():
        if keyword in title_lower:
            score += boost
            if boost > 0:
                reasons.append(f"Keyword match: '{keyword}' (+{boost})")

    # Label-based adjustments
    for label in labels:
        lower = label.lower()
        if lower == "pipeline":
            score += 5  # Pipeline issues are infrastructure
        elif lower == "bug":
            score += 12
        elif lower == "enhancement":
            score -= 2
        elif lower == "documentation":
            score -= 5
        elif "epic" in lower:
            score += 3

    # Map the score onto a suggested priority band
    if score >= 40:
        suggested = "P0"
    elif score >= 25:
        suggested = "P1"
    elif score >= 10:
        suggested = "P2"
    elif score >= 0:
        suggested = "P3"
    else:
        suggested = None  # Consider closing or deprioritizing

    return IssueScore(
        issue_id=issue_id,
        repo=repo,
        title=title,
        current_labels=labels,
        current_priority=current_priority,
        suggested_priority=suggested,
        score=round(score, 1),
        reasons=reasons if reasons else ["No strong signals"],
        age_days=age_days,
        comment_count=comment_count,
        assignee=assignee
    )
|
|
|
|
|
|
# ============================================================
|
|
# Report Generation
|
|
# ============================================================
|
|
|
|
def generate_report(
    scores: List[IssueScore],
    signals: List[PipelineSignal],
    org: str,
    repos_scanned: List[str]
) -> Dict[str, Any]:
    """Generate the full priority report.

    Buckets each scored issue into upgrade / downgrade / new-assignment /
    no-change and returns a JSON-serializable dict with summaries plus
    the complete ranked score list.
    """
    generated_at = datetime.now(timezone.utc).isoformat()
    rank = PRIORITY_LEVELS.index  # lower index = higher priority

    # Single pass over scores instead of three separate comprehensions.
    upgrades: List[IssueScore] = []
    downgrades: List[IssueScore] = []
    new_assignments: List[IssueScore] = []
    for item in scores:
        if not item.suggested_priority:
            continue
        if not item.current_priority:
            new_assignments.append(item)
        elif rank(item.suggested_priority) < rank(item.current_priority):
            upgrades.append(item)
        elif rank(item.suggested_priority) > rank(item.current_priority):
            downgrades.append(item)
    no_change = [item for item in scores if item.suggested_priority == item.current_priority]

    # Sort once; both "top_priority" and "all_scores" views reuse it.
    ranked = sorted(scores, key=lambda x: x.score, reverse=True)

    return {
        "generated_at": generated_at,
        "org": org,
        "repos_scanned": repos_scanned,
        "total_issues": len(scores),
        "signals": [asdict(s) for s in signals],
        "summary": {
            "suggested_upgrades": len(upgrades),
            "suggested_downgrades": len(downgrades),
            "suggested_new_priorities": len(new_assignments),
            "unchanged": len(no_change)
        },
        "top_priority": [asdict(s) for s in ranked[:10]],
        "upgrades": [asdict(s) for s in upgrades],
        "downgrades": [asdict(s) for s in downgrades],
        "new_assignments": [asdict(s) for s in new_assignments],
        "all_scores": [asdict(s) for s in ranked]
    }
|
|
|
|
|
|
def generate_markdown_report(report: Dict[str, Any]) -> str:
    """Generate human-readable markdown report.

    Renders the dict produced by generate_report() into sections:
    pipeline signals (with a weight bar), summary counts, the top-10
    scored issues, suggested upgrades/downgrades, and new priority
    suggestions. Returns the complete markdown document as one string.
    """
    lines = []
    lines.append("# Priority Rebalancer Report")
    lines.append(f"**Generated:** {report['generated_at']}")
    lines.append(f"**Org:** {report['org']}")
    lines.append(f"**Repos scanned:** {', '.join(report['repos_scanned'])}")
    lines.append(f"**Issues analyzed:** {report['total_issues']}")
    lines.append("")

    # Signals — each rendered with a 10-char bar proportional to weight (0.0-1.0)
    if report["signals"]:
        lines.append("## Pipeline Signals")
        for sig in report["signals"]:
            weight_bar = "█" * int(sig["weight"] * 10) + "░" * (10 - int(sig["weight"] * 10))
            lines.append(f"- [{weight_bar}] **{sig['source']}/{sig['signal_type']}** — {sig['detail']}")
        lines.append("")

    # Summary counts from generate_report()
    s = report["summary"]
    lines.append("## Summary")
    lines.append(f"- Suggested upgrades: **{s['suggested_upgrades']}**")
    lines.append(f"- Suggested downgrades: **{s['suggested_downgrades']}**")
    lines.append(f"- New priority assignments: **{s['suggested_new_priorities']}**")
    lines.append(f"- Unchanged: **{s['unchanged']}**")
    lines.append("")

    # Top 10 — arrow indicates direction of the suggested priority move
    lines.append("## Top 10 by Score")
    for i, item in enumerate(report["top_priority"][:10], 1):
        cur = item["current_priority"] or "none"
        sug = item["suggested_priority"] or "none"
        # Lower index in PRIORITY_LEVELS means more urgent, so "<" is an upgrade
        arrow = "↑" if PRIORITY_LEVELS.index(sug) < PRIORITY_LEVELS.index(cur) else "↓" if PRIORITY_LEVELS.index(sug) > PRIORITY_LEVELS.index(cur) else "→"
        lines.append(f"{i}. **[{item['repo']}#{item['issue_id']}]** {item['title']}")
        lines.append(f" Score: {item['score']} | Current: {cur} {arrow} Suggested: {sug}")
        lines.append(f" Reasons: {'; '.join(item['reasons'][:3])}")
    lines.append("")

    # Upgrades
    if report["upgrades"]:
        lines.append("## Suggested Upgrades")
        for item in report["upgrades"]:
            lines.append(f"- **[{item['repo']}#{item['issue_id']}]** {item['title']}")
            lines.append(f" {item['current_priority']} → {item['suggested_priority']} (score: {item['score']})")
            lines.append(f" {'; '.join(item['reasons'][:2])}")
        lines.append("")

    # Downgrades
    if report["downgrades"]:
        lines.append("## Suggested Downgrades")
        for item in report["downgrades"]:
            lines.append(f"- **[{item['repo']}#{item['issue_id']}]** {item['title']}")
            lines.append(f" {item['current_priority']} → {item['suggested_priority']} (score: {item['score']})")
            lines.append(f" {'; '.join(item['reasons'][:2])}")
        lines.append("")

    # New assignments (capped at 20 rows to keep the report readable)
    if report["new_assignments"]:
        lines.append("## New Priority Suggestions (currently unlabelled)")
        for item in report["new_assignments"][:20]:
            lines.append(f"- **[{item['repo']}#{item['issue_id']}]** {item['title']}")
            lines.append(f" Suggested: {item['suggested_priority']} (score: {item['score']})")
            lines.append(f" {'; '.join(item['reasons'][:2])}")
        lines.append("")

    return "\n".join(lines)
|
|
|
|
|
|
# ============================================================
|
|
# Main
|
|
# ============================================================
|
|
|
|
def main():
    """Entry point: collect signals, score all open issues, write reports,
    and optionally apply the suggested priority labels via the Gitea API.

    Side effects: writes priority_report.json and priority_suggestions.md
    into the output directory; with --apply, mutates issue labels and
    posts explanatory comments.
    """
    parser = argparse.ArgumentParser(description="Priority Rebalancer — re-score issues based on pipeline data")
    parser.add_argument("--org", default="Timmy_Foundation", help="Gitea org name")
    parser.add_argument("--repo", help="Single repo to analyze (default: all)")
    parser.add_argument("--base-url", default="https://forge.alexanderwhitestone.com", help="Gitea base URL")
    parser.add_argument("--knowledge-dir", default=None, help="Path to knowledge directory")
    parser.add_argument("--metrics-dir", default=None, help="Path to metrics directory")
    parser.add_argument("--scripts-dir", default=None, help="Path to scripts directory")
    parser.add_argument("--output-dir", default=None, help="Path to output directory")
    parser.add_argument("--dry-run", action="store_true", help="Show what would change without applying")
    parser.add_argument("--apply", action="store_true", help="Apply priority changes via API")
    parser.add_argument("--json", action="store_true", help="Output JSON instead of markdown")

    args = parser.parse_args()

    # Resolve paths relative to script location
    script_dir = Path(__file__).parent
    repo_root = script_dir.parent

    knowledge_dir = args.knowledge_dir or str(repo_root / "knowledge")
    metrics_dir = args.metrics_dir or str(repo_root / "metrics")
    scripts_dir = args.scripts_dir or str(repo_root / "scripts")
    output_dir = args.output_dir or str(repo_root / "metrics")

    # Get token: env var takes precedence, then ~/.config/gitea/token
    token = os.environ.get("GITEA_TOKEN")
    if not token:
        token_path = os.path.expanduser("~/.config/gitea/token")
        if os.path.exists(token_path):
            with open(token_path) as f:
                token = f.read().strip()

    if not token:
        print("Error: No Gitea token found. Set GITEA_TOKEN or create ~/.config/gitea/token", file=sys.stderr)
        sys.exit(1)

    client = GiteaClient(args.base_url, token)
    # Naive UTC "now" — compute_issue_score normalizes issue timestamps to
    # naive UTC before subtracting. FIX: datetime.utcnow() is deprecated
    # (Python 3.12+); this is the exact equivalent.
    now = datetime.now(timezone.utc).replace(tzinfo=None)

    # Collect pipeline signals
    print("Collecting pipeline signals...", file=sys.stderr)
    signals = []
    signals.extend(collect_knowledge_signals(knowledge_dir))
    signals.extend(collect_staleness_signals(scripts_dir, knowledge_dir))
    signals.extend(collect_metrics_signals(metrics_dir))
    print(f" Found {len(signals)} signals", file=sys.stderr)

    # Get repos (a single --repo skips the org listing entirely)
    if args.repo:
        repos = [{"name": args.repo}]
    else:
        repos = client.get_org_repos(args.org)

    repo_names = [r["name"] for r in repos]
    print(f"Scanning {len(repo_names)} repos: {', '.join(repo_names[:5])}{'...' if len(repo_names) > 5 else ''}", file=sys.stderr)

    # Score all issues (skipping pull requests)
    all_scores = []
    for repo in repos:
        repo_name = repo["name"]
        issues = client.get_issues(args.org, repo_name)
        print(f" {repo_name}: {len(issues)} open issues", file=sys.stderr)

        for issue in issues:
            if issue.get("pull_request"):
                continue
            score = compute_issue_score(issue, repo_name, signals, now)
            all_scores.append(score)

    # Generate report
    report = generate_report(all_scores, signals, args.org, repo_names)

    # Output to stdout (JSON or markdown, per flag)
    os.makedirs(output_dir, exist_ok=True)

    if args.json:
        print(json.dumps(report, indent=2))
    else:
        md = generate_markdown_report(report)
        print(md)

    # Write files
    report_path = os.path.join(output_dir, "priority_report.json")
    with open(report_path, "w") as f:
        json.dump(report, f, indent=2)
    print(f"\nFull report: {report_path}", file=sys.stderr)

    md_path = os.path.join(output_dir, "priority_suggestions.md")
    with open(md_path, "w") as f:
        f.write(generate_markdown_report(report))
    print(f"Suggestions: {md_path}", file=sys.stderr)

    # Apply changes if requested
    if args.apply:
        print("\nApplying priority changes...", file=sys.stderr)
        applied = 0

        # Build (repo, lowercase label name) -> label id for the p0..p3 labels
        priority_label_map = {}
        for repo_name in repo_names:
            labels = client.get_repo_labels(args.org, repo_name)
            for label in labels:
                name = label.get("name", "").lower()
                if name in ("p0", "p1", "p2", "p3"):
                    priority_label_map[(repo_name, name)] = label["id"]

        for score in all_scores:
            if score.suggested_priority and score.suggested_priority != score.current_priority:
                sug_lower = score.suggested_priority.lower()
                label_id = priority_label_map.get((score.repo, sug_lower))
                if label_id:
                    ok = client.add_label_to_issue(args.org, score.repo, score.issue_id, [label_id])
                    if ok:
                        applied += 1
                        print(f" Applied: [{score.repo}#{score.issue_id}] → {score.suggested_priority}", file=sys.stderr)

                        # BUGFIX: also remove the old priority label (best-effort)
                        # so issues don't end up carrying two priorities at once.
                        # Only works when the old label is one of p0..p3 known to
                        # the map; alias labels (e.g. "priority:high") are left.
                        if score.current_priority:
                            old_id = priority_label_map.get((score.repo, score.current_priority.lower()))
                            if old_id and old_id != label_id:
                                client.remove_label_from_issue(args.org, score.repo, score.issue_id, old_id)

                        # Add comment explaining the change
                        comment = f"**Priority Rebalancer** suggested: **{score.suggested_priority}** (was: {score.current_priority or 'none'})\n\n"
                        comment += f"Score: {score.score}\n"
                        comment += f"Reasons:\n"
                        for r in score.reasons[:5]:
                            comment += f"- {r}\n"
                        client.add_comment(args.org, score.repo, score.issue_id, comment)

        print(f"Applied {applied} priority changes", file=sys.stderr)

    elif args.dry_run:
        print(f"\nDry run — {report['summary']['suggested_upgrades'] + report['summary']['suggested_downgrades'] + report['summary']['suggested_new_priorities']} changes would be applied", file=sys.stderr)
|
|
|
|
|
|
# Script entry point — allows importing this module without side effects.
if __name__ == "__main__":
    main()
|