forked from Rockachopa/Timmy-time-dashboard
407 lines
13 KiB
Python
407 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
"""Mechanical triage scoring for the Timmy dev loop.
|
|
|
|
Reads open issues from Gitea, scores them on scope/acceptance/alignment,
|
|
writes a ranked queue to .loop/queue.json. No LLM calls — pure heuristics.
|
|
|
|
Run: python3 scripts/triage_score.py
|
|
Env: GITEA_TOKEN (or reads ~/.hermes/gitea_token)
|
|
GITEA_API (default: http://localhost:3000/api/v1)
|
|
REPO_SLUG (default: rockachopa/Timmy-time-dashboard)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
# ── Config ──────────────────────────────────────────────────────────────
|
|
|
|
|
|
def _get_gitea_api() -> str:
|
|
"""Read Gitea API URL from env var, then ~/.hermes/gitea_api file, then default."""
|
|
# Check env vars first (TIMMY_GITEA_API is preferred, GITEA_API for compatibility)
|
|
api_url = os.environ.get("TIMMY_GITEA_API") or os.environ.get("GITEA_API")
|
|
if api_url:
|
|
return api_url
|
|
# Check ~/.hermes/gitea_api file
|
|
api_file = Path.home() / ".hermes" / "gitea_api"
|
|
if api_file.exists():
|
|
return api_file.read_text().strip()
|
|
# Default fallback
|
|
return "http://localhost:3000/api/v1"
|
|
|
|
|
|
GITEA_API = _get_gitea_api()
|
|
REPO_SLUG = os.environ.get("REPO_SLUG", "rockachopa/Timmy-time-dashboard")
|
|
TOKEN_FILE = Path.home() / ".hermes" / "gitea_token"
|
|
REPO_ROOT = Path(__file__).resolve().parent.parent
|
|
QUEUE_FILE = REPO_ROOT / ".loop" / "queue.json"
|
|
QUEUE_BACKUP_FILE = REPO_ROOT / ".loop" / "queue.json.bak"
|
|
RETRO_FILE = REPO_ROOT / ".loop" / "retro" / "triage.jsonl"
|
|
QUARANTINE_FILE = REPO_ROOT / ".loop" / "quarantine.json"
|
|
CYCLE_RETRO_FILE = REPO_ROOT / ".loop" / "retro" / "cycles.jsonl"
|
|
|
|
# Minimum score to be considered "ready"
|
|
READY_THRESHOLD = 5
|
|
# How many recent cycle retros to check for quarantine
|
|
QUARANTINE_LOOKBACK = 20
|
|
|
|
# ── Helpers ─────────────────────────────────────────────────────────────
|
|
|
|
def get_token() -> str:
|
|
token = os.environ.get("GITEA_TOKEN", "").strip()
|
|
if not token and TOKEN_FILE.exists():
|
|
token = TOKEN_FILE.read_text().strip()
|
|
if not token:
|
|
print("[triage] ERROR: No Gitea token found", file=sys.stderr)
|
|
sys.exit(1)
|
|
return token
|
|
|
|
|
|
def api_get(path: str, token: str) -> list | dict:
|
|
"""Minimal HTTP GET using urllib (no dependencies)."""
|
|
import urllib.request
|
|
url = f"{GITEA_API}/repos/{REPO_SLUG}/{path}"
|
|
req = urllib.request.Request(url, headers={
|
|
"Authorization": f"token {token}",
|
|
"Accept": "application/json",
|
|
})
|
|
with urllib.request.urlopen(req, timeout=15) as resp:
|
|
return json.loads(resp.read())
|
|
|
|
|
|
def load_quarantine() -> dict:
|
|
"""Load quarantined issues {issue_num: {reason, quarantined_at, failures}}."""
|
|
if QUARANTINE_FILE.exists():
|
|
try:
|
|
return json.loads(QUARANTINE_FILE.read_text())
|
|
except (json.JSONDecodeError, OSError):
|
|
pass
|
|
return {}
|
|
|
|
|
|
def save_quarantine(q: dict) -> None:
|
|
QUARANTINE_FILE.parent.mkdir(parents=True, exist_ok=True)
|
|
QUARANTINE_FILE.write_text(json.dumps(q, indent=2) + "\n")
|
|
|
|
|
|
def load_cycle_failures() -> dict[int, int]:
|
|
"""Count failures per issue from recent cycle retros."""
|
|
failures: dict[int, int] = {}
|
|
if not CYCLE_RETRO_FILE.exists():
|
|
return failures
|
|
lines = CYCLE_RETRO_FILE.read_text().strip().splitlines()
|
|
for line in lines[-QUARANTINE_LOOKBACK:]:
|
|
try:
|
|
entry = json.loads(line)
|
|
if not entry.get("success", True):
|
|
issue = entry.get("issue")
|
|
if issue:
|
|
failures[issue] = failures.get(issue, 0) + 1
|
|
except (json.JSONDecodeError, KeyError):
|
|
continue
|
|
return failures
|
|
|
|
|
|
# ── Scoring ─────────────────────────────────────────────────────────────
|
|
|
|
# Patterns that indicate file/function specificity
|
|
FILE_PATTERNS = re.compile(
|
|
r"(?:src/|tests/|scripts/|\.py|\.html|\.js|\.yaml|\.toml|\.sh)", re.IGNORECASE
|
|
)
|
|
FUNCTION_PATTERNS = re.compile(
|
|
r"(?:def |class |function |method |`\w+\(\)`)", re.IGNORECASE
|
|
)
|
|
|
|
# Patterns that indicate acceptance criteria
|
|
ACCEPTANCE_PATTERNS = re.compile(
|
|
r"(?:should|must|expect|verify|assert|test.?case|acceptance|criteria"
|
|
r"|pass(?:es|ing)|fail(?:s|ing)|return(?:s)?|raise(?:s)?)",
|
|
re.IGNORECASE,
|
|
)
|
|
TEST_PATTERNS = re.compile(
|
|
r"(?:tox|pytest|test_\w+|\.test\.|assert\s)", re.IGNORECASE
|
|
)
|
|
|
|
# Tags in issue titles
|
|
TAG_PATTERN = re.compile(r"\[([^\]]+)\]")
|
|
|
|
# Priority labels / tags
|
|
BUG_TAGS = {"bug", "broken", "crash", "error", "fix", "regression", "hotfix"}
|
|
FEATURE_TAGS = {"feature", "feat", "enhancement", "capability", "timmy-capability"}
|
|
REFACTOR_TAGS = {"refactor", "cleanup", "tech-debt", "optimization", "perf"}
|
|
META_TAGS = {"philosophy", "soul-gap", "discussion", "question", "rfc"}
|
|
LOOP_TAG = "loop-generated"
|
|
|
|
|
|
def extract_tags(title: str, labels: list[str]) -> set[str]:
|
|
"""Pull tags from [bracket] notation in title + Gitea labels."""
|
|
tags = set()
|
|
for match in TAG_PATTERN.finditer(title):
|
|
tags.add(match.group(1).lower().strip())
|
|
for label in labels:
|
|
tags.add(label.lower().strip())
|
|
return tags
|
|
|
|
|
|
def score_scope(title: str, body: str, tags: set[str]) -> int:
|
|
"""0-3: How well-scoped is this issue?"""
|
|
text = f"{title}\n{body}"
|
|
score = 0
|
|
|
|
# Mentions specific files?
|
|
if FILE_PATTERNS.search(text):
|
|
score += 1
|
|
|
|
# Mentions specific functions/classes?
|
|
if FUNCTION_PATTERNS.search(text):
|
|
score += 1
|
|
|
|
# Short, focused title (not a novel)?
|
|
clean_title = TAG_PATTERN.sub("", title).strip()
|
|
if len(clean_title) < 80:
|
|
score += 1
|
|
|
|
# Philosophy/meta issues are inherently unscoped for dev work
|
|
if tags & META_TAGS:
|
|
score = max(0, score - 2)
|
|
|
|
return min(3, score)
|
|
|
|
|
|
def score_acceptance(title: str, body: str, tags: set[str]) -> int:
|
|
"""0-3: Does this have clear acceptance criteria?"""
|
|
text = f"{title}\n{body}"
|
|
score = 0
|
|
|
|
# Has acceptance-related language?
|
|
matches = len(ACCEPTANCE_PATTERNS.findall(text))
|
|
if matches >= 3:
|
|
score += 2
|
|
elif matches >= 1:
|
|
score += 1
|
|
|
|
# Mentions specific tests?
|
|
if TEST_PATTERNS.search(text):
|
|
score += 1
|
|
|
|
# Has a "## Problem" + "## Solution" or similar structure?
|
|
if re.search(r"##\s*(problem|solution|expected|actual|steps)", body, re.IGNORECASE):
|
|
score += 1
|
|
|
|
# Philosophy issues don't have testable criteria
|
|
if tags & META_TAGS:
|
|
score = max(0, score - 1)
|
|
|
|
return min(3, score)
|
|
|
|
|
|
def score_alignment(title: str, body: str, tags: set[str]) -> int:
|
|
"""0-3: How aligned is this with the north star?"""
|
|
score = 0
|
|
|
|
# Bug on main = highest priority
|
|
if tags & BUG_TAGS:
|
|
score += 3
|
|
return min(3, score)
|
|
|
|
# Refactors that improve code health
|
|
if tags & REFACTOR_TAGS:
|
|
score += 2
|
|
|
|
# Features that grow Timmy's capabilities
|
|
if tags & FEATURE_TAGS:
|
|
score += 2
|
|
|
|
# Loop-generated issues get a small boost (the loop found real problems)
|
|
if LOOP_TAG in tags:
|
|
score += 1
|
|
|
|
# Philosophy issues are important but not dev-actionable
|
|
if tags & META_TAGS:
|
|
score = 0
|
|
|
|
return min(3, score)
|
|
|
|
|
|
def score_issue(issue: dict) -> dict:
|
|
"""Score a single issue. Returns enriched dict."""
|
|
title = issue.get("title", "")
|
|
body = issue.get("body", "") or ""
|
|
labels = [l["name"] for l in issue.get("labels", [])]
|
|
tags = extract_tags(title, labels)
|
|
number = issue["number"]
|
|
|
|
scope = score_scope(title, body, tags)
|
|
acceptance = score_acceptance(title, body, tags)
|
|
alignment = score_alignment(title, body, tags)
|
|
total = scope + acceptance + alignment
|
|
|
|
# Determine issue type
|
|
if tags & BUG_TAGS:
|
|
issue_type = "bug"
|
|
elif tags & FEATURE_TAGS:
|
|
issue_type = "feature"
|
|
elif tags & REFACTOR_TAGS:
|
|
issue_type = "refactor"
|
|
elif tags & META_TAGS:
|
|
issue_type = "philosophy"
|
|
else:
|
|
issue_type = "unknown"
|
|
|
|
# Extract mentioned files from body
|
|
files = list(set(re.findall(r"(?:src|tests|scripts)/[\w/.]+\.(?:py|html|js|yaml)", body)))
|
|
|
|
return {
|
|
"issue": number,
|
|
"title": TAG_PATTERN.sub("", title).strip(),
|
|
"type": issue_type,
|
|
"score": total,
|
|
"scope": scope,
|
|
"acceptance": acceptance,
|
|
"alignment": alignment,
|
|
"tags": sorted(tags),
|
|
"files": files[:10],
|
|
"ready": total >= READY_THRESHOLD,
|
|
}
|
|
|
|
|
|
# ── Quarantine ──────────────────────────────────────────────────────────
|
|
|
|
def update_quarantine(scored: list[dict]) -> list[dict]:
|
|
"""Auto-quarantine issues that have failed >= 2 times. Returns filtered list."""
|
|
failures = load_cycle_failures()
|
|
quarantine = load_quarantine()
|
|
now = datetime.now(timezone.utc).isoformat()
|
|
|
|
filtered = []
|
|
for item in scored:
|
|
num = item["issue"]
|
|
fail_count = failures.get(num, 0)
|
|
str_num = str(num)
|
|
|
|
if fail_count >= 2 and str_num not in quarantine:
|
|
quarantine[str_num] = {
|
|
"reason": f"Failed {fail_count} times in recent cycles",
|
|
"quarantined_at": now,
|
|
"failures": fail_count,
|
|
}
|
|
print(f"[triage] QUARANTINED #{num}: failed {fail_count} times")
|
|
continue
|
|
|
|
if str_num in quarantine:
|
|
print(f"[triage] Skipping #{num} (quarantined)")
|
|
continue
|
|
|
|
filtered.append(item)
|
|
|
|
save_quarantine(quarantine)
|
|
return filtered
|
|
|
|
|
|
# ── Main ────────────────────────────────────────────────────────────────
|
|
|
|
def run_triage() -> list[dict]:
|
|
token = get_token()
|
|
|
|
# Fetch all open issues (paginate)
|
|
page = 1
|
|
all_issues: list[dict] = []
|
|
while True:
|
|
batch = api_get(f"issues?state=open&limit=50&page={page}&type=issues", token)
|
|
if not batch:
|
|
break
|
|
all_issues.extend(batch)
|
|
if len(batch) < 50:
|
|
break
|
|
page += 1
|
|
|
|
print(f"[triage] Fetched {len(all_issues)} open issues")
|
|
|
|
# Score each
|
|
scored = [score_issue(i) for i in all_issues]
|
|
|
|
# Auto-quarantine repeat failures
|
|
scored = update_quarantine(scored)
|
|
|
|
# Sort: ready first, then by score descending, bugs always on top
|
|
def sort_key(item: dict) -> tuple:
|
|
return (
|
|
0 if item["type"] == "bug" else 1,
|
|
-item["score"],
|
|
item["issue"],
|
|
)
|
|
|
|
scored.sort(key=sort_key)
|
|
|
|
# Write queue (ready items only)
|
|
ready = [s for s in scored if s["ready"]]
|
|
not_ready = [s for s in scored if not s["ready"]]
|
|
|
|
# Save backup before writing (if current file exists and is valid)
|
|
if QUEUE_FILE.exists():
|
|
try:
|
|
json.loads(QUEUE_FILE.read_text()) # Validate current file
|
|
QUEUE_BACKUP_FILE.write_text(QUEUE_FILE.read_text())
|
|
except (json.JSONDecodeError, OSError):
|
|
pass # Current file is corrupt, don't overwrite backup
|
|
|
|
# Write new queue file
|
|
QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
|
|
QUEUE_FILE.write_text(json.dumps(ready, indent=2) + "\n")
|
|
|
|
# Validate the write by re-reading and parsing
|
|
try:
|
|
json.loads(QUEUE_FILE.read_text())
|
|
except (json.JSONDecodeError, OSError) as exc:
|
|
print(f"[triage] ERROR: queue.json validation failed: {exc}", file=sys.stderr)
|
|
# Restore from backup if available
|
|
if QUEUE_BACKUP_FILE.exists():
|
|
try:
|
|
backup_data = QUEUE_BACKUP_FILE.read_text()
|
|
json.loads(backup_data) # Validate backup
|
|
QUEUE_FILE.write_text(backup_data)
|
|
print(f"[triage] Restored queue.json from backup")
|
|
except (json.JSONDecodeError, OSError) as restore_exc:
|
|
print(f"[triage] ERROR: Backup restore failed: {restore_exc}", file=sys.stderr)
|
|
# Write empty list as last resort
|
|
QUEUE_FILE.write_text("[]\n")
|
|
else:
|
|
# No backup, write empty list
|
|
QUEUE_FILE.write_text("[]\n")
|
|
|
|
# Write retro entry
|
|
retro_entry = {
|
|
"timestamp": datetime.now(timezone.utc).isoformat(),
|
|
"total_open": len(all_issues),
|
|
"scored": len(scored),
|
|
"ready": len(ready),
|
|
"not_ready": len(not_ready),
|
|
"top_issue": ready[0]["issue"] if ready else None,
|
|
"quarantined": len(load_quarantine()),
|
|
}
|
|
RETRO_FILE.parent.mkdir(parents=True, exist_ok=True)
|
|
with open(RETRO_FILE, "a") as f:
|
|
f.write(json.dumps(retro_entry) + "\n")
|
|
|
|
# Summary
|
|
print(f"[triage] Ready: {len(ready)} | Not ready: {len(not_ready)}")
|
|
for item in ready[:5]:
|
|
flag = "🐛" if item["type"] == "bug" else "✦"
|
|
print(f" {flag} #{item['issue']} score={item['score']} {item['title'][:60]}")
|
|
if not_ready:
|
|
print(f"[triage] Low-scoring ({len(not_ready)}):")
|
|
for item in not_ready[:3]:
|
|
print(f" #{item['issue']} score={item['score']} {item['title'][:50]}")
|
|
|
|
return ready
|
|
|
|
|
|
if __name__ == "__main__":
|
|
run_triage()
|