#!/usr/bin/env python3 """Backfill cycle retrospective data from Gitea merged PRs and git log. One-time script to seed .loop/retro/cycles.jsonl and summary.json from existing history so the LOOPSTAT panel isn't empty. """ import json import os import re import subprocess from datetime import datetime, timezone from pathlib import Path from urllib.request import Request, urlopen REPO_ROOT = Path(__file__).resolve().parent.parent RETRO_FILE = REPO_ROOT / ".loop" / "retro" / "cycles.jsonl" SUMMARY_FILE = REPO_ROOT / ".loop" / "retro" / "summary.json" def _get_gitea_api() -> str: """Read Gitea API URL from env var, then ~/.hermes/gitea_api file, then default.""" # Check env vars first (TIMMY_GITEA_API is preferred, GITEA_API for compatibility) api_url = os.environ.get("TIMMY_GITEA_API") or os.environ.get("GITEA_API") if api_url: return api_url # Check ~/.hermes/gitea_api file api_file = Path.home() / ".hermes" / "gitea_api" if api_file.exists(): return api_file.read_text().strip() # Default fallback return "http://localhost:3000/api/v1" GITEA_API = _get_gitea_api() REPO_SLUG = os.environ.get("REPO_SLUG", "rockachopa/Timmy-time-dashboard") TOKEN_FILE = Path.home() / ".hermes" / "gitea_token" TAG_RE = re.compile(r"\[([^\]]+)\]") CYCLE_RE = re.compile(r"\[loop-cycle-(\d+)\]", re.IGNORECASE) ISSUE_RE = re.compile(r"#(\d+)") def get_token() -> str: return TOKEN_FILE.read_text().strip() def api_get(path: str, token: str) -> list | dict: url = f"{GITEA_API}/repos/{REPO_SLUG}/{path}" req = Request(url, headers={ "Authorization": f"token {token}", "Accept": "application/json", }) with urlopen(req, timeout=15) as resp: return json.loads(resp.read()) def get_all_merged_prs(token: str) -> list[dict]: """Fetch all merged PRs from Gitea.""" all_prs = [] page = 1 while True: batch = api_get(f"pulls?state=closed&sort=created&limit=50&page={page}", token) if not batch: break merged = [p for p in batch if p.get("merged")] all_prs.extend(merged) if len(batch) < 50: break page += 1 return all_prs def get_pr_diff_stats(token: str, pr_number: int) -> dict: """Get diff stats for a PR.""" try: pr = api_get(f"pulls/{pr_number}", token) return { "additions": pr.get("additions", 0), "deletions": pr.get("deletions", 0), "changed_files": pr.get("changed_files", 0), } except Exception: return {"additions": 0, "deletions": 0, "changed_files": 0} def classify_pr(title: str, body: str) -> str: """Guess issue type from PR title/body.""" tags = set() for match in TAG_RE.finditer(title): tags.add(match.group(1).lower()) lower = title.lower() if "fix" in lower or "bug" in tags: return "bug" elif "feat" in lower or "feature" in tags: return "feature" elif "refactor" in lower or "refactor" in tags: return "refactor" elif "test" in lower: return "feature" elif "policy" in lower or "chore" in lower: return "refactor" return "unknown" def extract_cycle_number(title: str) -> int | None: m = CYCLE_RE.search(title) return int(m.group(1)) if m else None def extract_issue_number(title: str, body: str, pr_number: int | None = None) -> int | None: """Extract the issue number from PR body/title, ignoring the PR number itself. Gitea appends "(#N)" to PR titles where N is the PR number — skip that so we don't confuse it with the linked issue. """ for text in [body or "", title]: for m in ISSUE_RE.finditer(text): num = int(m.group(1)) if num != pr_number: return num return None def estimate_duration(pr: dict) -> int: """Estimate cycle duration from PR created_at to merged_at.""" try: created = datetime.fromisoformat(pr["created_at"].replace("Z", "+00:00")) merged = datetime.fromisoformat(pr["merged_at"].replace("Z", "+00:00")) delta = (merged - created).total_seconds() # Cap at 1200s (max cycle time) — some PRs sit open for days return min(int(delta), 1200) except (KeyError, ValueError, TypeError): return 0 def main(): token = get_token() print("[backfill] Fetching merged PRs from Gitea...") prs = get_all_merged_prs(token) print(f"[backfill] Found {len(prs)} merged PRs") # Sort oldest first prs.sort(key=lambda p: p.get("merged_at", "")) entries = [] cycle_counter = 0 for pr in prs: title = pr.get("title", "") body = pr.get("body", "") or "" pr_num = pr["number"] cycle = extract_cycle_number(title) if cycle is None: cycle_counter += 1 cycle = cycle_counter else: cycle_counter = max(cycle_counter, cycle) issue = extract_issue_number(title, body, pr_number=pr_num) issue_type = classify_pr(title, body) duration = estimate_duration(pr) diff = get_pr_diff_stats(token, pr_num) merged_at = pr.get("merged_at", "") entry = { "timestamp": merged_at, "cycle": cycle, "issue": issue, "type": issue_type, "success": True, # it merged, so it succeeded "duration": duration, "tests_passed": 0, # can't recover this "tests_added": 0, "files_changed": diff["changed_files"], "lines_added": diff["additions"], "lines_removed": diff["deletions"], "kimi_panes": 0, "pr": pr_num, "reason": "", "notes": f"backfilled from PR#{pr_num}: {title[:80]}", } entries.append(entry) print(f" PR#{pr_num:>3d} cycle={cycle:>3d} #{issue or '-':<5} " f"+{diff['additions']:<5d} -{diff['deletions']:<5d} {issue_type:<8s} " f"{title[:50]}") # Write cycles.jsonl RETRO_FILE.parent.mkdir(parents=True, exist_ok=True) with open(RETRO_FILE, "w") as f: for entry in entries: f.write(json.dumps(entry) + "\n") print(f"\n[backfill] Wrote {len(entries)} entries to {RETRO_FILE}") # Generate summary generate_summary(entries) print(f"[backfill] Wrote summary to {SUMMARY_FILE}") def generate_summary(entries: list[dict]): """Compute rolling summary from entries.""" window = 50 recent = entries[-window:] if not recent: return successes = [e for e in recent if e.get("success")] durations = [e["duration"] for e in recent if e.get("duration", 0) > 0] type_stats: dict[str, dict] = {} for e in recent: t = e.get("type", "unknown") if t not in type_stats: type_stats[t] = {"count": 0, "success": 0, "total_duration": 0} type_stats[t]["count"] += 1 if e.get("success"): type_stats[t]["success"] += 1 type_stats[t]["total_duration"] += e.get("duration", 0) for t, stats in type_stats.items(): if stats["count"] > 0: stats["success_rate"] = round(stats["success"] / stats["count"], 2) stats["avg_duration"] = round(stats["total_duration"] / stats["count"]) summary = { "updated_at": datetime.now(timezone.utc).isoformat(), "window": len(recent), "total_cycles": len(entries), "success_rate": round(len(successes) / len(recent), 2) if recent else 0, "avg_duration_seconds": round(sum(durations) / len(durations)) if durations else 0, "total_lines_added": sum(e.get("lines_added", 0) for e in recent), "total_lines_removed": sum(e.get("lines_removed", 0) for e in recent), "total_prs_merged": sum(1 for e in recent if e.get("pr")), "by_type": type_stats, "quarantine_candidates": {}, "recent_failures": [], } SUMMARY_FILE.write_text(json.dumps(summary, indent=2) + "\n") if __name__ == "__main__": main()