228 lines
7.1 KiB
Python
228 lines
7.1 KiB
Python
#!/usr/bin/env python3
|
|
"""Backfill cycle retrospective data from Gitea merged PRs and git log.
|
|
|
|
One-time script to seed .loop/retro/cycles.jsonl and summary.json
|
|
from existing history so the LOOPSTAT panel isn't empty.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from urllib.request import Request, urlopen
|
|
|
|
# Paths: this script lives one directory below the repo root (scripts/-style
# layout), so the root is two .parent hops from the resolved file path.
REPO_ROOT = Path(__file__).resolve().parent.parent
RETRO_FILE = REPO_ROOT / ".loop" / "retro" / "cycles.jsonl"  # one JSON object per cycle (append-style log)
SUMMARY_FILE = REPO_ROOT / ".loop" / "retro" / "summary.json"  # rolling aggregate consumed by the LOOPSTAT panel

# Gitea instance and repository the backfill reads from.
GITEA_API = "http://localhost:3000/api/v1"
REPO_SLUG = "rockachopa/Timmy-time-dashboard"
TOKEN_FILE = Path.home() / ".hermes" / "gitea_token"  # plain-text API token

# Matches any "[tag]" bracket tag in a PR title (captures the tag text).
TAG_RE = re.compile(r"\[([^\]]+)\]")
# Matches an explicit "[loop-cycle-N]" marker (captures N).
CYCLE_RE = re.compile(r"\[loop-cycle-(\d+)\]", re.IGNORECASE)
# Matches a "#N" issue reference (captures N).
ISSUE_RE = re.compile(r"#(\d+)")
|
|
|
|
|
|
def get_token() -> str:
    """Return the Gitea API token stored in TOKEN_FILE, stripped of whitespace."""
    with open(TOKEN_FILE) as fh:
        return fh.read().strip()
|
|
|
|
|
|
def api_get(path: str, token: str) -> list | dict:
    """GET a repo-scoped Gitea API path with token auth and return parsed JSON.

    `path` is relative to the repo endpoint, e.g. "pulls?state=closed".
    """
    headers = {
        "Authorization": f"token {token}",
        "Accept": "application/json",
    }
    request = Request(f"{GITEA_API}/repos/{REPO_SLUG}/{path}", headers=headers)
    with urlopen(request, timeout=15) as response:
        payload = response.read()
    return json.loads(payload)
|
|
|
|
|
|
def get_all_merged_prs(token: str) -> list[dict]:
    """Page through closed PRs (50 per page) and return only the merged ones."""
    merged_prs: list[dict] = []
    page = 1
    while True:
        batch = api_get(f"pulls?state=closed&sort=created&limit=50&page={page}", token)
        if not batch:
            break
        merged_prs.extend(pr for pr in batch if pr.get("merged"))
        # A short page means we just read the last page — stop without
        # issuing an extra empty request.
        if len(batch) < 50:
            break
        page += 1
    return merged_prs
|
|
|
|
|
|
def get_pr_diff_stats(token: str, pr_number: int) -> dict:
    """Return additions/deletions/changed_files for one PR; zeros on any failure.

    Best-effort by design: a missing or failed lookup must not abort the
    whole backfill run, so every error maps to zeroed stats.
    """
    try:
        pr = api_get(f"pulls/{pr_number}", token)
        stats = {
            key: pr.get(key, 0)
            for key in ("additions", "deletions", "changed_files")
        }
    except Exception:
        stats = {"additions": 0, "deletions": 0, "changed_files": 0}
    return stats
|
|
|
|
|
|
def classify_pr(title: str, body: str) -> str:
    """Guess issue type from PR title/body.

    Keyword checks run against the lowercased title; bracket tags like
    "[bug]" in the title are also honored. Falls back to "unknown".
    """
    bracket_tags = {m.group(1).lower() for m in TAG_RE.finditer(title)}
    lowered = title.lower()

    if "fix" in lowered or "bug" in bracket_tags:
        return "bug"
    if "feat" in lowered or "feature" in bracket_tags:
        return "feature"
    if "refactor" in lowered or "refactor" in bracket_tags:
        return "refactor"
    if "test" in lowered:
        return "feature"
    if "policy" in lowered or "chore" in lowered:
        return "refactor"
    return "unknown"
|
|
|
|
|
|
def extract_cycle_number(title: str) -> int | None:
    """Pull N out of a "[loop-cycle-N]" marker in the title, or None if absent."""
    match = CYCLE_RE.search(title)
    if match is None:
        return None
    return int(match.group(1))
|
|
|
|
|
|
def extract_issue_number(title: str, body: str) -> int | None:
    """Return the first "#N" issue reference found, or None.

    The body is searched before the title because it usually carries the
    "closes #N" line.
    """
    for candidate in (body or "", title):
        found = ISSUE_RE.search(candidate)
        if found:
            return int(found.group(1))
    return None
|
|
|
|
|
|
def estimate_duration(pr: dict) -> int:
    """Estimate cycle duration in seconds from a PR's created_at to merged_at.

    Returns 0 when either timestamp is missing, None, or unparsable. The
    result is clamped to [0, 1200]: 1200s is the max cycle time (some PRs
    sit open for days), and a negative delta — clock skew or bad API data —
    maps to 0 instead of leaking a negative duration into the stats.
    """
    try:
        # The "Z" -> "+00:00" rewrite keeps fromisoformat() happy on
        # Python < 3.11, which rejects a trailing "Z".
        created = datetime.fromisoformat(pr["created_at"].replace("Z", "+00:00"))
        merged = datetime.fromisoformat(pr["merged_at"].replace("Z", "+00:00"))
    except (KeyError, ValueError, TypeError, AttributeError):
        # AttributeError covers merged_at/created_at being None
        # (None.replace raises AttributeError, which the original
        # except clause missed).
        return 0
    delta = (merged - created).total_seconds()
    return max(0, min(int(delta), 1200))
|
|
|
|
|
|
def main():
    """Fetch merged PRs, derive one retro entry per PR, write cycles.jsonl and summary.json."""
    token = get_token()

    print("[backfill] Fetching merged PRs from Gitea...")
    prs = get_all_merged_prs(token)
    print(f"[backfill] Found {len(prs)} merged PRs")

    # Sort oldest first
    prs.sort(key=lambda p: p.get("merged_at", ""))

    entries = []
    # Fallback counter for PRs whose title carries no [loop-cycle-N] tag;
    # kept monotonic so synthesized numbers never collide backwards.
    cycle_counter = 0

    for pr in prs:
        title = pr.get("title", "")
        body = pr.get("body", "") or ""  # body may be null in the API payload
        pr_num = pr["number"]

        # Prefer the explicit cycle number from the title; otherwise mint the
        # next counter value. Explicit numbers also fast-forward the counter.
        cycle = extract_cycle_number(title)
        if cycle is None:
            cycle_counter += 1
            cycle = cycle_counter
        else:
            cycle_counter = max(cycle_counter, cycle)

        issue = extract_issue_number(title, body)
        issue_type = classify_pr(title, body)
        duration = estimate_duration(pr)
        diff = get_pr_diff_stats(token, pr_num)  # one extra API call per PR

        merged_at = pr.get("merged_at", "")

        # Schema mirrors what the live loop writes; fields we cannot recover
        # from history are zeroed and noted as such.
        entry = {
            "timestamp": merged_at,
            "cycle": cycle,
            "issue": issue,
            "type": issue_type,
            "success": True,  # it merged, so it succeeded
            "duration": duration,
            "tests_passed": 0,  # can't recover this
            "tests_added": 0,
            "files_changed": diff["changed_files"],
            "lines_added": diff["additions"],
            "lines_removed": diff["deletions"],
            "kimi_panes": 0,
            "pr": pr_num,
            "reason": "",
            "notes": f"backfilled from PR#{pr_num}: {title[:80]}",
        }
        entries.append(entry)
        # Fixed-width columns so the per-PR progress lines align.
        print(f" PR#{pr_num:>3d} cycle={cycle:>3d} #{issue or '-':<5} "
              f"+{diff['additions']:<5d} -{diff['deletions']:<5d} {issue_type:<8s} "
              f"{title[:50]}")

    # Write cycles.jsonl
    RETRO_FILE.parent.mkdir(parents=True, exist_ok=True)
    with open(RETRO_FILE, "w") as f:
        for entry in entries:
            f.write(json.dumps(entry) + "\n")
    print(f"\n[backfill] Wrote {len(entries)} entries to {RETRO_FILE}")

    # Generate summary
    generate_summary(entries)
    print(f"[backfill] Wrote summary to {SUMMARY_FILE}")
|
|
|
|
|
|
def generate_summary(entries: list[dict]):
    """Compute rolling summary from entries.

    Aggregates the most recent 50 entries (success rate, durations, per-type
    stats, line totals) and writes the result to SUMMARY_FILE as pretty JSON.
    No-op when there are no entries.
    """
    window = 50
    recent = entries[-window:]
    if not recent:
        return

    successes = [item for item in recent if item.get("success")]
    durations = [item["duration"] for item in recent if item.get("duration", 0) > 0]

    # Per-type rollup: count, success count, and cumulative duration.
    type_stats: dict[str, dict] = {}
    for item in recent:
        bucket = type_stats.setdefault(
            item.get("type", "unknown"),
            {"count": 0, "success": 0, "total_duration": 0},
        )
        bucket["count"] += 1
        if item.get("success"):
            bucket["success"] += 1
        bucket["total_duration"] += item.get("duration", 0)

    # Derive rates/averages once counting is done.
    for bucket in type_stats.values():
        if bucket["count"] > 0:
            bucket["success_rate"] = round(bucket["success"] / bucket["count"], 2)
            bucket["avg_duration"] = round(bucket["total_duration"] / bucket["count"])

    summary = {
        "updated_at": datetime.now(timezone.utc).isoformat(),
        "window": len(recent),
        "total_cycles": len(entries),
        "success_rate": round(len(successes) / len(recent), 2) if recent else 0,
        "avg_duration_seconds": round(sum(durations) / len(durations)) if durations else 0,
        "total_lines_added": sum(item.get("lines_added", 0) for item in recent),
        "total_lines_removed": sum(item.get("lines_removed", 0) for item in recent),
        "total_prs_merged": sum(1 for item in recent if item.get("pr")),
        "by_type": type_stats,
        "quarantine_candidates": {},
        "recent_failures": [],
    }

    SUMMARY_FILE.write_text(json.dumps(summary, indent=2) + "\n")
|
|
|
|
|
|
# Script entry point: run the one-time backfill when executed directly.
if __name__ == "__main__":
    main()
|