1
0
This repository has been archived on 2026-03-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
Timmy-time-dashboard/scripts/triage_score.py

407 lines
13 KiB
Python

#!/usr/bin/env python3
"""Mechanical triage scoring for the Timmy dev loop.
Reads open issues from Gitea, scores them on scope/acceptance/alignment,
writes a ranked queue to .loop/queue.json. No LLM calls — pure heuristics.
Run: python3 scripts/triage_score.py
Env: GITEA_TOKEN (or reads ~/.hermes/gitea_token)
GITEA_API (default: http://localhost:3000/api/v1)
REPO_SLUG (default: rockachopa/Timmy-time-dashboard)
"""
from __future__ import annotations
import json
import os
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
# ── Config ──────────────────────────────────────────────────────────────
def _get_gitea_api() -> str:
"""Read Gitea API URL from env var, then ~/.hermes/gitea_api file, then default."""
# Check env vars first (TIMMY_GITEA_API is preferred, GITEA_API for compatibility)
api_url = os.environ.get("TIMMY_GITEA_API") or os.environ.get("GITEA_API")
if api_url:
return api_url
# Check ~/.hermes/gitea_api file
api_file = Path.home() / ".hermes" / "gitea_api"
if api_file.exists():
return api_file.read_text().strip()
# Default fallback
return "http://localhost:3000/api/v1"
GITEA_API = _get_gitea_api()
REPO_SLUG = os.environ.get("REPO_SLUG", "rockachopa/Timmy-time-dashboard")
TOKEN_FILE = Path.home() / ".hermes" / "gitea_token"
REPO_ROOT = Path(__file__).resolve().parent.parent
QUEUE_FILE = REPO_ROOT / ".loop" / "queue.json"
QUEUE_BACKUP_FILE = REPO_ROOT / ".loop" / "queue.json.bak"
RETRO_FILE = REPO_ROOT / ".loop" / "retro" / "triage.jsonl"
QUARANTINE_FILE = REPO_ROOT / ".loop" / "quarantine.json"
CYCLE_RETRO_FILE = REPO_ROOT / ".loop" / "retro" / "cycles.jsonl"
# Minimum score to be considered "ready"
READY_THRESHOLD = 5
# How many recent cycle retros to check for quarantine
QUARANTINE_LOOKBACK = 20
# ── Helpers ─────────────────────────────────────────────────────────────
def get_token() -> str:
token = os.environ.get("GITEA_TOKEN", "").strip()
if not token and TOKEN_FILE.exists():
token = TOKEN_FILE.read_text().strip()
if not token:
print("[triage] ERROR: No Gitea token found", file=sys.stderr)
sys.exit(1)
return token
def api_get(path: str, token: str) -> list | dict:
"""Minimal HTTP GET using urllib (no dependencies)."""
import urllib.request
url = f"{GITEA_API}/repos/{REPO_SLUG}/{path}"
req = urllib.request.Request(url, headers={
"Authorization": f"token {token}",
"Accept": "application/json",
})
with urllib.request.urlopen(req, timeout=15) as resp:
return json.loads(resp.read())
def load_quarantine() -> dict:
"""Load quarantined issues {issue_num: {reason, quarantined_at, failures}}."""
if QUARANTINE_FILE.exists():
try:
return json.loads(QUARANTINE_FILE.read_text())
except (json.JSONDecodeError, OSError):
pass
return {}
def save_quarantine(q: dict) -> None:
QUARANTINE_FILE.parent.mkdir(parents=True, exist_ok=True)
QUARANTINE_FILE.write_text(json.dumps(q, indent=2) + "\n")
def load_cycle_failures() -> dict[int, int]:
"""Count failures per issue from recent cycle retros."""
failures: dict[int, int] = {}
if not CYCLE_RETRO_FILE.exists():
return failures
lines = CYCLE_RETRO_FILE.read_text().strip().splitlines()
for line in lines[-QUARANTINE_LOOKBACK:]:
try:
entry = json.loads(line)
if not entry.get("success", True):
issue = entry.get("issue")
if issue:
failures[issue] = failures.get(issue, 0) + 1
except (json.JSONDecodeError, KeyError):
continue
return failures
# ── Scoring ─────────────────────────────────────────────────────────────
# Patterns that indicate file/function specificity
FILE_PATTERNS = re.compile(
r"(?:src/|tests/|scripts/|\.py|\.html|\.js|\.yaml|\.toml|\.sh)", re.IGNORECASE
)
FUNCTION_PATTERNS = re.compile(
r"(?:def |class |function |method |`\w+\(\)`)", re.IGNORECASE
)
# Patterns that indicate acceptance criteria
ACCEPTANCE_PATTERNS = re.compile(
r"(?:should|must|expect|verify|assert|test.?case|acceptance|criteria"
r"|pass(?:es|ing)|fail(?:s|ing)|return(?:s)?|raise(?:s)?)",
re.IGNORECASE,
)
TEST_PATTERNS = re.compile(
r"(?:tox|pytest|test_\w+|\.test\.|assert\s)", re.IGNORECASE
)
# Tags in issue titles
TAG_PATTERN = re.compile(r"\[([^\]]+)\]")
# Priority labels / tags
BUG_TAGS = {"bug", "broken", "crash", "error", "fix", "regression", "hotfix"}
FEATURE_TAGS = {"feature", "feat", "enhancement", "capability", "timmy-capability"}
REFACTOR_TAGS = {"refactor", "cleanup", "tech-debt", "optimization", "perf"}
META_TAGS = {"philosophy", "soul-gap", "discussion", "question", "rfc"}
LOOP_TAG = "loop-generated"
def extract_tags(title: str, labels: list[str]) -> set[str]:
"""Pull tags from [bracket] notation in title + Gitea labels."""
tags = set()
for match in TAG_PATTERN.finditer(title):
tags.add(match.group(1).lower().strip())
for label in labels:
tags.add(label.lower().strip())
return tags
def score_scope(title: str, body: str, tags: set[str]) -> int:
"""0-3: How well-scoped is this issue?"""
text = f"{title}\n{body}"
score = 0
# Mentions specific files?
if FILE_PATTERNS.search(text):
score += 1
# Mentions specific functions/classes?
if FUNCTION_PATTERNS.search(text):
score += 1
# Short, focused title (not a novel)?
clean_title = TAG_PATTERN.sub("", title).strip()
if len(clean_title) < 80:
score += 1
# Philosophy/meta issues are inherently unscoped for dev work
if tags & META_TAGS:
score = max(0, score - 2)
return min(3, score)
def score_acceptance(title: str, body: str, tags: set[str]) -> int:
"""0-3: Does this have clear acceptance criteria?"""
text = f"{title}\n{body}"
score = 0
# Has acceptance-related language?
matches = len(ACCEPTANCE_PATTERNS.findall(text))
if matches >= 3:
score += 2
elif matches >= 1:
score += 1
# Mentions specific tests?
if TEST_PATTERNS.search(text):
score += 1
# Has a "## Problem" + "## Solution" or similar structure?
if re.search(r"##\s*(problem|solution|expected|actual|steps)", body, re.IGNORECASE):
score += 1
# Philosophy issues don't have testable criteria
if tags & META_TAGS:
score = max(0, score - 1)
return min(3, score)
def score_alignment(title: str, body: str, tags: set[str]) -> int:
"""0-3: How aligned is this with the north star?"""
score = 0
# Bug on main = highest priority
if tags & BUG_TAGS:
score += 3
return min(3, score)
# Refactors that improve code health
if tags & REFACTOR_TAGS:
score += 2
# Features that grow Timmy's capabilities
if tags & FEATURE_TAGS:
score += 2
# Loop-generated issues get a small boost (the loop found real problems)
if LOOP_TAG in tags:
score += 1
# Philosophy issues are important but not dev-actionable
if tags & META_TAGS:
score = 0
return min(3, score)
def score_issue(issue: dict) -> dict:
"""Score a single issue. Returns enriched dict."""
title = issue.get("title", "")
body = issue.get("body", "") or ""
labels = [l["name"] for l in issue.get("labels", [])]
tags = extract_tags(title, labels)
number = issue["number"]
scope = score_scope(title, body, tags)
acceptance = score_acceptance(title, body, tags)
alignment = score_alignment(title, body, tags)
total = scope + acceptance + alignment
# Determine issue type
if tags & BUG_TAGS:
issue_type = "bug"
elif tags & FEATURE_TAGS:
issue_type = "feature"
elif tags & REFACTOR_TAGS:
issue_type = "refactor"
elif tags & META_TAGS:
issue_type = "philosophy"
else:
issue_type = "unknown"
# Extract mentioned files from body
files = list(set(re.findall(r"(?:src|tests|scripts)/[\w/.]+\.(?:py|html|js|yaml)", body)))
return {
"issue": number,
"title": TAG_PATTERN.sub("", title).strip(),
"type": issue_type,
"score": total,
"scope": scope,
"acceptance": acceptance,
"alignment": alignment,
"tags": sorted(tags),
"files": files[:10],
"ready": total >= READY_THRESHOLD,
}
# ── Quarantine ──────────────────────────────────────────────────────────
def update_quarantine(scored: list[dict]) -> list[dict]:
"""Auto-quarantine issues that have failed >= 2 times. Returns filtered list."""
failures = load_cycle_failures()
quarantine = load_quarantine()
now = datetime.now(timezone.utc).isoformat()
filtered = []
for item in scored:
num = item["issue"]
fail_count = failures.get(num, 0)
str_num = str(num)
if fail_count >= 2 and str_num not in quarantine:
quarantine[str_num] = {
"reason": f"Failed {fail_count} times in recent cycles",
"quarantined_at": now,
"failures": fail_count,
}
print(f"[triage] QUARANTINED #{num}: failed {fail_count} times")
continue
if str_num in quarantine:
print(f"[triage] Skipping #{num} (quarantined)")
continue
filtered.append(item)
save_quarantine(quarantine)
return filtered
# ── Main ────────────────────────────────────────────────────────────────
def run_triage() -> list[dict]:
token = get_token()
# Fetch all open issues (paginate)
page = 1
all_issues: list[dict] = []
while True:
batch = api_get(f"issues?state=open&limit=50&page={page}&type=issues", token)
if not batch:
break
all_issues.extend(batch)
if len(batch) < 50:
break
page += 1
print(f"[triage] Fetched {len(all_issues)} open issues")
# Score each
scored = [score_issue(i) for i in all_issues]
# Auto-quarantine repeat failures
scored = update_quarantine(scored)
# Sort: ready first, then by score descending, bugs always on top
def sort_key(item: dict) -> tuple:
return (
0 if item["type"] == "bug" else 1,
-item["score"],
item["issue"],
)
scored.sort(key=sort_key)
# Write queue (ready items only)
ready = [s for s in scored if s["ready"]]
not_ready = [s for s in scored if not s["ready"]]
# Save backup before writing (if current file exists and is valid)
if QUEUE_FILE.exists():
try:
json.loads(QUEUE_FILE.read_text()) # Validate current file
QUEUE_BACKUP_FILE.write_text(QUEUE_FILE.read_text())
except (json.JSONDecodeError, OSError):
pass # Current file is corrupt, don't overwrite backup
# Write new queue file
QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
QUEUE_FILE.write_text(json.dumps(ready, indent=2) + "\n")
# Validate the write by re-reading and parsing
try:
json.loads(QUEUE_FILE.read_text())
except (json.JSONDecodeError, OSError) as exc:
print(f"[triage] ERROR: queue.json validation failed: {exc}", file=sys.stderr)
# Restore from backup if available
if QUEUE_BACKUP_FILE.exists():
try:
backup_data = QUEUE_BACKUP_FILE.read_text()
json.loads(backup_data) # Validate backup
QUEUE_FILE.write_text(backup_data)
print(f"[triage] Restored queue.json from backup")
except (json.JSONDecodeError, OSError) as restore_exc:
print(f"[triage] ERROR: Backup restore failed: {restore_exc}", file=sys.stderr)
# Write empty list as last resort
QUEUE_FILE.write_text("[]\n")
else:
# No backup, write empty list
QUEUE_FILE.write_text("[]\n")
# Write retro entry
retro_entry = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"total_open": len(all_issues),
"scored": len(scored),
"ready": len(ready),
"not_ready": len(not_ready),
"top_issue": ready[0]["issue"] if ready else None,
"quarantined": len(load_quarantine()),
}
RETRO_FILE.parent.mkdir(parents=True, exist_ok=True)
with open(RETRO_FILE, "a") as f:
f.write(json.dumps(retro_entry) + "\n")
# Summary
print(f"[triage] Ready: {len(ready)} | Not ready: {len(not_ready)}")
for item in ready[:5]:
flag = "🐛" if item["type"] == "bug" else ""
print(f" {flag} #{item['issue']} score={item['score']} {item['title'][:60]}")
if not_ready:
print(f"[triage] Low-scoring ({len(not_ready)}):")
for item in not_ready[:3]:
print(f" #{item['issue']} score={item['score']} {item['title'][:50]}")
return ready
if __name__ == "__main__":
run_triage()