diff --git a/scripts/priority_rebalancer.py b/scripts/priority_rebalancer.py
new file mode 100644
index 0000000..43d5b66
--- /dev/null
+++ b/scripts/priority_rebalancer.py
@@ -0,0 +1,682 @@
+#!/usr/bin/env python3
+"""
+Priority Rebalancer — Re-evaluate issue priorities based on accumulated data.
+
+Reads pipeline outputs, the knowledge store, and Gitea issues, then suggests
+priority changes based on what the fleet has learned.
+
+Usage:
+    python3 scripts/priority_rebalancer.py --org Timmy_Foundation
+    python3 scripts/priority_rebalancer.py --org Timmy_Foundation --repo compounding-intelligence
+    python3 scripts/priority_rebalancer.py --org Timmy_Foundation --dry-run
+    python3 scripts/priority_rebalancer.py --org Timmy_Foundation --apply
+
+Output:
+    metrics/priority_report.json — full analysis
+    metrics/priority_suggestions.md — human-readable suggestions
+"""
+
+import argparse
+import json
+import os
+import sys
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Dict, List, Any, Optional
+from dataclasses import dataclass, field, asdict
+from collections import Counter
+import urllib.request
+import urllib.error
+
+
+# ============================================================
+# Data Models
+# ============================================================
+
+@dataclass
+class IssueScore:
+    issue_id: int
+    repo: str
+    title: str
+    current_labels: List[str]
+    current_priority: Optional[str]
+    suggested_priority: Optional[str]
+    score: float
+    reasons: List[str]
+    age_days: int
+    comment_count: int
+    assignee: Optional[str]
+    dependencies: List[str] = field(default_factory=list)
+    blocking: List[str] = field(default_factory=list)
+
+
+@dataclass
+class PipelineSignal:
+    source: str       # "knowledge", "metrics", "sessions", "staleness"
+    signal_type: str  # "stale_knowledge", "high_error_rate", "missing_coverage", etc.
+    weight: float     # 0.0 - 1.0
+    detail: str
+    affected_repos: List[str] = field(default_factory=list)
+    affected_issues: List[int] = field(default_factory=list)
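+
+
+# Illustration only (made-up values): a signal as the collectors below emit it.
+# Signals are matched against issue titles and labels in compute_issue_score()
+# to nudge scores up or down.
+#
+#   PipelineSignal(source="knowledge", signal_type="stale_knowledge",
+#                  weight=0.4, detail="4/10 facts are stale (source files changed)")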
+
+
+# ============================================================
+# Gitea API Client
+# ============================================================
+
+class GiteaClient:
+    def __init__(self, base_url: str, token: str):
+        self.base_url = base_url.rstrip("/")
+        self.token = token
+
+    def _request(self, path: str, params: Optional[Dict] = None) -> Any:
+        url = f"{self.base_url}/api/v1{path}"
+        if params:
+            # Values here are plain ints and short strings, so a manual join
+            # is safe without URL-encoding.
+            qs = "&".join(f"{k}={v}" for k, v in params.items() if v is not None)
+            url += f"?{qs}"
+
+        req = urllib.request.Request(url)
+        req.add_header("Authorization", f"token {self.token}")
+        req.add_header("Content-Type", "application/json")
+
+        try:
+            with urllib.request.urlopen(req, timeout=30) as resp:
+                return json.loads(resp.read().decode())
+        except urllib.error.HTTPError as e:
+            print(f"API error {e.code} for {path}: {e.read().decode()[:200]}", file=sys.stderr)
+            return None
+
+    def get_org_repos(self, org: str) -> List[Dict]:
+        repos = []
+        page = 1
+        while True:
+            batch = self._request(f"/orgs/{org}/repos", {"limit": 50, "page": page})
+            if not batch:
+                break
+            repos.extend(batch)
+            if len(batch) < 50:
+                break
+            page += 1
+        return repos
+
+    def get_issues(self, org: str, repo: str, state: str = "open") -> List[Dict]:
+        issues = []
+        page = 1
+        while True:
+            batch = self._request(f"/repos/{org}/{repo}/issues",
+                                  {"state": state, "limit": 50, "page": page, "type": "issues"})
+            if not batch:
+                break
+            issues.extend(batch)
+            if len(batch) < 50:
+                break
+            page += 1
+        return issues
+
+    def add_label_to_issue(self, org: str, repo: str, issue_num: int, label_ids: List[int]) -> bool:
+        url = f"{self.base_url}/api/v1/repos/{org}/{repo}/issues/{issue_num}/labels"
+        data = json.dumps({"labels": label_ids}).encode()
+        req = urllib.request.Request(url, data=data, method="POST")
+        req.add_header("Authorization", f"token {self.token}")
+        req.add_header("Content-Type", "application/json")
+        try:
+            with urllib.request.urlopen(req, timeout=15) as resp:
+                return resp.status == 200
+        except Exception:
+            return False
+
+    def remove_label_from_issue(self, org: str, repo: str, issue_num: int, label_id: int) -> bool:
+        url = f"{self.base_url}/api/v1/repos/{org}/{repo}/issues/{issue_num}/labels/{label_id}"
+        req = urllib.request.Request(url, method="DELETE")
+        req.add_header("Authorization", f"token {self.token}")
+        try:
+            with urllib.request.urlopen(req, timeout=15) as resp:
+                # Gitea answers successful deletes with 204 No Content,
+                # so accepting only 200 would misreport success.
+                return resp.status in (200, 204)
+        except Exception:
+            return False
+
+    def get_repo_labels(self, org: str, repo: str) -> List[Dict]:
+        labels = []
+        page = 1
+        while True:
+            batch = self._request(f"/repos/{org}/{repo}/labels", {"limit": 50, "page": page})
+            if not batch:
+                break
+            labels.extend(batch)
+            if len(batch) < 50:
+                break
+            page += 1
+        return labels
+
+    def add_comment(self, org: str, repo: str, issue_num: int, body: str) -> bool:
+        url = f"{self.base_url}/api/v1/repos/{org}/{repo}/issues/{issue_num}/comments"
+        data = json.dumps({"body": body}).encode()
+        req = urllib.request.Request(url, data=data, method="POST")
+        req.add_header("Authorization", f"token {self.token}")
+        req.add_header("Content-Type", "application/json")
+        try:
+            with urllib.request.urlopen(req, timeout=15) as resp:
+                return resp.status == 201
+        except Exception:
+            return False
+
+
+# ============================================================
+# Pipeline Signal Collectors
+# ============================================================
+
+def collect_knowledge_signals(knowledge_dir: str) -> List[PipelineSignal]:
+    """Analyze knowledge store for coverage gaps and staleness."""
+    signals = []
+    index_path = os.path.join(knowledge_dir, "index.json")
+
+    if not os.path.exists(index_path):
+        signals.append(PipelineSignal(
+            source="knowledge",
+            signal_type="missing_index",
+            weight=0.8,
+            detail="knowledge/index.json not found — no knowledge base exists"
+        ))
+        return signals
+
+    try:
+        with open(index_path) as f:
+            data = json.load(f)
+    except (json.JSONDecodeError, IOError) as e:
+        signals.append(PipelineSignal(
+            source="knowledge",
+            signal_type="corrupt_index",
+            weight=0.9,
+            detail=f"knowledge/index.json is corrupt: {e}"
+        ))
+        return signals
+
+    facts = data.get("facts", [])
+    total = len(facts)
+
+    if total == 0:
+        signals.append(PipelineSignal(
+            source="knowledge",
+            signal_type="empty_knowledge",
+            weight=0.7,
+            detail="Knowledge store has 0 facts — harvester not running or not finding sessions"
+        ))
+        return signals
+
+    # Check staleness
+    stale_count = 0
+    missing_source = 0
+    for fact in facts:
+        status = fact.get("status", "unknown")
+        if status == "stale":
+            stale_count += 1
+        elif status in ("missing_source", "no_source"):
+            missing_source += 1
+
+    if stale_count > 0:
+        signals.append(PipelineSignal(
+            source="knowledge",
+            signal_type="stale_knowledge",
+            weight=min(1.0, stale_count / max(1, total)),
+            detail=f"{stale_count}/{total} facts are stale (source files changed)"
+        ))
+
+    if missing_source > 0:
+        signals.append(PipelineSignal(
+            source="knowledge",
+            signal_type="missing_sources",
+            weight=min(1.0, missing_source / max(1, total)),
+            detail=f"{missing_source}/{total} facts have missing source files"
+        ))
+
+    # Coverage by repo
+    repo_counts = Counter(f.get("repo", "unknown") for f in facts)
+    if len(repo_counts) < 3:
+        signals.append(PipelineSignal(
+            source="knowledge",
+            signal_type="low_coverage",
+            weight=0.5,
+            detail=f"Knowledge covers only {len(repo_counts)} repos — expand harvester scope",
+            affected_repos=list(repo_counts.keys())
+        ))
+
+    return signals
+
+
+def collect_staleness_signals(scripts_dir: str, knowledge_dir: str) -> List[PipelineSignal]:
+    """Run staleness checker if available."""
+    signals = []
+    checker = os.path.join(scripts_dir, "knowledge_staleness_check.py")
+    index_path = os.path.join(knowledge_dir, "index.json")
+
+    if not os.path.exists(checker) or not os.path.exists(index_path):
+        return signals
+
+    try:
+        import subprocess
+        result = subprocess.run(
+            ["python3", checker, "--index", index_path, "--json"],
+            capture_output=True, text=True, timeout=30
+        )
+        if result.returncode == 0:
+            data = json.loads(result.stdout)
+            stale = data.get("stale_count", 0)
+            total = data.get("total", 0)
+            if stale > 0:
+                signals.append(PipelineSignal(
+                    source="staleness",
+                    signal_type="stale_knowledge",
+                    weight=min(1.0, stale / max(1, total)),
+                    detail=f"Staleness checker found {stale}/{total} stale entries"
+                ))
+    except Exception:
+        pass  # best-effort: a broken checker should not block rebalancing
+
+    return signals
+
+
+def collect_metrics_signals(metrics_dir: str) -> List[PipelineSignal]:
+    """Analyze metrics directory for pipeline health."""
+    signals = []
+
+    if not os.path.isdir(metrics_dir):
+        return signals
+
+    files = os.listdir(metrics_dir)
+    if len(files) <= 1:  # empty, or holding only .gitkeep
+        signals.append(PipelineSignal(
+            source="metrics",
+            signal_type="no_metrics",
+            weight=0.6,
+            detail="Metrics directory is empty — measurer pipeline not producing output"
+        ))
+
+    return signals
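+
+
+# The knowledge collectors above assume an index.json shaped roughly like this
+# (a sketch inferred from the fields read above, not a formal schema):
+#
+#   {
+#     "facts": [
+#       {"id": 1, "repo": "compounding-intelligence", "status": "fresh"},
+#       {"id": 2, "repo": "compounding-intelligence", "status": "stale"}
+#     ]
+#   }
+#
+# A fact's "status" may be "fresh", "stale", "missing_source"/"no_source",
+# or absent (treated as "unknown").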
+
+
+# ============================================================
+# Priority Scoring Engine
+# ============================================================
+
+PRIORITY_LEVELS = ["P0", "P1", "P2", "P3", "none"]
+PRIORITY_LABELS = {"p0": "P0", "p1": "P1", "p2": "P2", "p3": "P3",
+                   "priority:critical": "P0", "priority:high": "P1",
+                   "priority:medium": "P2", "priority:low": "P3"}
+
+
+def extract_priority(labels: List[str]) -> Optional[str]:
+    """Extract priority level from issue labels."""
+    for label in labels:
+        lower = label.lower().strip()
+        if lower in PRIORITY_LABELS:
+            return PRIORITY_LABELS[lower]
+    return None
+
+
+def compute_issue_score(
+    issue: Dict,
+    repo: str,
+    signals: List[PipelineSignal],
+    now: datetime
+) -> IssueScore:
+    """Compute priority score for a single issue.
+
+    `now` must be naive UTC (e.g. datetime.utcnow()), since the age
+    arithmetic below strips timezones before subtracting.
+    """
+
+    labels = [l.get("name", "") if isinstance(l, dict) else l for l in issue.get("labels", [])]
+    title = issue.get("title", "")
+    issue_id = issue.get("number", 0)
+    current_priority = extract_priority(labels)
+
+    # Parse dates
+    created_str = issue.get("created_at", "")
+    try:
+        created = datetime.fromisoformat(created_str.replace("Z", "+00:00"))
+        age_days = (now - created.replace(tzinfo=None)).days
+    except (ValueError, AttributeError):
+        age_days = 0
+
+    comment_count = issue.get("comments", 0)
+    assignee = None
+    assignees = issue.get("assignees") or []
+    if assignees:
+        assignee = assignees[0].get("login") if isinstance(assignees[0], dict) else str(assignees[0])
+
+    # Base score
+    score = 0.0
+    reasons = []
+
+    # Age factor: older issues drift down unless actively discussed
+    if age_days > 90 and comment_count < 2:
+        score -= 15
+        reasons.append(f"Dormant: {age_days} days old with only {comment_count} comments")
+    elif age_days > 30:
+        score -= 5
+        reasons.append(f"Aging: {age_days} days old")
+
+    # Activity factor: recent discussion suggests urgency
+    if comment_count > 5:
+        score += 10
+        reasons.append(f"Active discussion: {comment_count} comments")
+    elif comment_count > 0:
+        score += 3
+
+    # Assignment factor: unassigned issues need triage
+    if not assignee:
+        score += 5
+        reasons.append("Unassigned — needs triage")
+
+    # Pipeline signal alignment
+    for signal in signals:
+        title_lower = title.lower()
+        if signal.signal_type == "stale_knowledge" and "stale" in title_lower:
+            score += signal.weight * 20
+            reasons.append(f"Matches signal: {signal.detail}")
+        elif signal.signal_type == "empty_knowledge" and ("harvester" in title_lower or "knowledge" in title_lower):
+            score += signal.weight * 25
+            reasons.append(f"Critical gap: {signal.detail}")
+        elif signal.signal_type == "no_metrics" and "measur" in title_lower:
+            score += signal.weight * 15
+            reasons.append(f"Pipeline gap: {signal.detail}")
+        elif signal.signal_type == "low_coverage" and any(r.lower() in title_lower for r in signal.affected_repos):
+            score += signal.weight * 10
+            reasons.append(f"Coverage gap: {signal.detail}")
+
+    # Keyword boosts. Note: naive substring matching, so "auth" also matches
+    # "author"; acceptable noise for a triage heuristic.
+    keyword_scores = {
+        "broken": 20, "bug": 15, "fix": 10, "error": 12, "fail": 15,
+        "security": 25, "auth": 20, "data loss": 30, "crash": 25,
+        "blocker": 20, "urgent": 15, "critical": 15,
+        "epic": 8, "feature": -3, "nice to have": -10, "someday": -15
+    }
+    title_lower = title.lower()
+    for keyword, boost in keyword_scores.items():
+        if keyword in title_lower:
+            score += boost
+            if boost > 0:
+                reasons.append(f"Keyword match: '{keyword}' (+{boost})")
+
+    # Label-based adjustments
+    for label in labels:
+        lower = label.lower()
+        if lower == "pipeline":
+            score += 5  # Pipeline issues are infrastructure
+        elif lower == "bug":
+            score += 12
+        elif lower == "enhancement":
+            score -= 2
+        elif lower == "documentation":
+            score -= 5
+        elif "epic" in lower:
+            score += 3
+
+    # Determine suggested priority
+    if score >= 40:
+        suggested = "P0"
+    elif score >= 25:
+        suggested = "P1"
+    elif score >= 10:
+        suggested = "P2"
+    elif score >= 0:
+        suggested = "P3"
+    else:
+        suggested = None  # Consider closing or deprioritizing
+
+    return IssueScore(
+        issue_id=issue_id,
+        repo=repo,
+        title=title,
+        current_labels=labels,
+        current_priority=current_priority,
+        suggested_priority=suggested,
+        score=round(score, 1),
+        reasons=reasons if reasons else ["No strong signals"],
+        age_days=age_days,
+        comment_count=comment_count,
+        assignee=assignee
+    )
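+
+
+# Worked example (hypothetical issue): a 5-day-old, unassigned issue titled
+# "Fix crash on startup" carrying a "bug" label and no comments scores
+#   +5 (unassigned) +10 ("fix") +25 ("crash") +12 ("bug" label) = 52  →  P0
+# Severity keywords stack, so several weak matches can outrank one strong one.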
+
+
+# ============================================================
+# Report Generation
+# ============================================================
+
+def generate_report(
+    scores: List[IssueScore],
+    signals: List[PipelineSignal],
+    org: str,
+    repos_scanned: List[str]
+) -> Dict[str, Any]:
+    """Generate the full priority report."""
+    now = datetime.now(timezone.utc).isoformat()
+
+    # Categorize changes (lower index in PRIORITY_LEVELS = more urgent)
+    upgrades = [s for s in scores if s.suggested_priority and s.current_priority and
+                PRIORITY_LEVELS.index(s.suggested_priority) < PRIORITY_LEVELS.index(s.current_priority)]
+    downgrades = [s for s in scores if s.suggested_priority and s.current_priority and
+                  PRIORITY_LEVELS.index(s.suggested_priority) > PRIORITY_LEVELS.index(s.current_priority)]
+    new_assignments = [s for s in scores if s.suggested_priority and not s.current_priority]
+    no_change = [s for s in scores if s.suggested_priority == s.current_priority]
+
+    return {
+        "generated_at": now,
+        "org": org,
+        "repos_scanned": repos_scanned,
+        "total_issues": len(scores),
+        "signals": [asdict(s) for s in signals],
+        "summary": {
+            "suggested_upgrades": len(upgrades),
+            "suggested_downgrades": len(downgrades),
+            "suggested_new_priorities": len(new_assignments),
+            "unchanged": len(no_change)
+        },
+        "top_priority": [asdict(s) for s in sorted(scores, key=lambda x: x.score, reverse=True)[:10]],
+        "upgrades": [asdict(s) for s in upgrades],
+        "downgrades": [asdict(s) for s in downgrades],
+        "new_assignments": [asdict(s) for s in new_assignments],
+        "all_scores": [asdict(s) for s in sorted(scores, key=lambda x: x.score, reverse=True)]
+    }
+
+
+def generate_markdown_report(report: Dict[str, Any]) -> str:
+    """Generate human-readable markdown report."""
+    lines = []
+    lines.append("# Priority Rebalancer Report")
+    lines.append(f"**Generated:** {report['generated_at']}")
+    lines.append(f"**Org:** {report['org']}")
+    lines.append(f"**Repos scanned:** {', '.join(report['repos_scanned'])}")
+    lines.append(f"**Issues analyzed:** {report['total_issues']}")
+    lines.append("")
+
+    # Signals
+    if report["signals"]:
+        lines.append("## Pipeline Signals")
+        for sig in report["signals"]:
+            weight_bar = "█" * int(sig["weight"] * 10) + "░" * (10 - int(sig["weight"] * 10))
+            lines.append(f"- [{weight_bar}] **{sig['source']}/{sig['signal_type']}** — {sig['detail']}")
+        lines.append("")
+
+    # Summary
+    s = report["summary"]
+    lines.append("## Summary")
+    lines.append(f"- Suggested upgrades: **{s['suggested_upgrades']}**")
+    lines.append(f"- Suggested downgrades: **{s['suggested_downgrades']}**")
+    lines.append(f"- New priority assignments: **{s['suggested_new_priorities']}**")
+    lines.append(f"- Unchanged: **{s['unchanged']}**")
+    lines.append("")
+
+    # Top 10
+    lines.append("## Top 10 by Score")
+    for i, item in enumerate(report["top_priority"][:10], 1):
+        cur = item["current_priority"] or "none"
+        sug = item["suggested_priority"] or "none"
+        cur_idx = PRIORITY_LEVELS.index(cur)
+        sug_idx = PRIORITY_LEVELS.index(sug)
+        arrow = "↑" if sug_idx < cur_idx else "↓" if sug_idx > cur_idx else "→"
+        lines.append(f"{i}. **[{item['repo']}#{item['issue_id']}]** {item['title']}")
+        lines.append(f"   Score: {item['score']} | Current: {cur} {arrow} Suggested: {sug}")
+        lines.append(f"   Reasons: {'; '.join(item['reasons'][:3])}")
+    lines.append("")
+
+    # Upgrades
+    if report["upgrades"]:
+        lines.append("## Suggested Upgrades")
+        for item in report["upgrades"]:
+            lines.append(f"- **[{item['repo']}#{item['issue_id']}]** {item['title']}")
+            lines.append(f"  {item['current_priority']} → {item['suggested_priority']} (score: {item['score']})")
+            lines.append(f"  {'; '.join(item['reasons'][:2])}")
+        lines.append("")
+
+    # Downgrades
+    if report["downgrades"]:
+        lines.append("## Suggested Downgrades")
+        for item in report["downgrades"]:
+            lines.append(f"- **[{item['repo']}#{item['issue_id']}]** {item['title']}")
+            lines.append(f"  {item['current_priority']} → {item['suggested_priority']} (score: {item['score']})")
+            lines.append(f"  {'; '.join(item['reasons'][:2])}")
+        lines.append("")
+
+    # New assignments
+    if report["new_assignments"]:
+        lines.append("## New Priority Suggestions (currently unlabelled)")
+        for item in report["new_assignments"][:20]:
+            lines.append(f"- **[{item['repo']}#{item['issue_id']}]** {item['title']}")
+            lines.append(f"  Suggested: {item['suggested_priority']} (score: {item['score']})")
+            lines.append(f"  {'; '.join(item['reasons'][:2])}")
+        lines.append("")
+
+    return "\n".join(lines)
+
+
+# ============================================================
+# Main
+# ============================================================
+
+def main():
+    parser = argparse.ArgumentParser(description="Priority Rebalancer — re-score issues based on pipeline data")
+    parser.add_argument("--org", default="Timmy_Foundation", help="Gitea org name")
+    parser.add_argument("--repo", help="Single repo to analyze (default: all)")
+    parser.add_argument("--base-url", default="https://forge.alexanderwhitestone.com", help="Gitea base URL")
+    parser.add_argument("--knowledge-dir", default=None, help="Path to knowledge directory")
+    parser.add_argument("--metrics-dir", default=None, help="Path to metrics directory")
+    parser.add_argument("--scripts-dir", default=None, help="Path to scripts directory")
+    parser.add_argument("--output-dir", default=None, help="Path to output directory")
+    parser.add_argument("--dry-run", action="store_true", help="Show what would change without applying")
+    parser.add_argument("--apply", action="store_true", help="Apply priority changes via API")
+    parser.add_argument("--json", action="store_true", help="Output JSON instead of markdown")
+
+    args = parser.parse_args()
+
+    # Resolve paths relative to script location
+    script_dir = Path(__file__).parent
+    repo_root = script_dir.parent
+
+    knowledge_dir = args.knowledge_dir or str(repo_root / "knowledge")
+    metrics_dir = args.metrics_dir or str(repo_root / "metrics")
+    scripts_dir = args.scripts_dir or str(repo_root / "scripts")
+    output_dir = args.output_dir or str(repo_root / "metrics")
+
+    # Get token
+    token = os.environ.get("GITEA_TOKEN")
+    if not token:
+        token_path = os.path.expanduser("~/.config/gitea/token")
+        if os.path.exists(token_path):
+            with open(token_path) as f:
+                token = f.read().strip()
+
+    if not token:
+        print("Error: No Gitea token found. Set GITEA_TOKEN or create ~/.config/gitea/token", file=sys.stderr)
+        sys.exit(1)
+
+    client = GiteaClient(args.base_url, token)
+    now = datetime.utcnow()  # naive UTC, as compute_issue_score expects
+
+    # Collect pipeline signals
+    print("Collecting pipeline signals...", file=sys.stderr)
+    signals = []
+    signals.extend(collect_knowledge_signals(knowledge_dir))
+    signals.extend(collect_staleness_signals(scripts_dir, knowledge_dir))
+    signals.extend(collect_metrics_signals(metrics_dir))
+    print(f"  Found {len(signals)} signals", file=sys.stderr)
+
+    # Get repos
+    if args.repo:
+        repos = [{"name": args.repo}]
+    else:
+        repos = client.get_org_repos(args.org)
+
+    repo_names = [r["name"] for r in repos]
+    print(f"Scanning {len(repo_names)} repos: {', '.join(repo_names[:5])}{'...' if len(repo_names) > 5 else ''}", file=sys.stderr)
+
+    # Score all issues
+    all_scores = []
+    for repo in repos:
+        repo_name = repo["name"]
+        issues = client.get_issues(args.org, repo_name)
+        print(f"  {repo_name}: {len(issues)} open issues", file=sys.stderr)
+
+        for issue in issues:
+            if issue.get("pull_request"):
+                continue
+            score = compute_issue_score(issue, repo_name, signals, now)
+            all_scores.append(score)
+
+    # Generate report
+    report = generate_report(all_scores, signals, args.org, repo_names)
+
+    # Output
+    os.makedirs(output_dir, exist_ok=True)
+
+    if args.json:
+        print(json.dumps(report, indent=2))
+    else:
+        md = generate_markdown_report(report)
+        print(md)
+
+    # Write files
+    report_path = os.path.join(output_dir, "priority_report.json")
+    with open(report_path, "w") as f:
+        json.dump(report, f, indent=2)
+    print(f"\nFull report: {report_path}", file=sys.stderr)
+
+    md_path = os.path.join(output_dir, "priority_suggestions.md")
+    with open(md_path, "w") as f:
+        f.write(generate_markdown_report(report))
+    print(f"Suggestions: {md_path}", file=sys.stderr)
+
+    # Apply changes if requested
+    if args.apply:
+        print("\nApplying priority changes...", file=sys.stderr)
+        applied = 0
+
+        # Get label IDs for priority labels. Only bare P0-P3 labels are
+        # managed here; "priority:*" aliases are read but never written.
+        priority_label_map = {}
+        for repo_name in repo_names:
+            labels = client.get_repo_labels(args.org, repo_name)
+            for label in labels:
+                name = label.get("name", "").lower()
+                if name in ("p0", "p1", "p2", "p3"):
+                    priority_label_map[(repo_name, name)] = label["id"]
+
+        for score in all_scores:
+            if score.suggested_priority and score.suggested_priority != score.current_priority:
+                sug_lower = score.suggested_priority.lower()
+                label_id = priority_label_map.get((score.repo, sug_lower))
+                if label_id:
+                    ok = client.add_label_to_issue(args.org, score.repo, score.issue_id, [label_id])
+                    if ok:
+                        applied += 1
+                        print(f"  Applied: [{score.repo}#{score.issue_id}] → {score.suggested_priority}", file=sys.stderr)
+
+                        # Drop the old priority label so the issue doesn't
+                        # end up carrying two competing priorities.
+                        if score.current_priority:
+                            old_id = priority_label_map.get((score.repo, score.current_priority.lower()))
+                            if old_id:
+                                client.remove_label_from_issue(args.org, score.repo, score.issue_id, old_id)
+
+                        # Add comment explaining the change
+                        comment = f"**Priority Rebalancer** suggested: **{score.suggested_priority}** (was: {score.current_priority or 'none'})\n\n"
+                        comment += f"Score: {score.score}\n"
+                        comment += "Reasons:\n"
+                        for r in score.reasons[:5]:
+                            comment += f"- {r}\n"
+                        client.add_comment(args.org, score.repo, score.issue_id, comment)
+
+        print(f"Applied {applied} priority changes", file=sys.stderr)
+
+    elif args.dry_run:
+        print(f"\nDry run — {report['summary']['suggested_upgrades'] + report['summary']['suggested_downgrades'] + report['summary']['suggested_new_priorities']} changes would be applied", file=sys.stderr)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/scripts/test_priority_rebalancer.py b/scripts/test_priority_rebalancer.py
new file mode 100644
index 0000000..a10587f
--- /dev/null
+++ b/scripts/test_priority_rebalancer.py
@@ -0,0 +1,305 @@
+#!/usr/bin/env python3
+"""
+Tests for Priority Rebalancer
+"""
+
+import json
+import os
+import sys
+import tempfile
+from datetime import datetime, timedelta
+from pathlib import Path
+
+# Add script dir to path
+sys.path.insert(0, str(Path(__file__).parent))
+
+from priority_rebalancer import (
+    IssueScore,
+    PipelineSignal,
+    compute_issue_score,
+    collect_knowledge_signals,
+    collect_metrics_signals,
+    extract_priority,
+    generate_report,
+    generate_markdown_report,
+)
+
+# ============================================================
+# Test Helpers
+# ============================================================
+
+PASS = 0
+FAIL = 0
+
+def test(name):
+    """Run the decorated function immediately and record pass/fail."""
+    def decorator(fn):
+        global PASS, FAIL
+        try:
+            fn()
+            PASS += 1
+            print(f"  [PASS] {name}")
+        except Exception as e:
+            FAIL += 1
+            print(f"  [FAIL] {name}: {e}")
+    return decorator
+
+def assert_eq(a, b, msg=""):
+    if a != b:
+        raise AssertionError(f"{msg} expected {b!r}, got {a!r}")
+
+def assert_true(v, msg=""):
+    if not v:
+        raise AssertionError(msg or "Expected True")
+
+def assert_false(v, msg=""):
+    if v:
+        raise AssertionError(msg or "Expected False")
+
+
+# ============================================================
+# Priority Extraction Tests
+# ============================================================
+
+print("=== Priority Rebalancer Tests ===\n")
+
+print("-- Priority Extraction --")
+
+@test("extract P0 from label")
+def _():
+    assert_eq(extract_priority(["P0", "bug"]), "P0")
+
+@test("extract P1 from priority:high")
+def _():
+    assert_eq(extract_priority(["priority:high"]), "P1")
+
+@test("extract P2 from priority:medium")
+def _():
+    assert_eq(extract_priority(["priority:medium"]), "P2")
+
+@test("extract P3 from priority:low")
+def _():
+    assert_eq(extract_priority(["priority:low"]), "P3")
+
+@test("returns None for no priority")
+def _():
+    assert_eq(extract_priority(["bug", "enhancement"]), None)
+
+@test("case insensitive")
+def _():
+    assert_eq(extract_priority(["p1"]), "P1")
+    assert_eq(extract_priority(["PRIORITY:CRITICAL"]), "P0")
+
+
+# ============================================================
+# Issue Scoring Tests
+# ============================================================
+
+print("\n-- Issue Scoring --")
+
+def make_issue(**kwargs):
+    defaults = {
+        "number": 1,
+        "title": "Test issue",
+        "labels": [],
+        "created_at": (datetime.utcnow() - timedelta(days=5)).isoformat() + "Z",
+        "comments": 0,
+        "assignees": None,
+    }
+    defaults.update(kwargs)
+    return defaults
+
+@test("bug gets score boost")
+def _():
+    issue = make_issue(title="Incorrect output format", labels=["bug"])
+    score = compute_issue_score(issue, "test-repo", [], datetime.utcnow())
+    assert_true(score.score > 0, f"Bug should boost score, got {score.score}")
+    # Bug label alone should be P2 or P3 (not P0)
+    assert_true(score.suggested_priority in ("P2", "P3"),
+                f"Bug label alone should be P2/P3, got {score.suggested_priority}")
+
+@test("security gets high score")
+def _():
+    issue = make_issue(title="Security: auth bypass", labels=["bug"])
+    score = compute_issue_score(issue, "test-repo", [], datetime.utcnow())
+    assert_true(score.score >= 25, f"Security should score high, got {score.score}")
+
+@test("old dormant issue gets penalized")
+def _():
+    issue = make_issue(
+        title="Some old feature",
+        created_at=(datetime.utcnow() - timedelta(days=120)).isoformat() + "Z",
+        comments=0
+    )
+    score = compute_issue_score(issue, "test-repo", [], datetime.utcnow())
+    assert_true(score.score < 0, f"Old dormant should be negative, got {score.score}")
+    assert_true(any("Dormant" in r for r in score.reasons), "Should mention dormancy")
+
+@test("active discussion boosts score")
+def _():
+    issue = make_issue(title="Important fix", comments=8)
+    score = compute_issue_score(issue, "test-repo", [], datetime.utcnow())
+    assert_true(score.score > 5, f"Active discussion should boost, got {score.score}")
+    assert_true(any("Active" in r for r in score.reasons))
+
+@test("unassigned gets slight boost")
+def _():
+    issue = make_issue(title="Fix bug", assignees=None)
+    score = compute_issue_score(issue, "test-repo", [], datetime.utcnow())
+    assert_true(any("Unassigned" in r for r in score.reasons))
+
+@test("assigned issue notes assignee")
+def _():
+    issue = make_issue(title="Fix bug", assignees=[{"login": "alice"}])
+    score = compute_issue_score(issue, "test-repo", [], datetime.utcnow())
+    assert_eq(score.assignee, "alice")
+
+@test("nice-to-have gets penalized")
+def _():
+    issue = make_issue(title="Nice to have: fancy animation")
+    score = compute_issue_score(issue, "test-repo", [], datetime.utcnow())
+    assert_true(score.score < 0, f"Nice-to-have should be negative, got {score.score}")
+
+
+# ============================================================
+# Pipeline Signal Tests
+# ============================================================
+
+print("\n-- Pipeline Signals --")
+
+@test("signal alignment boosts matching issues")
+def _():
+    signals = [PipelineSignal(
+        source="knowledge",
+        signal_type="stale_knowledge",
+        weight=0.8,
+        detail="20 stale facts"
+    )]
+    issue = make_issue(title="Fix stale knowledge entries")
+    score = compute_issue_score(issue, "test-repo", signals, datetime.utcnow())
+    assert_true(any("Matches signal" in r for r in score.reasons))
+
+@test("empty knowledge boosts harvester issues")
+def _():
+    signals = [PipelineSignal(
+        source="knowledge",
+        signal_type="empty_knowledge",
+        weight=0.7,
+        detail="0 facts"
+    )]
+    issue = make_issue(title="Implement harvester pipeline")
+    score = compute_issue_score(issue, "test-repo", signals, datetime.utcnow())
+    assert_true(any("Critical gap" in r for r in score.reasons))
+
+
+# ============================================================
+# Knowledge Signal Collection Tests
+# ============================================================
+
+print("\n-- Knowledge Signal Collection --")
+
+@test("missing index generates signal")
+def _():
+    with tempfile.TemporaryDirectory() as tmpdir:
+        signals = collect_knowledge_signals(tmpdir)
+        assert_true(len(signals) > 0)
+        assert_eq(signals[0].signal_type, "missing_index")
+
+@test("empty knowledge generates signal")
+def _():
+    with tempfile.TemporaryDirectory() as tmpdir:
+        idx = os.path.join(tmpdir, "index.json")
+        with open(idx, "w") as f:
+            json.dump({"facts": []}, f)
+        signals = collect_knowledge_signals(tmpdir)
+        assert_true(any(s.signal_type == "empty_knowledge" for s in signals))
+
+@test("corrupt index generates signal")
+def _():
+    with tempfile.TemporaryDirectory() as tmpdir:
+        idx = os.path.join(tmpdir, "index.json")
+        with open(idx, "w") as f:
+            f.write("not json {{{")
+        signals = collect_knowledge_signals(tmpdir)
+        assert_true(any(s.signal_type == "corrupt_index" for s in signals))
+
+@test("knowledge with facts passes")
+def _():
+    with tempfile.TemporaryDirectory() as tmpdir:
+        idx = os.path.join(tmpdir, "index.json")
+        with open(idx, "w") as f:
+            json.dump({"facts": [
+                {"id": 1, "repo": "test", "status": "fresh"},
+                {"id": 2, "repo": "test", "status": "fresh"},
+            ]}, f)
+        signals = collect_knowledge_signals(tmpdir)
+        # Should not generate missing_index or empty_knowledge
+        assert_false(any(s.signal_type in ("missing_index", "empty_knowledge") for s in signals))
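+
+# Companion check: stale facts should surface as a stale_knowledge signal
+# whose weight is stale/total (capped at 1.0), per collect_knowledge_signals.
+@test("stale facts generate stale_knowledge signal")
+def _():
+    with tempfile.TemporaryDirectory() as tmpdir:
+        idx = os.path.join(tmpdir, "index.json")
+        with open(idx, "w") as f:
+            json.dump({"facts": [
+                {"id": 1, "repo": "test", "status": "stale"},
+                {"id": 2, "repo": "test", "status": "fresh"},
+            ]}, f)
+        signals = collect_knowledge_signals(tmpdir)
+        stale = [s for s in signals if s.signal_type == "stale_knowledge"]
+        assert_true(len(stale) == 1)
+        assert_eq(stale[0].weight, 0.5)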
{"id": 2, "repo": "test", "status": "fresh"}, + ]}, f) + signals = collect_knowledge_signals(tmpdir) + # Should not generate missing_index or empty_knowledge + assert_false(any(s.signal_type in ("missing_index", "empty_knowledge") for s in signals)) + + +# ============================================================ +# Metrics Signal Collection Tests +# ============================================================ + +print("\n-- Metrics Signal Collection --") + +@test("empty metrics dir generates signal") +def _(): + with tempfile.TemporaryDirectory() as tmpdir: + signals = collect_metrics_signals(tmpdir) + assert_true(any(s.signal_type == "no_metrics" for s in signals)) + +@test("metrics with files passes") +def _(): + with tempfile.TemporaryDirectory() as tmpdir: + # Create files (simulating real metrics dir with .gitkeep + actual files) + with open(os.path.join(tmpdir, ".gitkeep"), "w") as f: + f.write("") + with open(os.path.join(tmpdir, "report.json"), "w") as f: + f.write("{}") + signals = collect_metrics_signals(tmpdir) + assert_false(any(s.signal_type == "no_metrics" for s in signals)) + + +# ============================================================ +# Report Generation Tests +# ============================================================ + +print("\n-- Report Generation --") + +@test("report has correct structure") +def _(): + scores = [ + IssueScore(1, "repo1", "Bug fix", ["bug"], None, "P1", 30.0, ["test"], 5, 3, None), + IssueScore(2, "repo1", "Feature", ["enhancement"], "P3", None, -5.0, ["test"], 60, 0, "alice"), + ] + signals = [PipelineSignal("knowledge", "stale_knowledge", 0.5, "10 stale")] + report = generate_report(scores, signals, "test-org", ["repo1"]) + + assert_eq(report["org"], "test-org") + assert_eq(report["total_issues"], 2) + assert_true("generated_at" in report) + assert_true("summary" in report) + assert_true("top_priority" in report) + assert_eq(report["summary"]["suggested_new_priorities"], 1) + +@test("markdown report is non-empty") +def _(): + scores = [IssueScore(1, "repo1", "Test", ["bug"], None, "P2", 15.0, ["reason"], 5, 0, None)] + report = generate_report(scores, [], "test-org", ["repo1"]) + md = generate_markdown_report(report) + assert_true(len(md) > 100) + assert_true("Priority Rebalancer Report" in md) + assert_true("Top 10" in md) + + +# ============================================================ +# Summary +# ============================================================ + +print(f"\n=== Summary ===") +print(f"Total: {PASS + FAIL} | Passed: {PASS} | Failed: {FAIL}") + +if FAIL > 0: + sys.exit(1)