Compare commits


2 Commits

6 changed files with 273 additions and 1247 deletions

View File

@@ -0,0 +1,162 @@
#!/usr/bin/env python3
"""
Gitea Issue Body Parser
Extracts structured data from Gitea issue markdown bodies:
- Title
- Context section
- Acceptance criteria (checkboxes)
- Labels
- Epic/parent references
Usage:
    python3 scripts/gitea_issue_parser.py issue_body.txt
python3 scripts/gitea_issue_parser.py --url https://forge.../api/v1/repos/.../issues/123
echo "issue body" | python3 scripts/gitea_issue_parser.py --stdin
Output: JSON with {title, context, criteria[], labels[], epic_ref}
"""
import argparse
import json
import re
import sys
from typing import Optional
def parse_issue_body(body: str, title: str = "", labels: Optional[list] = None) -> dict:
"""Parse a Gitea issue body into structured JSON."""
result = {
"title": title,
"context": "",
"criteria": [],
"labels": labels or [],
"epic_ref": None,
"sections": {},
}
if not body:
return result
# Extract epic/parent reference from title or body
epic_pattern = r"#(\d+)"
title_refs = re.findall(epic_pattern, title)
body_refs = re.findall(epic_pattern, body[:200]) # Check early body refs
# Look for "Closes #N" or "Part of #N" or "Epic: #N"
close_match = re.search(r"(?:Closes?|Fixes?|Resolves?)\s+#(\d+)", body, re.IGNORECASE)
    part_match = re.search(r"(?:Part of|Epic:?|Parent:?|Blocks?)\s*#(\d+)", body, re.IGNORECASE)
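    # Precedence: explicit close refs win, then part-of/epic refs, then a #N
    # in the title, then the first #N near the top of the body.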
if close_match:
result["epic_ref"] = f"#{close_match.group(1)}"
elif part_match:
result["epic_ref"] = f"#{part_match.group(1)}"
elif title_refs:
result["epic_ref"] = f"#{title_refs[0]}"
elif body_refs:
result["epic_ref"] = f"#{body_refs[0]}"
# Split into sections by ## headers
section_pattern = r"^##\s+(.+)$"
lines = body.split("\n")
current_section = None
current_content = []
for line in lines:
header_match = re.match(section_pattern, line)
if header_match:
# Save previous section
if current_section:
result["sections"][current_section] = "\n".join(current_content).strip()
current_section = header_match.group(1).strip().lower()
current_content = []
else:
current_content.append(line)
# Save last section
if current_section:
result["sections"][current_section] = "\n".join(current_content).strip()
# Extract context
for key in ["context", "background", "description", "problem"]:
if key in result["sections"]:
result["context"] = result["sections"][key]
break
# Extract acceptance criteria (checkboxes)
criteria_section = None
for key in ["acceptance criteria", "acceptance_criteria", "criteria", "requirements", "definition of done"]:
if key in result["sections"]:
criteria_section = result["sections"][key]
break
if criteria_section:
checkbox_pattern = r"-\s*\[[ xX]?\]\s*(.+)"
for match in re.finditer(checkbox_pattern, criteria_section):
result["criteria"].append(match.group(1).strip())
# Also try plain numbered/bulleted lists if no checkboxes found
if not result["criteria"]:
list_pattern = r"^\s*(?:\d+\.|-|\*)\s+(.+)"
for match in re.finditer(list_pattern, criteria_section, re.MULTILINE):
result["criteria"].append(match.group(1).strip())
# If no sectioned criteria found, scan whole body for checkboxes
if not result["criteria"]:
for match in re.finditer(r"-\s*\[[ xX]?\]\s*(.+)", body):
result["criteria"].append(match.group(1).strip())
return result
def parse_from_url(api_url: str, token: Optional[str] = None) -> dict:
    """Parse an issue from a Gitea API URL."""
    import urllib.request
    headers = {}
    if token:
        headers["Authorization"] = f"token {token}"
    req = urllib.request.Request(api_url, headers=headers)
    with urllib.request.urlopen(req, timeout=30) as r:
        resp = json.loads(r.read())
    title = resp.get("title", "")
    body = resp.get("body", "")
    labels = [label["name"] for label in resp.get("labels", [])]
    return parse_issue_body(body, title, labels)
def main():
parser = argparse.ArgumentParser(description="Parse Gitea issue body into structured JSON")
parser.add_argument("input", nargs="?", help="Issue body file (or - for stdin)")
parser.add_argument("--url", help="Gitea API URL for the issue")
parser.add_argument("--stdin", action="store_true", help="Read from stdin")
parser.add_argument("--token", help="Gitea API token (or set GITEA_TOKEN env var)")
parser.add_argument("--title", default="", help="Issue title (for epic ref extraction)")
parser.add_argument("--labels", nargs="*", default=[], help="Issue labels")
parser.add_argument("--pretty", action="store_true", help="Pretty-print JSON output")
args = parser.parse_args()
import os
token = args.token or os.environ.get("GITEA_TOKEN")
if args.url:
result = parse_from_url(args.url, token)
elif args.stdin or (args.input and args.input == "-"):
body = sys.stdin.read()
result = parse_issue_body(body, args.title, args.labels)
elif args.input:
with open(args.input) as f:
body = f.read()
result = parse_issue_body(body, args.title, args.labels)
else:
parser.print_help()
sys.exit(1)
indent = 2 if args.pretty else None
print(json.dumps(result, indent=indent))
if __name__ == "__main__":
main()
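
For a quick sanity check, here is a hypothetical usage sketch of the parser above; the sample body is invented, not taken from the repo:

# Hypothetical usage sketch for gitea_issue_parser; the body below is invented.
import json
from gitea_issue_parser import parse_issue_body

body = (
    "Closes #42\n\n"
    "## Context\nWhy this matters.\n\n"
    "## Acceptance Criteria\n- [ ] Works\n- [x] Tested\n"
)
result = parse_issue_body(body, title="fix: example (#42)", labels=["bug"])
print(json.dumps(result, indent=2))
# epic_ref "#42", context "Why this matters.", criteria ["Works", "Tested"]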

View File

@@ -1,131 +0,0 @@
#!/usr/bin/env python3
"""
Knowledge Store Staleness Detector — Detect stale knowledge entries by comparing source file hashes.
Usage:
python3 scripts/knowledge_staleness_check.py --index knowledge/index.json
python3 scripts/knowledge_staleness_check.py --index knowledge/index.json --json
python3 scripts/knowledge_staleness_check.py --index knowledge/index.json --fix
"""
import argparse
import hashlib
import json
import os
import sys
from typing import Dict, List, Any, Optional
def compute_file_hash(filepath: str) -> Optional[str]:
"""Compute SHA-256 hash of a file. Returns None if file doesn't exist."""
try:
with open(filepath, "rb") as f:
return "sha256:" + hashlib.sha256(f.read()).hexdigest()
except (FileNotFoundError, IsADirectoryError, PermissionError):
return None
def check_staleness(index_path: str, repo_root: str = ".") -> List[Dict[str, Any]]:
"""Check all entries in knowledge index for staleness.
Returns list of entries with staleness info:
- status: "fresh" | "stale" | "missing_source" | "no_hash"
- current_hash: computed hash (if source exists)
- stored_hash: hash from index
"""
with open(index_path) as f:
data = json.load(f)
facts = data.get("facts", [])
results = []
for entry in facts:
source_file = entry.get("source_file")
stored_hash = entry.get("source_hash")
if not source_file:
results.append({**entry, "status": "no_source", "current_hash": None})
continue
full_path = os.path.join(repo_root, source_file)
current_hash = compute_file_hash(full_path)
if current_hash is None:
results.append({**entry, "status": "missing_source", "current_hash": None})
elif not stored_hash:
results.append({**entry, "status": "no_hash", "current_hash": current_hash})
elif current_hash != stored_hash:
results.append({**entry, "status": "stale", "current_hash": current_hash})
else:
results.append({**entry, "status": "fresh", "current_hash": current_hash})
return results
def fix_hashes(index_path: str, repo_root: str = ".") -> int:
"""Add hashes to entries missing them. Returns count of fixed entries."""
with open(index_path) as f:
data = json.load(f)
fixed = 0
for entry in data.get("facts", []):
if entry.get("source_hash"):
continue
source_file = entry.get("source_file")
if not source_file:
continue
full_path = os.path.join(repo_root, source_file)
h = compute_file_hash(full_path)
if h:
entry["source_hash"] = h
fixed += 1
with open(index_path, "w") as f:
json.dump(data, f, indent=2)
return fixed
def main():
parser = argparse.ArgumentParser(description="Check knowledge store staleness")
parser.add_argument("--index", required=True, help="Path to knowledge/index.json")
parser.add_argument("--repo", default=".", help="Repo root for source file resolution")
parser.add_argument("--json", action="store_true", help="Output as JSON")
parser.add_argument("--fix", action="store_true", help="Add hashes to entries missing them")
args = parser.parse_args()
if args.fix:
fixed = fix_hashes(args.index, args.repo)
print(f"Fixed {fixed} entries with missing hashes.")
return
results = check_staleness(args.index, args.repo)
if args.json:
print(json.dumps(results, indent=2))
else:
stale = [r for r in results if r["status"] != "fresh"]
fresh = [r for r in results if r["status"] == "fresh"]
print(f"Knowledge Store Staleness Check")
print(f" Total entries: {len(results)}")
print(f" Fresh: {len(fresh)}")
print(f" Stale/Issues: {len(stale)}")
print()
if stale:
print("Issues found:")
for r in stale:
status = r["status"]
fact = r.get("fact", "?")[:60]
source = r.get("source_file", "?")
print(f" [{status}] {source}: {fact}")
else:
print("All entries are fresh!")
if __name__ == "__main__":
main()
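
For reference, a minimal index.json the checker accepts (field names inferred from check_staleness above; a real knowledge index may carry extra fields per entry):

# Hypothetical minimal index: a top-level "facts" list whose entries carry
# "fact", "source_file" (resolved relative to --repo), and "source_hash".
import json
example = {
    "facts": [
        {"fact": "builds use make", "source_file": "Makefile",
         "source_hash": "sha256:<hex digest of Makefile>"}
    ]
}
with open("index.json", "w") as f:
    json.dump(example, f, indent=2)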

View File

@@ -1,682 +0,0 @@
#!/usr/bin/env python3
"""
Priority Rebalancer — Re-evaluate issue priorities based on accumulated data.
Reads pipeline outputs, knowledge store, and Gitea issues to suggest
priority changes based on what the fleet has learned.
Usage:
python3 scripts/priority_rebalancer.py --org Timmy_Foundation
python3 scripts/priority_rebalancer.py --org Timmy_Foundation --repo compounding-intelligence
python3 scripts/priority_rebalancer.py --org Timmy_Foundation --dry-run
python3 scripts/priority_rebalancer.py --org Timmy_Foundation --apply
Output:
metrics/priority_report.json — full analysis
metrics/priority_suggestions.md — human-readable suggestions
"""
import argparse
import json
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, field, asdict
from collections import Counter
import urllib.request
import urllib.error
# ============================================================
# Data Models
# ============================================================
@dataclass
class IssueScore:
issue_id: int
repo: str
title: str
current_labels: List[str]
current_priority: Optional[str]
suggested_priority: Optional[str]
score: float
reasons: List[str]
age_days: int
comment_count: int
assignee: Optional[str]
dependencies: List[str] = field(default_factory=list)
blocking: List[str] = field(default_factory=list)
@dataclass
class PipelineSignal:
source: str # "knowledge", "metrics", "sessions", "staleness"
signal_type: str # "stale_knowledge", "high_error_rate", "missing_coverage", etc.
weight: float # 0.0 - 1.0
detail: str
affected_repos: List[str] = field(default_factory=list)
affected_issues: List[int] = field(default_factory=list)
# ============================================================
# Gitea API Client
# ============================================================
class GiteaClient:
def __init__(self, base_url: str, token: str):
self.base_url = base_url.rstrip("/")
self.token = token
def _request(self, path: str, params: Dict = None) -> Any:
url = f"{self.base_url}/api/v1{path}"
if params:
qs = "&".join(f"{k}={v}" for k, v in params.items() if v is not None)
url += f"?{qs}"
req = urllib.request.Request(url)
req.add_header("Authorization", f"token {self.token}")
req.add_header("Content-Type", "application/json")
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
except urllib.error.HTTPError as e:
print(f"API error {e.code} for {path}: {e.read().decode()[:200]}", file=sys.stderr)
return None
def get_org_repos(self, org: str) -> List[Dict]:
repos = []
page = 1
while True:
batch = self._request(f"/orgs/{org}/repos", {"limit": 50, "page": page})
if not batch:
break
repos.extend(batch)
if len(batch) < 50:
break
page += 1
return repos
def get_issues(self, org: str, repo: str, state: str = "open") -> List[Dict]:
issues = []
page = 1
while True:
batch = self._request(f"/repos/{org}/{repo}/issues",
{"state": state, "limit": 50, "page": page, "type": "issues"})
if not batch:
break
issues.extend(batch)
if len(batch) < 50:
break
page += 1
return issues
def add_label_to_issue(self, org: str, repo: str, issue_num: int, label_ids: List[int]) -> bool:
url = f"{self.base_url}/api/v1/repos/{org}/{repo}/issues/{issue_num}/labels"
data = json.dumps({"labels": label_ids}).encode()
req = urllib.request.Request(url, data=data, method="POST")
req.add_header("Authorization", f"token {self.token}")
req.add_header("Content-Type", "application/json")
try:
with urllib.request.urlopen(req, timeout=15) as resp:
return resp.status == 200
except Exception:
return False
def remove_label_from_issue(self, org: str, repo: str, issue_num: int, label_id: int) -> bool:
url = f"{self.base_url}/api/v1/repos/{org}/{repo}/issues/{issue_num}/labels/{label_id}"
req = urllib.request.Request(url, method="DELETE")
req.add_header("Authorization", f"token {self.token}")
try:
with urllib.request.urlopen(req, timeout=15) as resp:
return resp.status == 200
except Exception:
return False
def get_repo_labels(self, org: str, repo: str) -> List[Dict]:
labels = []
page = 1
while True:
batch = self._request(f"/repos/{org}/{repo}/labels", {"limit": 50, "page": page})
if not batch:
break
labels.extend(batch)
if len(batch) < 50:
break
page += 1
return labels
def add_comment(self, org: str, repo: str, issue_num: int, body: str) -> bool:
url = f"{self.base_url}/api/v1/repos/{org}/{repo}/issues/{issue_num}/comments"
data = json.dumps({"body": body}).encode()
req = urllib.request.Request(url, data=data, method="POST")
req.add_header("Authorization", f"token {self.token}")
req.add_header("Content-Type", "application/json")
try:
with urllib.request.urlopen(req, timeout=15) as resp:
return resp.status == 201
except Exception:
return False
# ============================================================
# Pipeline Signal Collectors
# ============================================================
def collect_knowledge_signals(knowledge_dir: str) -> List[PipelineSignal]:
"""Analyze knowledge store for coverage gaps and staleness."""
signals = []
index_path = os.path.join(knowledge_dir, "index.json")
if not os.path.exists(index_path):
signals.append(PipelineSignal(
source="knowledge",
signal_type="missing_index",
weight=0.8,
detail="knowledge/index.json not found — no knowledge base exists"
))
return signals
try:
with open(index_path) as f:
data = json.load(f)
except (json.JSONDecodeError, IOError) as e:
signals.append(PipelineSignal(
source="knowledge",
signal_type="corrupt_index",
weight=0.9,
detail=f"knowledge/index.json is corrupt: {e}"
))
return signals
facts = data.get("facts", [])
total = len(facts)
if total == 0:
signals.append(PipelineSignal(
source="knowledge",
signal_type="empty_knowledge",
weight=0.7,
detail="Knowledge store has 0 facts — harvester not running or not finding sessions"
))
return signals
# Check staleness
stale_count = 0
missing_source = 0
for fact in facts:
status = fact.get("status", "unknown")
if status == "stale":
stale_count += 1
elif status in ("missing_source", "no_source"):
missing_source += 1
if stale_count > 0:
signals.append(PipelineSignal(
source="knowledge",
signal_type="stale_knowledge",
weight=min(1.0, stale_count / max(1, total)),
detail=f"{stale_count}/{total} facts are stale (source files changed)"
))
if missing_source > 0:
signals.append(PipelineSignal(
source="knowledge",
signal_type="missing_sources",
weight=min(1.0, missing_source / max(1, total)),
detail=f"{missing_source}/{total} facts have missing source files"
))
# Coverage by repo
repo_counts = Counter(f.get("repo", "unknown") for f in facts)
if len(repo_counts) < 3:
signals.append(PipelineSignal(
source="knowledge",
signal_type="low_coverage",
weight=0.5,
detail=f"Knowledge covers only {len(repo_counts)} repos — expand harvester scope",
affected_repos=list(repo_counts.keys())
))
return signals
def collect_staleness_signals(scripts_dir: str, knowledge_dir: str) -> List[PipelineSignal]:
"""Run staleness checker if available."""
signals = []
checker = os.path.join(scripts_dir, "knowledge_staleness_check.py")
index_path = os.path.join(knowledge_dir, "index.json")
if not os.path.exists(checker) or not os.path.exists(index_path):
return signals
try:
import subprocess
result = subprocess.run(
["python3", checker, "--index", index_path, "--json"],
capture_output=True, text=True, timeout=30
)
        if result.returncode == 0:
            # The checker's --json mode emits a list of entries, each with a
            # "status" field; count everything that is not fresh.
            entries = json.loads(result.stdout)
            total = len(entries)
            stale = sum(1 for e in entries if e.get("status") != "fresh")
            if stale > 0:
                signals.append(PipelineSignal(
                    source="staleness",
                    signal_type="stale_knowledge",
                    weight=min(1.0, stale / max(1, total)),
                    detail=f"Staleness checker found {stale}/{total} stale or problem entries"
                ))
except Exception:
pass
return signals
def collect_metrics_signals(metrics_dir: str) -> List[PipelineSignal]:
"""Analyze metrics directory for pipeline health."""
signals = []
if not os.path.isdir(metrics_dir):
return signals
files = os.listdir(metrics_dir)
if len(files) <= 1: # Only .gitkeep
signals.append(PipelineSignal(
source="metrics",
signal_type="no_metrics",
weight=0.6,
detail="Metrics directory is empty — measurer pipeline not producing output"
))
return signals
# ============================================================
# Priority Scoring Engine
# ============================================================
PRIORITY_LEVELS = ["P0", "P1", "P2", "P3", "none"]
PRIORITY_LABELS = {"p0": "P0", "p1": "P1", "p2": "P2", "p3": "P3",
"priority:critical": "P0", "priority:high": "P1",
"priority:medium": "P2", "priority:low": "P3"}
def extract_priority(labels: List[str]) -> Optional[str]:
"""Extract priority level from issue labels."""
for label in labels:
lower = label.lower().strip()
if lower in PRIORITY_LABELS:
return PRIORITY_LABELS[lower]
return None
def compute_issue_score(
issue: Dict,
repo: str,
signals: List[PipelineSignal],
now: datetime
) -> IssueScore:
"""Compute priority score for a single issue."""
labels = [l.get("name", "") if isinstance(l, dict) else l for l in issue.get("labels", [])]
title = issue.get("title", "")
issue_id = issue.get("number", 0)
current_priority = extract_priority(labels)
# Parse dates
created_str = issue.get("created_at", "")
try:
created = datetime.fromisoformat(created_str.replace("Z", "+00:00"))
age_days = (now - created.replace(tzinfo=None)).days
except (ValueError, AttributeError):
age_days = 0
comment_count = issue.get("comments", 0)
assignee = None
assignees = issue.get("assignees") or []
if assignees:
assignee = assignees[0].get("login") if isinstance(assignees[0], dict) else str(assignees[0])
# Base score
score = 0.0
reasons = []
# Age factor: older issues drift down unless actively discussed
if age_days > 90 and comment_count < 2:
score -= 15
reasons.append(f"Dormant: {age_days} days old with only {comment_count} comments")
elif age_days > 30:
score -= 5
reasons.append(f"Aging: {age_days} days old")
# Activity factor: recent discussion suggests urgency
if comment_count > 5:
score += 10
reasons.append(f"Active discussion: {comment_count} comments")
elif comment_count > 0:
score += 3
# Assignment factor: unassigned issues need triage
if not assignee:
score += 5
reasons.append("Unassigned — needs triage")
# Pipeline signal alignment
for signal in signals:
title_lower = title.lower()
if signal.signal_type == "stale_knowledge" and "stale" in title_lower:
score += signal.weight * 20
reasons.append(f"Matches signal: {signal.detail}")
elif signal.signal_type == "empty_knowledge" and ("harvester" in title_lower or "knowledge" in title_lower):
score += signal.weight * 25
reasons.append(f"Critical gap: {signal.detail}")
elif signal.signal_type == "no_metrics" and "measur" in title_lower:
score += signal.weight * 15
reasons.append(f"Pipeline gap: {signal.detail}")
elif signal.signal_type == "low_coverage" and any(r.lower() in title_lower for r in signal.affected_repos):
score += signal.weight * 10
reasons.append(f"Coverage gap: {signal.detail}")
# Keyword boosts
keyword_scores = {
"broken": 20, "bug": 15, "fix": 10, "error": 12, "fail": 15,
"security": 25, "auth": 20, "data loss": 30, "crash": 25,
"blocker": 20, "urgent": 15, "critical": 15,
"epic": 8, "feature": -3, "nice to have": -10, "someday": -15
}
title_lower = title.lower()
for keyword, boost in keyword_scores.items():
if keyword in title_lower:
score += boost
if boost > 0:
reasons.append(f"Keyword match: '{keyword}' (+{boost})")
# Label-based adjustments
for label in labels:
lower = label.lower()
if lower == "pipeline":
score += 5 # Pipeline issues are infrastructure
elif lower == "bug":
score += 12
elif lower == "enhancement":
score -= 2
elif lower == "documentation":
score -= 5
elif "epic" in lower:
score += 3
# Determine suggested priority
if score >= 40:
suggested = "P0"
elif score >= 25:
suggested = "P1"
elif score >= 10:
suggested = "P2"
elif score >= 0:
suggested = "P3"
else:
suggested = None # Consider closing or deprioritizing
return IssueScore(
issue_id=issue_id,
repo=repo,
title=title,
current_labels=labels,
current_priority=current_priority,
suggested_priority=suggested,
score=round(score, 1),
reasons=reasons if reasons else ["No strong signals"],
age_days=age_days,
comment_count=comment_count,
assignee=assignee
)
# ============================================================
# Report Generation
# ============================================================
def generate_report(
scores: List[IssueScore],
signals: List[PipelineSignal],
org: str,
repos_scanned: List[str]
) -> Dict[str, Any]:
"""Generate the full priority report."""
now = datetime.now(timezone.utc).isoformat()
# Categorize changes
upgrades = [s for s in scores if s.suggested_priority and s.current_priority and
PRIORITY_LEVELS.index(s.suggested_priority) < PRIORITY_LEVELS.index(s.current_priority)]
downgrades = [s for s in scores if s.suggested_priority and s.current_priority and
PRIORITY_LEVELS.index(s.suggested_priority) > PRIORITY_LEVELS.index(s.current_priority)]
new_assignments = [s for s in scores if s.suggested_priority and not s.current_priority]
no_change = [s for s in scores if s.suggested_priority == s.current_priority]
return {
"generated_at": now,
"org": org,
"repos_scanned": repos_scanned,
"total_issues": len(scores),
"signals": [asdict(s) for s in signals],
"summary": {
"suggested_upgrades": len(upgrades),
"suggested_downgrades": len(downgrades),
"suggested_new_priorities": len(new_assignments),
"unchanged": len(no_change)
},
"top_priority": [asdict(s) for s in sorted(scores, key=lambda x: x.score, reverse=True)[:10]],
"upgrades": [asdict(s) for s in upgrades],
"downgrades": [asdict(s) for s in downgrades],
"new_assignments": [asdict(s) for s in new_assignments],
"all_scores": [asdict(s) for s in sorted(scores, key=lambda x: x.score, reverse=True)]
}
def generate_markdown_report(report: Dict[str, Any]) -> str:
"""Generate human-readable markdown report."""
lines = []
lines.append("# Priority Rebalancer Report")
lines.append(f"**Generated:** {report['generated_at']}")
lines.append(f"**Org:** {report['org']}")
lines.append(f"**Repos scanned:** {', '.join(report['repos_scanned'])}")
lines.append(f"**Issues analyzed:** {report['total_issues']}")
lines.append("")
# Signals
if report["signals"]:
lines.append("## Pipeline Signals")
for sig in report["signals"]:
weight_bar = "" * int(sig["weight"] * 10) + "" * (10 - int(sig["weight"] * 10))
lines.append(f"- [{weight_bar}] **{sig['source']}/{sig['signal_type']}** — {sig['detail']}")
lines.append("")
# Summary
s = report["summary"]
lines.append("## Summary")
lines.append(f"- Suggested upgrades: **{s['suggested_upgrades']}**")
lines.append(f"- Suggested downgrades: **{s['suggested_downgrades']}**")
lines.append(f"- New priority assignments: **{s['suggested_new_priorities']}**")
lines.append(f"- Unchanged: **{s['unchanged']}**")
lines.append("")
# Top 10
lines.append("## Top 10 by Score")
for i, item in enumerate(report["top_priority"][:10], 1):
cur = item["current_priority"] or "none"
sug = item["suggested_priority"] or "none"
arrow = "" if PRIORITY_LEVELS.index(sug) < PRIORITY_LEVELS.index(cur) else "" if PRIORITY_LEVELS.index(sug) > PRIORITY_LEVELS.index(cur) else ""
lines.append(f"{i}. **[{item['repo']}#{item['issue_id']}]** {item['title']}")
lines.append(f" Score: {item['score']} | Current: {cur} {arrow} Suggested: {sug}")
lines.append(f" Reasons: {'; '.join(item['reasons'][:3])}")
lines.append("")
# Upgrades
if report["upgrades"]:
lines.append("## Suggested Upgrades")
for item in report["upgrades"]:
lines.append(f"- **[{item['repo']}#{item['issue_id']}]** {item['title']}")
lines.append(f" {item['current_priority']}{item['suggested_priority']} (score: {item['score']})")
lines.append(f" {'; '.join(item['reasons'][:2])}")
lines.append("")
# Downgrades
if report["downgrades"]:
lines.append("## Suggested Downgrades")
for item in report["downgrades"]:
lines.append(f"- **[{item['repo']}#{item['issue_id']}]** {item['title']}")
lines.append(f" {item['current_priority']}{item['suggested_priority']} (score: {item['score']})")
lines.append(f" {'; '.join(item['reasons'][:2])}")
lines.append("")
# New assignments
if report["new_assignments"]:
lines.append("## New Priority Suggestions (currently unlabelled)")
for item in report["new_assignments"][:20]:
lines.append(f"- **[{item['repo']}#{item['issue_id']}]** {item['title']}")
lines.append(f" Suggested: {item['suggested_priority']} (score: {item['score']})")
lines.append(f" {'; '.join(item['reasons'][:2])}")
lines.append("")
return "\n".join(lines)
# ============================================================
# Main
# ============================================================
def main():
parser = argparse.ArgumentParser(description="Priority Rebalancer — re-score issues based on pipeline data")
parser.add_argument("--org", default="Timmy_Foundation", help="Gitea org name")
parser.add_argument("--repo", help="Single repo to analyze (default: all)")
parser.add_argument("--base-url", default="https://forge.alexanderwhitestone.com", help="Gitea base URL")
parser.add_argument("--knowledge-dir", default=None, help="Path to knowledge directory")
parser.add_argument("--metrics-dir", default=None, help="Path to metrics directory")
parser.add_argument("--scripts-dir", default=None, help="Path to scripts directory")
parser.add_argument("--output-dir", default=None, help="Path to output directory")
parser.add_argument("--dry-run", action="store_true", help="Show what would change without applying")
parser.add_argument("--apply", action="store_true", help="Apply priority changes via API")
parser.add_argument("--json", action="store_true", help="Output JSON instead of markdown")
args = parser.parse_args()
# Resolve paths relative to script location
script_dir = Path(__file__).parent
repo_root = script_dir.parent
knowledge_dir = args.knowledge_dir or str(repo_root / "knowledge")
metrics_dir = args.metrics_dir or str(repo_root / "metrics")
scripts_dir = args.scripts_dir or str(repo_root / "scripts")
output_dir = args.output_dir or str(repo_root / "metrics")
# Get token
token = os.environ.get("GITEA_TOKEN")
if not token:
token_path = os.path.expanduser("~/.config/gitea/token")
if os.path.exists(token_path):
with open(token_path) as f:
token = f.read().strip()
if not token:
print("Error: No Gitea token found. Set GITEA_TOKEN or create ~/.config/gitea/token", file=sys.stderr)
sys.exit(1)
client = GiteaClient(args.base_url, token)
now = datetime.utcnow()
# Collect pipeline signals
print("Collecting pipeline signals...", file=sys.stderr)
signals = []
signals.extend(collect_knowledge_signals(knowledge_dir))
signals.extend(collect_staleness_signals(scripts_dir, knowledge_dir))
signals.extend(collect_metrics_signals(metrics_dir))
print(f" Found {len(signals)} signals", file=sys.stderr)
# Get repos
if args.repo:
repos = [{"name": args.repo}]
else:
repos = client.get_org_repos(args.org)
repo_names = [r["name"] for r in repos]
print(f"Scanning {len(repo_names)} repos: {', '.join(repo_names[:5])}{'...' if len(repo_names) > 5 else ''}", file=sys.stderr)
# Score all issues
all_scores = []
for repo in repos:
repo_name = repo["name"]
issues = client.get_issues(args.org, repo_name)
print(f" {repo_name}: {len(issues)} open issues", file=sys.stderr)
for issue in issues:
if issue.get("pull_request"):
continue
score = compute_issue_score(issue, repo_name, signals, now)
all_scores.append(score)
# Generate report
report = generate_report(all_scores, signals, args.org, repo_names)
# Output
os.makedirs(output_dir, exist_ok=True)
if args.json:
print(json.dumps(report, indent=2))
else:
md = generate_markdown_report(report)
print(md)
# Write files
report_path = os.path.join(output_dir, "priority_report.json")
with open(report_path, "w") as f:
json.dump(report, f, indent=2)
print(f"\nFull report: {report_path}", file=sys.stderr)
md_path = os.path.join(output_dir, "priority_suggestions.md")
with open(md_path, "w") as f:
f.write(generate_markdown_report(report))
print(f"Suggestions: {md_path}", file=sys.stderr)
# Apply changes if requested
if args.apply:
print("\nApplying priority changes...", file=sys.stderr)
applied = 0
# Get label IDs for priority labels
priority_label_map = {}
for repo_name in repo_names:
labels = client.get_repo_labels(args.org, repo_name)
for label in labels:
name = label.get("name", "").lower()
if name in ("p0", "p1", "p2", "p3"):
priority_label_map[(repo_name, name)] = label["id"]
for score in all_scores:
if score.suggested_priority and score.suggested_priority != score.current_priority:
sug_lower = score.suggested_priority.lower()
label_id = priority_label_map.get((score.repo, sug_lower))
if label_id:
ok = client.add_label_to_issue(args.org, score.repo, score.issue_id, [label_id])
if ok:
applied += 1
print(f" Applied: [{score.repo}#{score.issue_id}] → {score.suggested_priority}", file=sys.stderr)
# Add comment explaining the change
comment = f"**Priority Rebalancer** suggested: **{score.suggested_priority}** (was: {score.current_priority or 'none'})\n\n"
comment += f"Score: {score.score}\n"
comment += f"Reasons:\n"
for r in score.reasons[:5]:
comment += f"- {r}\n"
client.add_comment(args.org, score.repo, score.issue_id, comment)
print(f"Applied {applied} priority changes", file=sys.stderr)
elif args.dry_run:
print(f"\nDry run — {report['summary']['suggested_upgrades'] + report['summary']['suggested_downgrades'] + report['summary']['suggested_new_priorities']} changes would be applied", file=sys.stderr)
if __name__ == "__main__":
main()
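
To make the scoring thresholds concrete, here is a small offline sketch with invented issue data and no pipeline signals; the numbers follow the boosts defined in compute_issue_score:

# Hypothetical worked example: a fresh, unassigned bug report.
# "fix" (+10) + "error" (+12) + unassigned (+5) + "bug" label (+12) = 39.0,
# just under the P0 cutoff (40), so it maps to P1 (score >= 25).
from datetime import datetime
from priority_rebalancer import compute_issue_score

issue = {
    "number": 7,
    "title": "Fix error in parser",
    "labels": ["bug"],
    "created_at": datetime.utcnow().isoformat() + "Z",
    "comments": 0,
}
score = compute_issue_score(issue, "demo-repo", [], datetime.utcnow())
print(score.score, score.suggested_priority)  # 39.0 P1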

View File

@@ -0,0 +1,111 @@
#!/usr/bin/env python3
"""Tests for gitea_issue_parser."""
import json
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
from gitea_issue_parser import parse_issue_body
def test_basic_structure():
body = """## Context
This is the background.
## Acceptance Criteria
- [ ] First criterion
- [x] Second criterion (already done)
- [ ] Third criterion
## Labels
`pipeline`, `extraction`
"""
result = parse_issue_body(body, "Test Issue", ["pipeline", "extraction"])
assert result["title"] == "Test Issue"
assert "background" in result["context"].lower()
assert len(result["criteria"]) == 3
assert "First criterion" in result["criteria"]
assert result["labels"] == ["pipeline", "extraction"]
print("PASS: test_basic_structure")
def test_epic_ref():
body = "Closes #645\n\nSome description."
result = parse_issue_body(body, "feat: thing (#688)")
assert result["epic_ref"] == "#645"
print("PASS: test_epic_ref")
def test_epic_ref_from_title():
body = "Some description without close ref."
result = parse_issue_body(body, "feat: scene descriptions (#645)")
assert result["epic_ref"] == "#645"
print("PASS: test_epic_ref_from_title")
def test_no_checkboxes():
body = """## Requirements
1. First thing
2. Second thing
3. Third thing
"""
result = parse_issue_body(body)
assert len(result["criteria"]) == 3
print("PASS: test_no_checkboxes")
def test_empty_body():
result = parse_issue_body("", "Empty Issue")
assert result["title"] == "Empty Issue"
assert result["criteria"] == []
assert result["context"] == ""
print("PASS: test_empty_body")
def test_real_issue_format():
body = """Closes #681
## Changes
Add `#!/usr/bin/env python3` shebang to 6 Python scripts.
## Verification
All 6 files confirmed missing shebangs before fix.
## Impact
Scripts can now be executed directly.
"""
result = parse_issue_body(body, "fix: add python3 shebangs (#685)")
assert result["epic_ref"] == "#681"
assert "shebang" in result["context"].lower()
print("PASS: test_real_issue_format")
def test_all_sections_captured():
body = """## Context
Background info.
## Acceptance Criteria
- [ ] Do thing
## Labels
`test`
"""
result = parse_issue_body(body)
assert "context" in result["sections"]
assert "acceptance criteria" in result["sections"]
print("PASS: test_all_sections_captured")
if __name__ == "__main__":
test_basic_structure()
test_epic_ref()
test_epic_ref_from_title()
test_no_checkboxes()
test_empty_body()
test_real_issue_format()
test_all_sections_captured()
print("\nAll tests passed.")

View File

@@ -1,129 +0,0 @@
#!/usr/bin/env python3
"""Tests for scripts/knowledge_staleness_check.py — 8 tests."""
import json
import os
import sys
import tempfile
sys.path.insert(0, os.path.dirname(__file__) or ".")
import importlib.util
spec = importlib.util.spec_from_file_location("ks", os.path.join(os.path.dirname(__file__) or ".", "knowledge_staleness_check.py"))
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
check_staleness = mod.check_staleness
fix_hashes = mod.fix_hashes
compute_file_hash = mod.compute_file_hash
def test_fresh_entry():
with tempfile.TemporaryDirectory() as tmpdir:
src = os.path.join(tmpdir, "source.py")
with open(src, "w") as f:
f.write("print('hello')")
h = compute_file_hash(src)
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "hello", "source_file": "source.py", "source_hash": h}]}, f)
results = check_staleness(idx, tmpdir)
assert results[0]["status"] == "fresh"
print("PASS: test_fresh_entry")
def test_stale_entry():
with tempfile.TemporaryDirectory() as tmpdir:
src = os.path.join(tmpdir, "source.py")
with open(src, "w") as f:
f.write("original content")
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "old", "source_file": "source.py", "source_hash": "sha256:wrong"}]}, f)
# Now change the source
with open(src, "w") as f:
f.write("modified content")
results = check_staleness(idx, tmpdir)
assert results[0]["status"] == "stale"
print("PASS: test_stale_entry")
def test_missing_source():
with tempfile.TemporaryDirectory() as tmpdir:
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "gone", "source_file": "nonexistent.py", "source_hash": "sha256:abc"}]}, f)
results = check_staleness(idx, tmpdir)
assert results[0]["status"] == "missing_source"
print("PASS: test_missing_source")
def test_no_hash():
with tempfile.TemporaryDirectory() as tmpdir:
src = os.path.join(tmpdir, "source.py")
with open(src, "w") as f:
f.write("content")
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "no hash", "source_file": "source.py"}]}, f)
results = check_staleness(idx, tmpdir)
assert results[0]["status"] == "no_hash"
assert results[0]["current_hash"].startswith("sha256:")
print("PASS: test_no_hash")
def test_no_source_field():
with tempfile.TemporaryDirectory() as tmpdir:
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "orphan"}]}, f)
results = check_staleness(idx, tmpdir)
assert results[0]["status"] == "no_source"
print("PASS: test_no_source_field")
def test_fix_hashes():
with tempfile.TemporaryDirectory() as tmpdir:
src = os.path.join(tmpdir, "source.py")
with open(src, "w") as f:
f.write("content for hashing")
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "needs hash", "source_file": "source.py"}]}, f)
fixed = fix_hashes(idx, tmpdir)
assert fixed == 1
# Verify hash was added
with open(idx) as f:
data = json.load(f)
assert data["facts"][0]["source_hash"].startswith("sha256:")
print("PASS: test_fix_hashes")
def test_empty_index():
with tempfile.TemporaryDirectory() as tmpdir:
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": []}, f)
results = check_staleness(idx, tmpdir)
assert results == []
print("PASS: test_empty_index")
def test_compute_hash_nonexistent():
h = compute_file_hash("/nonexistent/path/file.py")
assert h is None
print("PASS: test_compute_hash_nonexistent")
def run_all():
test_fresh_entry()
test_stale_entry()
test_missing_source()
test_no_hash()
test_no_source_field()
test_fix_hashes()
test_empty_index()
test_compute_hash_nonexistent()
print("\nAll 8 tests passed!")
if __name__ == "__main__":
run_all()

View File

@@ -1,305 +0,0 @@
#!/usr/bin/env python3
"""
Tests for Priority Rebalancer
"""
import json
import os
import sys
import tempfile
from datetime import datetime, timedelta
from pathlib import Path
# Add script dir to path
sys.path.insert(0, str(Path(__file__).parent))
from priority_rebalancer import (
GiteaClient,
IssueScore,
PipelineSignal,
compute_issue_score,
collect_knowledge_signals,
collect_metrics_signals,
extract_priority,
generate_report,
generate_markdown_report,
PRIORITY_LEVELS,
)
# ============================================================
# Test Helpers
# ============================================================
PASS = 0
FAIL = 0
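# Note: the @test decorator below executes each test body immediately at
# decoration time, so simply defining the tests runs them top to bottom.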
def test(name):
def decorator(fn):
global PASS, FAIL
try:
fn()
PASS += 1
print(f" [PASS] {name}")
except Exception as e:
FAIL += 1
print(f" [FAIL] {name}: {e}")
return decorator
def assert_eq(a, b, msg=""):
if a != b:
raise AssertionError(f"{msg} expected {b!r}, got {a!r}")
def assert_true(v, msg=""):
if not v:
raise AssertionError(msg or "Expected True")
def assert_false(v, msg=""):
if v:
raise AssertionError(msg or "Expected False")
# ============================================================
# Priority Extraction Tests
# ============================================================
print("=== Priority Rebalancer Tests ===\n")
print("-- Priority Extraction --")
@test("extract P0 from label")
def _():
assert_eq(extract_priority(["P0", "bug"]), "P0")
@test("extract P1 from priority:high")
def _():
assert_eq(extract_priority(["priority:high"]), "P1")
@test("extract P2 from priority:medium")
def _():
assert_eq(extract_priority(["priority:medium"]), "P2")
@test("extract P3 from priority:low")
def _():
assert_eq(extract_priority(["priority:low"]), "P3")
@test("returns None for no priority")
def _():
assert_eq(extract_priority(["bug", "enhancement"]), None)
@test("case insensitive")
def _():
assert_eq(extract_priority(["p1"]), "P1")
assert_eq(extract_priority(["PRIORITY:CRITICAL"]), "P0")
# ============================================================
# Issue Scoring Tests
# ============================================================
print("\n-- Issue Scoring --")
def make_issue(**kwargs):
defaults = {
"number": 1,
"title": "Test issue",
"labels": [],
"created_at": (datetime.utcnow() - timedelta(days=5)).isoformat() + "Z",
"comments": 0,
"assignees": None,
}
defaults.update(kwargs)
return defaults
@test("bug gets score boost")
def _():
issue = make_issue(title="Incorrect output format", labels=["bug"])
score = compute_issue_score(issue, "test-repo", [], datetime.utcnow())
assert_true(score.score > 0, f"Bug should boost score, got {score.score}")
# Bug label alone should be P2 or P3 (not P0)
assert_true(score.suggested_priority in ("P2", "P3"),
f"Bug label alone should be P2/P3, got {score.suggested_priority}")
@test("security gets high score")
def _():
issue = make_issue(title="Security: auth bypass", labels=["bug"])
score = compute_issue_score(issue, "test-repo", [], datetime.utcnow())
assert_true(score.score >= 25, f"Security should score high, got {score.score}")
@test("old dormant issue gets penalized")
def _():
issue = make_issue(
title="Some old feature",
created_at=(datetime.utcnow() - timedelta(days=120)).isoformat() + "Z",
comments=0
)
score = compute_issue_score(issue, "test-repo", [], datetime.utcnow())
assert_true(score.score < 0, f"Old dormant should be negative, got {score.score}")
assert_true(any("Dormant" in r for r in score.reasons), "Should mention dormancy")
@test("active discussion boosts score")
def _():
issue = make_issue(title="Important fix", comments=8)
score = compute_issue_score(issue, "test-repo", [], datetime.utcnow())
assert_true(score.score > 5, f"Active discussion should boost, got {score.score}")
assert_true(any("Active" in r for r in score.reasons))
@test("unassigned gets slight boost")
def _():
issue = make_issue(title="Fix bug", assignees=None)
score = compute_issue_score(issue, "test-repo", [], datetime.utcnow())
assert_true(any("Unassigned" in r for r in score.reasons))
@test("assigned issue notes assignee")
def _():
issue = make_issue(title="Fix bug", assignees=[{"login": "alice"}])
score = compute_issue_score(issue, "test-repo", [], datetime.utcnow())
assert_eq(score.assignee, "alice")
@test("nice-to-have gets penalized")
def _():
issue = make_issue(title="Nice to have: fancy animation")
score = compute_issue_score(issue, "test-repo", [], datetime.utcnow())
assert_true(score.score < 0, f"Nice-to-have should be negative, got {score.score}")
# ============================================================
# Pipeline Signal Tests
# ============================================================
print("\n-- Pipeline Signals --")
@test("signal alignment boosts matching issues")
def _():
signals = [PipelineSignal(
source="knowledge",
signal_type="stale_knowledge",
weight=0.8,
detail="20 stale facts"
)]
issue = make_issue(title="Fix stale knowledge entries")
score = compute_issue_score(issue, "test-repo", signals, datetime.utcnow())
assert_true(any("Matches signal" in r for r in score.reasons))
@test("empty knowledge boosts harvester issues")
def _():
signals = [PipelineSignal(
source="knowledge",
signal_type="empty_knowledge",
weight=0.7,
detail="0 facts"
)]
issue = make_issue(title="Implement harvester pipeline")
score = compute_issue_score(issue, "test-repo", signals, datetime.utcnow())
assert_true(any("Critical gap" in r for r in score.reasons))
# ============================================================
# Knowledge Signal Collection Tests
# ============================================================
print("\n-- Knowledge Signal Collection --")
@test("missing index generates signal")
def _():
with tempfile.TemporaryDirectory() as tmpdir:
signals = collect_knowledge_signals(tmpdir)
assert_true(len(signals) > 0)
assert_eq(signals[0].signal_type, "missing_index")
@test("empty knowledge generates signal")
def _():
with tempfile.TemporaryDirectory() as tmpdir:
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": []}, f)
signals = collect_knowledge_signals(tmpdir)
assert_true(any(s.signal_type == "empty_knowledge" for s in signals))
@test("corrupt index generates signal")
def _():
with tempfile.TemporaryDirectory() as tmpdir:
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
f.write("not json {{{")
signals = collect_knowledge_signals(tmpdir)
assert_true(any(s.signal_type == "corrupt_index" for s in signals))
@test("knowledge with facts passes")
def _():
with tempfile.TemporaryDirectory() as tmpdir:
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [
{"id": 1, "repo": "test", "status": "fresh"},
{"id": 2, "repo": "test", "status": "fresh"},
]}, f)
signals = collect_knowledge_signals(tmpdir)
# Should not generate missing_index or empty_knowledge
assert_false(any(s.signal_type in ("missing_index", "empty_knowledge") for s in signals))
# ============================================================
# Metrics Signal Collection Tests
# ============================================================
print("\n-- Metrics Signal Collection --")
@test("empty metrics dir generates signal")
def _():
with tempfile.TemporaryDirectory() as tmpdir:
signals = collect_metrics_signals(tmpdir)
assert_true(any(s.signal_type == "no_metrics" for s in signals))
@test("metrics with files passes")
def _():
with tempfile.TemporaryDirectory() as tmpdir:
# Create files (simulating real metrics dir with .gitkeep + actual files)
with open(os.path.join(tmpdir, ".gitkeep"), "w") as f:
f.write("")
with open(os.path.join(tmpdir, "report.json"), "w") as f:
f.write("{}")
signals = collect_metrics_signals(tmpdir)
assert_false(any(s.signal_type == "no_metrics" for s in signals))
# ============================================================
# Report Generation Tests
# ============================================================
print("\n-- Report Generation --")
@test("report has correct structure")
def _():
scores = [
IssueScore(1, "repo1", "Bug fix", ["bug"], None, "P1", 30.0, ["test"], 5, 3, None),
IssueScore(2, "repo1", "Feature", ["enhancement"], "P3", None, -5.0, ["test"], 60, 0, "alice"),
]
signals = [PipelineSignal("knowledge", "stale_knowledge", 0.5, "10 stale")]
report = generate_report(scores, signals, "test-org", ["repo1"])
assert_eq(report["org"], "test-org")
assert_eq(report["total_issues"], 2)
assert_true("generated_at" in report)
assert_true("summary" in report)
assert_true("top_priority" in report)
assert_eq(report["summary"]["suggested_new_priorities"], 1)
@test("markdown report is non-empty")
def _():
scores = [IssueScore(1, "repo1", "Test", ["bug"], None, "P2", 15.0, ["reason"], 5, 0, None)]
report = generate_report(scores, [], "test-org", ["repo1"])
md = generate_markdown_report(report)
assert_true(len(md) > 100)
assert_true("Priority Rebalancer Report" in md)
assert_true("Top 10" in md)
# ============================================================
# Summary
# ============================================================
print(f"\n=== Summary ===")
print(f"Total: {PASS + FAIL} | Passed: {PASS} | Failed: {FAIL}")
if FAIL > 0:
sys.exit(1)