Files
compounding-intelligence/scripts/priority_rebalancer.py
Alexander Whitestone 341abab2a0 feat: Priority Rebalancer — re-score issues from pipeline data (#174)
Monthly pipeline tool that:
- Reads knowledge store, metrics, and staleness data
- Scores all open issues across the org
- Suggests priority upgrades/downgrades based on accumulated signals
- Generates JSON + markdown reports
- Optional --apply mode to push changes via Gitea API

Signals detected:
- Stale/missing knowledge entries
- Empty knowledge store
- Missing metrics output
- Low repo coverage
- Issue age, activity, assignment status
- Keyword/label analysis

Usage:
  python3 scripts/priority_rebalancer.py --org Timmy_Foundation
  python3 scripts/priority_rebalancer.py --org Timmy_Foundation --apply
  python3 scripts/priority_rebalancer.py --org Timmy_Foundation --json

23 tests, all passing.
2026-04-15 10:52:51 -04:00

683 lines
25 KiB
Python

#!/usr/bin/env python3
"""
Priority Rebalancer — Re-evaluate issue priorities based on accumulated data.
Reads pipeline outputs, knowledge store, and Gitea issues to suggest
priority changes based on what the fleet has learned.
Usage:
python3 scripts/priority_rebalancer.py --org Timmy_Foundation
python3 scripts/priority_rebalancer.py --org Timmy_Foundation --repo compounding-intelligence
python3 scripts/priority_rebalancer.py --org Timmy_Foundation --dry-run
python3 scripts/priority_rebalancer.py --org Timmy_Foundation --apply
Output:
metrics/priority_report.json — full analysis
metrics/priority_suggestions.md — human-readable suggestions
"""
import argparse
import json
import os
import sys
import urllib.error
import urllib.parse
import urllib.request
from collections import Counter, defaultdict
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
# ============================================================
# Data Models
# ============================================================
@dataclass
class IssueScore:
    """Scoring result for a single issue.

    Carries the issue's identity, the priority currently expressed by its
    labels, the priority the scorer suggests, and the inputs/reasons behind
    that suggestion.
    """

    # --- identity ---
    issue_id: int
    repo: str
    title: str

    # --- current state read from the tracker ---
    current_labels: List[str]
    current_priority: Optional[str]

    # --- scoring outcome ---
    suggested_priority: Optional[str]
    score: float
    reasons: List[str]

    # --- raw inputs kept for reporting ---
    age_days: int
    comment_count: int
    assignee: Optional[str]

    # --- optional relationships; each instance gets its own empty list ---
    dependencies: List[str] = field(default_factory=list)
    blocking: List[str] = field(default_factory=list)
@dataclass
class PipelineSignal:
    """A single observation about pipeline health used to bias issue scores."""

    # Where the signal came from: "knowledge", "metrics", "sessions", "staleness".
    source: str
    # What was observed: "stale_knowledge", "high_error_rate", "missing_coverage", etc.
    signal_type: str
    # Strength of the signal, 0.0 - 1.0.
    weight: float
    # Human-readable explanation shown in reports and score reasons.
    detail: str
    # Optional scoping; each instance gets its own empty list by default.
    affected_repos: List[str] = field(default_factory=list)
    affected_issues: List[int] = field(default_factory=list)
# ============================================================
# Gitea API Client
# ============================================================
class GiteaClient:
    """Minimal Gitea REST client built on ``urllib``.

    Read helpers return parsed JSON (``None`` or a possibly partial list when
    a request fails); write helpers return ``True`` on success and ``False``
    otherwise. Failures are reported on stderr rather than raised, so one bad
    request does not abort a whole scan.
    """

    # Page size requested from every list endpoint; a shorter page ends paging.
    PAGE_SIZE = 50

    def __init__(self, base_url: str, token: str):
        """Store the server root (without trailing slash) and the API token."""
        self.base_url = base_url.rstrip("/")
        self.token = token

    def _build_url(self, path: str, params: Optional[Dict] = None) -> str:
        """Return the absolute API URL for ``path`` with URL-encoded ``params``.

        Parameters whose value is ``None`` are dropped.
        """
        url = f"{self.base_url}/api/v1{path}"
        if params:
            query = urllib.parse.urlencode(
                {key: value for key, value in params.items() if value is not None}
            )
            if query:
                url += f"?{query}"
        return url

    def _open(self, url: str, method: str = "GET", payload: Any = None, timeout: int = 30):
        """Open an authenticated request and return the response object.

        ``payload``, when given, is sent as a JSON body. Exceptions from
        ``urlopen`` propagate to the caller.
        """
        data = None if payload is None else json.dumps(payload).encode()
        req = urllib.request.Request(url, data=data, method=method)
        req.add_header("Authorization", f"token {self.token}")
        req.add_header("Content-Type", "application/json")
        return urllib.request.urlopen(req, timeout=timeout)

    def _request(self, path: str, params: Optional[Dict] = None) -> Any:
        """GET ``path`` and return the decoded JSON body, or ``None`` on failure."""
        try:
            with self._open(self._build_url(path, params)) as resp:
                return json.loads(resp.read().decode())
        except urllib.error.HTTPError as e:
            body = e.read().decode(errors="replace")[:200]
            print(f"API error {e.code} for {path}: {body}", file=sys.stderr)
        except (urllib.error.URLError, OSError, ValueError) as e:
            # Network failures, timeouts and malformed JSON are handled like
            # HTTP errors: report and let the caller continue with no data.
            print(f"Request failed for {path}: {e}", file=sys.stderr)
        return None

    def _paginate(self, path: str, params: Optional[Dict] = None) -> List[Dict]:
        """Collect every page of a list endpoint into one list."""
        items: List[Dict] = []
        page = 1
        while True:
            query = dict(params or {}, limit=self.PAGE_SIZE, page=page)
            batch = self._request(path, query)
            if not batch:
                break
            items.extend(batch)
            if len(batch) < self.PAGE_SIZE:
                break
            page += 1
        return items

    def _write(self, method: str, path: str, payload: Any = None,
               ok_statuses: Tuple[int, ...] = (200,)) -> bool:
        """Send a mutating request; return whether the status is in ``ok_statuses``."""
        try:
            with self._open(self._build_url(path), method=method,
                            payload=payload, timeout=15) as resp:
                return resp.status in ok_statuses
        except Exception as e:
            # Best-effort by design: report the failure, never raise.
            print(f"{method} {path} failed: {e}", file=sys.stderr)
            return False

    def get_org_repos(self, org: str) -> List[Dict]:
        """Return all repositories of ``org``."""
        return self._paginate(f"/orgs/{org}/repos")

    def get_issues(self, org: str, repo: str, state: str = "open") -> List[Dict]:
        """Return issues (not pull requests) of ``org/repo`` in ``state``."""
        return self._paginate(f"/repos/{org}/{repo}/issues",
                              {"state": state, "type": "issues"})

    def add_label_to_issue(self, org: str, repo: str, issue_num: int, label_ids: List[int]) -> bool:
        """Attach ``label_ids`` to an issue; ``True`` when the server answers 200."""
        return self._write("POST", f"/repos/{org}/{repo}/issues/{issue_num}/labels",
                           payload={"labels": label_ids}, ok_statuses=(200,))

    def remove_label_from_issue(self, org: str, repo: str, issue_num: int, label_id: int) -> bool:
        """Detach one label from an issue.

        Gitea answers a successful DELETE with 204 No Content, so both 200
        and 204 count as success.
        """
        return self._write("DELETE",
                           f"/repos/{org}/{repo}/issues/{issue_num}/labels/{label_id}",
                           ok_statuses=(200, 204))

    def get_repo_labels(self, org: str, repo: str) -> List[Dict]:
        """Return all labels defined on ``org/repo``."""
        return self._paginate(f"/repos/{org}/{repo}/labels")

    def add_comment(self, org: str, repo: str, issue_num: int, body: str) -> bool:
        """Post a comment on an issue; ``True`` when the server answers 201."""
        return self._write("POST", f"/repos/{org}/{repo}/issues/{issue_num}/comments",
                           payload={"body": body}, ok_statuses=(201,))
# ============================================================
# Pipeline Signal Collectors
# ============================================================
def collect_knowledge_signals(knowledge_dir: str) -> List[PipelineSignal]:
"""Analyze knowledge store for coverage gaps and staleness."""
signals = []
index_path = os.path.join(knowledge_dir, "index.json")
if not os.path.exists(index_path):
signals.append(PipelineSignal(
source="knowledge",
signal_type="missing_index",
weight=0.8,
detail="knowledge/index.json not found — no knowledge base exists"
))
return signals
try:
with open(index_path) as f:
data = json.load(f)
except (json.JSONDecodeError, IOError) as e:
signals.append(PipelineSignal(
source="knowledge",
signal_type="corrupt_index",
weight=0.9,
detail=f"knowledge/index.json is corrupt: {e}"
))
return signals
facts = data.get("facts", [])
total = len(facts)
if total == 0:
signals.append(PipelineSignal(
source="knowledge",
signal_type="empty_knowledge",
weight=0.7,
detail="Knowledge store has 0 facts — harvester not running or not finding sessions"
))
return signals
# Check staleness
stale_count = 0
missing_source = 0
for fact in facts:
status = fact.get("status", "unknown")
if status == "stale":
stale_count += 1
elif status in ("missing_source", "no_source"):
missing_source += 1
if stale_count > 0:
signals.append(PipelineSignal(
source="knowledge",
signal_type="stale_knowledge",
weight=min(1.0, stale_count / max(1, total)),
detail=f"{stale_count}/{total} facts are stale (source files changed)"
))
if missing_source > 0:
signals.append(PipelineSignal(
source="knowledge",
signal_type="missing_sources",
weight=min(1.0, missing_source / max(1, total)),
detail=f"{missing_source}/{total} facts have missing source files"
))
# Coverage by repo
repo_counts = Counter(f.get("repo", "unknown") for f in facts)
if len(repo_counts) < 3:
signals.append(PipelineSignal(
source="knowledge",
signal_type="low_coverage",
weight=0.5,
detail=f"Knowledge covers only {len(repo_counts)} repos — expand harvester scope",
affected_repos=list(repo_counts.keys())
))
return signals
def collect_staleness_signals(scripts_dir: str, knowledge_dir: str) -> List[PipelineSignal]:
"""Run staleness checker if available."""
signals = []
checker = os.path.join(scripts_dir, "knowledge_staleness_check.py")
index_path = os.path.join(knowledge_dir, "index.json")
if not os.path.exists(checker) or not os.path.exists(index_path):
return signals
try:
import subprocess
result = subprocess.run(
["python3", checker, "--index", index_path, "--json"],
capture_output=True, text=True, timeout=30
)
if result.returncode == 0:
data = json.loads(result.stdout)
stale = data.get("stale_count", 0)
total = data.get("total", 0)
if stale > 0:
signals.append(PipelineSignal(
source="staleness",
signal_type="stale_knowledge",
weight=min(1.0, stale / max(1, total)),
detail=f"Staleness checker found {stale}/{total} stale entries"
))
except Exception:
pass
return signals
def collect_metrics_signals(metrics_dir: str) -> List[PipelineSignal]:
"""Analyze metrics directory for pipeline health."""
signals = []
if not os.path.isdir(metrics_dir):
return signals
files = os.listdir(metrics_dir)
if len(files) <= 1: # Only .gitkeep
signals.append(PipelineSignal(
source="metrics",
signal_type="no_metrics",
weight=0.6,
detail="Metrics directory is empty — measurer pipeline not producing output"
))
return signals
# ============================================================
# Priority Scoring Engine
# ============================================================
# Priority levels ordered from most to least urgent; a lower index means a
# higher priority, and "none" ranks last.
PRIORITY_LEVELS = ["P0", "P1", "P2", "P3", "none"]

# Lower-cased label name -> canonical priority level.
PRIORITY_LABELS = {
    # short form
    "p0": "P0",
    "p1": "P1",
    "p2": "P2",
    "p3": "P3",
    # "priority:<word>" form
    "priority:critical": "P0",
    "priority:high": "P1",
    "priority:medium": "P2",
    "priority:low": "P3",
}
def extract_priority(labels: List[str]) -> Optional[str]:
    """Return the priority level of the first recognised priority label.

    Labels are compared case-insensitively with surrounding whitespace
    removed. Returns ``None`` when no label maps to a priority.
    """
    normalized = (name.lower().strip() for name in labels)
    return next(
        (PRIORITY_LABELS[key] for key in normalized if key in PRIORITY_LABELS),
        None,
    )
def _issue_age_days(created_str: Any, now: datetime) -> int:
    """Return whole days between an ISO-8601 creation timestamp and ``now``.

    Both values are normalised to timezone-aware UTC before subtracting, so
    a timestamp carrying a non-UTC offset is converted rather than having its
    offset discarded. A naive value is assumed to already be in UTC.
    Missing or unparseable input yields 0.
    """
    try:
        created = datetime.fromisoformat(created_str.replace("Z", "+00:00"))
    except (ValueError, AttributeError, TypeError):
        return 0
    if created.tzinfo is None:
        created = created.replace(tzinfo=timezone.utc)
    if now.tzinfo is None:
        now = now.replace(tzinfo=timezone.utc)
    return (now - created).days


def compute_issue_score(
    issue: Dict,
    repo: str,
    signals: List[PipelineSignal],
    now: datetime
) -> IssueScore:
    """Compute the priority score for a single issue.

    Args:
        issue: Issue payload as returned by the Gitea API. Reads ``labels``,
            ``title``, ``number``, ``created_at``, ``comments`` and
            ``assignees``; missing or null values fall back to empty/zero.
        repo: Repository name the issue belongs to.
        signals: Pipeline signals; those whose type matches a keyword in the
            title raise the score in proportion to their weight.
        now: Reference time for the age calculation; naive values are
            treated as UTC.

    Returns:
        An ``IssueScore`` whose ``suggested_priority`` is P0 (score >= 40),
        P1 (>= 25), P2 (>= 10), P3 (>= 0) or ``None`` for a negative score.
    """
    raw_labels = issue.get("labels") or []
    labels = [l.get("name", "") if isinstance(l, dict) else l for l in raw_labels]
    title = issue.get("title") or ""
    title_lower = title.lower()
    issue_id = issue.get("number", 0)
    current_priority = extract_priority(labels)
    age_days = _issue_age_days(issue.get("created_at", ""), now)
    comment_count = issue.get("comments") or 0

    assignee = None
    assignees = issue.get("assignees") or []
    if assignees:
        first = assignees[0]
        assignee = first.get("login") if isinstance(first, dict) else str(first)

    score = 0.0
    reasons: List[str] = []

    # Age factor: older issues drift down unless actively discussed
    if age_days > 90 and comment_count < 2:
        score -= 15
        reasons.append(f"Dormant: {age_days} days old with only {comment_count} comments")
    elif age_days > 30:
        score -= 5
        reasons.append(f"Aging: {age_days} days old")

    # Activity factor: recent discussion suggests urgency
    if comment_count > 5:
        score += 10
        reasons.append(f"Active discussion: {comment_count} comments")
    elif comment_count > 0:
        score += 3

    # Assignment factor: unassigned issues need triage
    if not assignee:
        score += 5
        reasons.append("Unassigned — needs triage")

    # Pipeline signal alignment: a signal counts only when the title mentions
    # the area the signal is about.
    for signal in signals:
        if signal.signal_type == "stale_knowledge" and "stale" in title_lower:
            score += signal.weight * 20
            reasons.append(f"Matches signal: {signal.detail}")
        elif signal.signal_type == "empty_knowledge" and ("harvester" in title_lower or "knowledge" in title_lower):
            score += signal.weight * 25
            reasons.append(f"Critical gap: {signal.detail}")
        elif signal.signal_type == "no_metrics" and "measur" in title_lower:
            score += signal.weight * 15
            reasons.append(f"Pipeline gap: {signal.detail}")
        elif signal.signal_type == "low_coverage" and any(r.lower() in title_lower for r in signal.affected_repos):
            score += signal.weight * 10
            reasons.append(f"Coverage gap: {signal.detail}")

    # Keyword boosts. Keywords are matched as plain substrings of the
    # lower-cased title; only positive boosts are listed as reasons.
    keyword_scores = {
        "broken": 20, "bug": 15, "fix": 10, "error": 12, "fail": 15,
        "security": 25, "auth": 20, "data loss": 30, "crash": 25,
        "blocker": 20, "urgent": 15, "critical": 15,
        "epic": 8, "feature": -3, "nice to have": -10, "someday": -15
    }
    for keyword, boost in keyword_scores.items():
        if keyword in title_lower:
            score += boost
            if boost > 0:
                reasons.append(f"Keyword match: '{keyword}' (+{boost})")

    # Label-based adjustments: exact names first, then any label containing "epic".
    label_adjustments = {
        "pipeline": 5,  # Pipeline issues are infrastructure
        "bug": 12,
        "enhancement": -2,
        "documentation": -5,
    }
    for label in labels:
        lower = label.lower()
        if lower in label_adjustments:
            score += label_adjustments[lower]
        elif "epic" in lower:
            score += 3

    # Determine suggested priority
    if score >= 40:
        suggested = "P0"
    elif score >= 25:
        suggested = "P1"
    elif score >= 10:
        suggested = "P2"
    elif score >= 0:
        suggested = "P3"
    else:
        suggested = None  # Consider closing or deprioritizing

    return IssueScore(
        issue_id=issue_id,
        repo=repo,
        title=title,
        current_labels=labels,
        current_priority=current_priority,
        suggested_priority=suggested,
        score=round(score, 1),
        reasons=reasons if reasons else ["No strong signals"],
        age_days=age_days,
        comment_count=comment_count,
        assignee=assignee
    )
# ============================================================
# Report Generation
# ============================================================
def generate_report(
scores: List[IssueScore],
signals: List[PipelineSignal],
org: str,
repos_scanned: List[str]
) -> Dict[str, Any]:
"""Generate the full priority report."""
now = datetime.now(timezone.utc).isoformat()
# Categorize changes
upgrades = [s for s in scores if s.suggested_priority and s.current_priority and
PRIORITY_LEVELS.index(s.suggested_priority) < PRIORITY_LEVELS.index(s.current_priority)]
downgrades = [s for s in scores if s.suggested_priority and s.current_priority and
PRIORITY_LEVELS.index(s.suggested_priority) > PRIORITY_LEVELS.index(s.current_priority)]
new_assignments = [s for s in scores if s.suggested_priority and not s.current_priority]
no_change = [s for s in scores if s.suggested_priority == s.current_priority]
return {
"generated_at": now,
"org": org,
"repos_scanned": repos_scanned,
"total_issues": len(scores),
"signals": [asdict(s) for s in signals],
"summary": {
"suggested_upgrades": len(upgrades),
"suggested_downgrades": len(downgrades),
"suggested_new_priorities": len(new_assignments),
"unchanged": len(no_change)
},
"top_priority": [asdict(s) for s in sorted(scores, key=lambda x: x.score, reverse=True)[:10]],
"upgrades": [asdict(s) for s in upgrades],
"downgrades": [asdict(s) for s in downgrades],
"new_assignments": [asdict(s) for s in new_assignments],
"all_scores": [asdict(s) for s in sorted(scores, key=lambda x: x.score, reverse=True)]
}
def generate_markdown_report(report: Dict[str, Any]) -> str:
    """Render the report dict produced by ``generate_report`` as markdown.

    Sections, in order: header, pipeline signals (with a ten-cell weight
    bar), summary counts, top ten by score, suggested upgrades, suggested
    downgrades, and up to twenty new priority suggestions. Sections whose
    source list is empty are omitted, except the summary and top-ten
    headings, which are always present.
    """
    bar_cells = 10
    lines = []
    lines.append("# Priority Rebalancer Report")
    lines.append(f"**Generated:** {report['generated_at']}")
    lines.append(f"**Org:** {report['org']}")
    lines.append(f"**Repos scanned:** {', '.join(report['repos_scanned'])}")
    lines.append(f"**Issues analyzed:** {report['total_issues']}")
    lines.append("")

    # Signals
    if report["signals"]:
        lines.append("## Pipeline Signals")
        for sig in report["signals"]:
            # Clamp so an out-of-range weight cannot produce a short or
            # overlong bar.
            filled = max(0, min(bar_cells, int(sig["weight"] * bar_cells)))
            weight_bar = "█" * filled + "░" * (bar_cells - filled)
            lines.append(f"- [{weight_bar}] **{sig['source']}/{sig['signal_type']}** — {sig['detail']}")
        lines.append("")

    # Summary
    s = report["summary"]
    lines.append("## Summary")
    lines.append(f"- Suggested upgrades: **{s['suggested_upgrades']}**")
    lines.append(f"- Suggested downgrades: **{s['suggested_downgrades']}**")
    lines.append(f"- New priority assignments: **{s['suggested_new_priorities']}**")
    lines.append(f"- Unchanged: **{s['unchanged']}**")
    lines.append("")

    # Top 10
    lines.append("## Top 10 by Score")
    for i, item in enumerate(report["top_priority"][:10], 1):
        cur = item["current_priority"] or "none"
        sug = item["suggested_priority"] or "none"
        cur_rank = PRIORITY_LEVELS.index(cur)
        sug_rank = PRIORITY_LEVELS.index(sug)
        # Lower index = more urgent, so a smaller suggested rank is an upgrade.
        if sug_rank < cur_rank:
            arrow = "↑"
        elif sug_rank > cur_rank:
            arrow = "↓"
        else:
            arrow = "→"
        lines.append(f"{i}. **[{item['repo']}#{item['issue_id']}]** {item['title']}")
        lines.append(f" Score: {item['score']} | Current: {cur} {arrow} Suggested: {sug}")
        lines.append(f" Reasons: {'; '.join(item['reasons'][:3])}")
    lines.append("")

    # Upgrades and downgrades share one layout: "<current> → <suggested>".
    for heading, key in (("## Suggested Upgrades", "upgrades"),
                         ("## Suggested Downgrades", "downgrades")):
        if report[key]:
            lines.append(heading)
            for item in report[key]:
                lines.append(f"- **[{item['repo']}#{item['issue_id']}]** {item['title']}")
                lines.append(f" {item['current_priority']} → {item['suggested_priority']} (score: {item['score']})")
                lines.append(f" {'; '.join(item['reasons'][:2])}")
            lines.append("")

    # New assignments
    if report["new_assignments"]:
        lines.append("## New Priority Suggestions (currently unlabelled)")
        for item in report["new_assignments"][:20]:
            lines.append(f"- **[{item['repo']}#{item['issue_id']}]** {item['title']}")
            lines.append(f" Suggested: {item['suggested_priority']} (score: {item['score']})")
            lines.append(f" {'; '.join(item['reasons'][:2])}")
        lines.append("")
    return "\n".join(lines)
# ============================================================
# Main
# ============================================================
def _load_token() -> Optional[str]:
    """Return the Gitea token from ``GITEA_TOKEN`` or ``~/.config/gitea/token``."""
    token = os.environ.get("GITEA_TOKEN")
    if token:
        return token
    token_path = os.path.expanduser("~/.config/gitea/token")
    if os.path.exists(token_path):
        with open(token_path, encoding="utf-8") as f:
            return f.read().strip() or None
    return None


def _apply_changes(client: GiteaClient, org: str, repo_names: List[str],
                   scores: List[IssueScore]) -> int:
    """Push suggested priorities to Gitea and return how many were applied.

    For every issue whose suggested priority differs from its current one,
    the matching ``p0``..``p3`` label is added, priority labels that denote a
    different level are removed so the issue does not carry two conflicting
    priorities, and an explanatory comment is posted. Issues in repos that
    lack the target label are skipped with a note on stderr.
    """
    # (repo, lower-cased label name) -> label id, for every known priority label.
    label_ids = {}
    for repo_name in repo_names:
        for label in client.get_repo_labels(org, repo_name):
            name = (label.get("name") or "").lower().strip()
            if name in PRIORITY_LABELS and "id" in label:
                label_ids[(repo_name, name)] = label["id"]

    applied = 0
    for score in scores:
        if not score.suggested_priority or score.suggested_priority == score.current_priority:
            continue
        sug_lower = score.suggested_priority.lower()
        label_id = label_ids.get((score.repo, sug_lower))
        if not label_id:
            print(f" Skipped: [{score.repo}#{score.issue_id}] no '{sug_lower}' label in repo", file=sys.stderr)
            continue
        if not client.add_label_to_issue(org, score.repo, score.issue_id, [label_id]):
            print(f" Failed: [{score.repo}#{score.issue_id}] could not add '{sug_lower}'", file=sys.stderr)
            continue

        # Drop priority labels that now contradict the applied one. The
        # return value is not checked: removal is best-effort and the new
        # label is already in place.
        for name in score.current_labels:
            old = name.lower().strip()
            if old in PRIORITY_LABELS and PRIORITY_LABELS[old] != score.suggested_priority:
                old_id = label_ids.get((score.repo, old))
                if old_id:
                    client.remove_label_from_issue(org, score.repo, score.issue_id, old_id)

        applied += 1
        print(f" Applied: [{score.repo}#{score.issue_id}] → {score.suggested_priority}", file=sys.stderr)

        # Add comment explaining the change
        comment_lines = [
            f"**Priority Rebalancer** suggested: **{score.suggested_priority}** (was: {score.current_priority or 'none'})",
            "",
            f"Score: {score.score}",
            "Reasons:",
        ]
        comment_lines.extend(f"- {reason}" for reason in score.reasons[:5])
        client.add_comment(org, score.repo, score.issue_id, "\n".join(comment_lines) + "\n")
    return applied


def main():
    """Command-line entry point: collect signals, score issues, write reports.

    Prints the report (markdown, or JSON with ``--json``) to stdout, writes
    ``priority_report.json`` and ``priority_suggestions.md`` to the output
    directory, and with ``--apply`` pushes label changes through the Gitea
    API. ``--dry-run`` takes precedence when combined with ``--apply``.
    Exits with status 1 when no API token can be found.
    """
    parser = argparse.ArgumentParser(description="Priority Rebalancer — re-score issues based on pipeline data")
    parser.add_argument("--org", default="Timmy_Foundation", help="Gitea org name")
    parser.add_argument("--repo", help="Single repo to analyze (default: all)")
    parser.add_argument("--base-url", default="https://forge.alexanderwhitestone.com", help="Gitea base URL")
    parser.add_argument("--knowledge-dir", default=None, help="Path to knowledge directory")
    parser.add_argument("--metrics-dir", default=None, help="Path to metrics directory")
    parser.add_argument("--scripts-dir", default=None, help="Path to scripts directory")
    parser.add_argument("--output-dir", default=None, help="Path to output directory")
    parser.add_argument("--dry-run", action="store_true", help="Show what would change without applying")
    parser.add_argument("--apply", action="store_true", help="Apply priority changes via API")
    parser.add_argument("--json", action="store_true", help="Output JSON instead of markdown")
    args = parser.parse_args()

    # A dry run must never mutate the tracker, even if --apply is also given.
    apply_changes = args.apply and not args.dry_run
    if args.apply and args.dry_run:
        print("Both --apply and --dry-run given; treating this as a dry run", file=sys.stderr)

    # Resolve paths relative to script location
    script_dir = Path(__file__).parent
    repo_root = script_dir.parent
    knowledge_dir = args.knowledge_dir or str(repo_root / "knowledge")
    metrics_dir = args.metrics_dir or str(repo_root / "metrics")
    scripts_dir = args.scripts_dir or str(repo_root / "scripts")
    output_dir = args.output_dir or str(repo_root / "metrics")

    # Get token
    token = _load_token()
    if not token:
        print("Error: No Gitea token found. Set GITEA_TOKEN or create ~/.config/gitea/token", file=sys.stderr)
        sys.exit(1)
    client = GiteaClient(args.base_url, token)

    # Naive UTC "now": same value datetime.utcnow() produced, without the
    # deprecated call.
    now = datetime.now(timezone.utc).replace(tzinfo=None)

    # Collect pipeline signals
    print("Collecting pipeline signals...", file=sys.stderr)
    signals = []
    signals.extend(collect_knowledge_signals(knowledge_dir))
    signals.extend(collect_staleness_signals(scripts_dir, knowledge_dir))
    signals.extend(collect_metrics_signals(metrics_dir))
    print(f" Found {len(signals)} signals", file=sys.stderr)

    # Get repos
    if args.repo:
        repos = [{"name": args.repo}]
    else:
        repos = client.get_org_repos(args.org)
    repo_names = [r["name"] for r in repos]
    print(f"Scanning {len(repo_names)} repos: {', '.join(repo_names[:5])}{'...' if len(repo_names) > 5 else ''}", file=sys.stderr)

    # Score all issues
    all_scores = []
    for repo_name in repo_names:
        issues = client.get_issues(args.org, repo_name)
        print(f" {repo_name}: {len(issues)} open issues", file=sys.stderr)
        for issue in issues:
            if issue.get("pull_request"):
                continue
            all_scores.append(compute_issue_score(issue, repo_name, signals, now))

    # Generate report (markdown is rendered once and reused for stdout and file)
    report = generate_report(all_scores, signals, args.org, repo_names)
    md = generate_markdown_report(report)

    # Output
    os.makedirs(output_dir, exist_ok=True)
    if args.json:
        print(json.dumps(report, indent=2))
    else:
        print(md)

    # Write files; UTF-8 is explicit because the markdown contains non-ASCII
    # characters that a platform default encoding may not be able to encode.
    report_path = os.path.join(output_dir, "priority_report.json")
    with open(report_path, "w", encoding="utf-8") as f:
        json.dump(report, f, indent=2)
    print(f"\nFull report: {report_path}", file=sys.stderr)
    md_path = os.path.join(output_dir, "priority_suggestions.md")
    with open(md_path, "w", encoding="utf-8") as f:
        f.write(md)
    print(f"Suggestions: {md_path}", file=sys.stderr)

    # Apply changes if requested
    if apply_changes:
        print("\nApplying priority changes...", file=sys.stderr)
        applied = _apply_changes(client, args.org, repo_names, all_scores)
        print(f"Applied {applied} priority changes", file=sys.stderr)
    elif args.dry_run:
        summary = report["summary"]
        pending = (summary["suggested_upgrades"]
                   + summary["suggested_downgrades"]
                   + summary["suggested_new_priorities"])
        print(f"\nDry run — {pending} changes would be applied", file=sys.stderr)


if __name__ == "__main__":
    main()