def _epoch_tag(now: datetime | None = None) -> tuple[str, dict]:
    """Generate the symbolic epoch tag and advance the daily counter.

    The tag has the form ``⟳WW.D:NNN``: ISO week-of-year (two digits),
    ISO weekday (1=Mon … 7=Sun) and the per-day cycle counter, zero-padded
    to three digits.

    The daily counter persists in ``EPOCH_COUNTER_FILE`` as a two-line file:
        line 1: ISO date (YYYY-MM-DD) of the current epoch day
        line 2: integer count
    When the stored date differs from today's UTC date, or the file is
    missing, unreadable or corrupt, the counter restarts at 1.

    Args:
        now: Moment to tag. Defaults to the current UTC time. Aware
            datetimes are converted to UTC and naive ones are taken to be
            UTC, so the counter always rolls over at midnight UTC.

    Returns:
        ``(epoch_string, epoch_parts)`` where ``epoch_parts`` is a dict with
        the keys ``week``, ``weekday`` and ``daily_n``.

    Raises:
        OSError: If the counter file or its directory cannot be written.
    """
    # Normalise to UTC so week, weekday and the reset date all agree with
    # the documented "resets at midnight UTC" rule.
    if now is None:
        now = datetime.now(timezone.utc)
    elif now.tzinfo is None:
        now = now.replace(tzinfo=timezone.utc)
    else:
        now = now.astimezone(timezone.utc)

    _, week, weekday = now.isocalendar()
    today_str = now.strftime("%Y-%m-%d")

    # Read the stored state. A missing or unreadable file simply means
    # "start over" (UnicodeDecodeError is a ValueError subclass).
    try:
        stored = EPOCH_COUNTER_FILE.read_text().strip().splitlines()
    except (OSError, ValueError):
        stored = []

    daily_n = 1
    if len(stored) == 2 and stored[0] == today_str:
        try:
            previous = int(stored[1])
        except ValueError:
            previous = 0  # corrupt count: reset
        # A non-positive count can only come from a damaged file; ignoring
        # it keeps the tag well-formed.
        if previous > 0:
            daily_n = previous + 1

    # Persist the advanced counter for the next cycle.
    EPOCH_COUNTER_FILE.parent.mkdir(parents=True, exist_ok=True)
    EPOCH_COUNTER_FILE.write_text(f"{today_str}\n{daily_n}\n")

    tag = f"\u27f3{week:02d}.{weekday}:{daily_n:03d}"
    parts = {"week": week, "weekday": weekday, "daily_n": daily_n}
    return tag, parts
"measured_cycles": len(measured), "total_cycles": len(entries), @@ -136,9 +214,12 @@ def update_summary() -> None: "total_lines_removed": sum(e.get("lines_removed", 0) for e in recent), "total_prs_merged": sum(1 for e in recent if e.get("pr")), "by_type": type_stats, + "by_week": dict(sorted(by_week.items())), + "by_weekday": by_weekday_named, "quarantine_candidates": quarantine_candidates, "recent_failures": [ - {"cycle": e["cycle"], "issue": e.get("issue"), "reason": e.get("reason", "")} + {"cycle": e["cycle"], "epoch": e.get("epoch", ""), + "issue": e.get("issue"), "reason": e.get("reason", "")} for e in failures[-5:] ], } @@ -157,9 +238,17 @@ def main() -> None: # A cycle is only truly successful if hermes exited clean AND main is green truly_success = args.success and args.main_green + # Generate epoch turnover tag + now = datetime.now(timezone.utc) + epoch_tag, epoch_parts = _epoch_tag(now) + entry = { - "timestamp": datetime.now(timezone.utc).isoformat(), + "timestamp": now.isoformat(), "cycle": args.cycle, + "epoch": epoch_tag, + "epoch_week": epoch_parts["week"], + "epoch_weekday": epoch_parts["weekday"], + "epoch_daily_n": epoch_parts["daily_n"], "issue": args.issue, "type": args.type, "success": truly_success, @@ -184,7 +273,7 @@ def main() -> None: update_summary() status = "✓ SUCCESS" if args.success else "✗ FAILURE" - print(f"[retro] Cycle {args.cycle} {status}", end="") + print(f"[retro] {epoch_tag} Cycle {args.cycle} {status}", end="") if args.issue: print(f" (#{args.issue} {args.type})", end="") if args.duration: diff --git a/scripts/loop_introspect.py b/scripts/loop_introspect.py new file mode 100644 index 00000000..2af9c7a3 --- /dev/null +++ b/scripts/loop_introspect.py @@ -0,0 +1,407 @@ +#!/usr/bin/env python3 +"""Loop introspection — the self-improvement engine. + +Analyzes retro data across time windows to detect trends, extract patterns, +and produce structured recommendations. 
# Locations of the retro data this script reads and the insights file it writes.
REPO_ROOT = Path(__file__).resolve().parent.parent
CYCLES_FILE = REPO_ROOT / ".loop" / "retro" / "cycles.jsonl"
DEEP_TRIAGE_FILE = REPO_ROOT / ".loop" / "retro" / "deep-triage.jsonl"
TRIAGE_FILE = REPO_ROOT / ".loop" / "retro" / "triage.jsonl"
QUARANTINE_FILE = REPO_ROOT / ".loop" / "quarantine.json"
INSIGHTS_FILE = REPO_ROOT / ".loop" / "retro" / "insights.json"


# ── Helpers ──────────────────────────────────────────────────────────────

def load_jsonl(path: Path) -> list[dict]:
    """Load a JSONL file, skipping bad lines.

    Blank lines, lines that are not valid JSON and lines whose JSON value is
    not an object are ignored, so every returned item supports ``.get``.
    A missing file yields an empty list.
    """
    if not path.exists():
        return []
    entries = []
    for line in path.read_text().splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            record = json.loads(line)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            continue
        if isinstance(record, dict):
            entries.append(record)
    return entries


def parse_ts(ts_str: str) -> datetime | None:
    """Parse an ISO timestamp, tolerating missing tz.

    A trailing ``Z`` is accepted as UTC, and a timestamp without an offset
    is assumed to be UTC. Returns ``None`` for empty, non-string or
    unparseable input.
    """
    if not ts_str or not isinstance(ts_str, str):
        return None
    try:
        dt = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
    except ValueError:
        return None
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return dt


def _duration(entry: dict) -> float:
    """Return the entry's ``duration``, or 0 when missing or not a number."""
    value = entry.get("duration")
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return 0
    return value


def window(entries: list[dict], days: int, now: datetime | None = None) -> list[dict]:
    """Filter entries to the last N days.

    Args:
        entries: Records carrying an ISO ``timestamp`` field.
        days: Size of the look-back window in days.
        now: Reference time; defaults to the current UTC time.

    Returns:
        The entries whose timestamp parses and is at or after ``now - days``,
        in their original order.
    """
    if now is None:
        now = datetime.now(timezone.utc)
    cutoff = now - timedelta(days=days)
    result = []
    for e in entries:
        ts = parse_ts(e.get("timestamp", ""))
        if ts and ts >= cutoff:
            result.append(e)
    return result


# ── Analysis functions ───────────────────────────────────────────────────

def _window_stats(entries: list[dict]) -> dict:
    """Summarise one time window: count, success rate, duration, lines, PRs."""
    if not entries:
        return {"count": 0, "success_rate": None, "avg_duration": None,
                "lines_net": 0, "prs_merged": 0}
    successes = sum(1 for e in entries if e.get("success"))
    # Only positive durations count towards the average.
    durations = [d for d in map(_duration, entries) if d > 0]
    return {
        "count": len(entries),
        "success_rate": round(successes / len(entries), 3),
        "avg_duration": round(sum(durations) / len(durations)) if durations else None,
        "lines_net": sum((e.get("lines_added") or 0) - (e.get("lines_removed") or 0)
                         for e in entries),
        "prs_merged": sum(1 for e in entries if e.get("pr")),
    }


def compute_trends(cycles: list[dict], now: datetime | None = None) -> dict:
    """Compare recent window (last 7d) vs older window (7-14d ago).

    Args:
        cycles: Cycle records with ISO ``timestamp`` fields.
        now: Reference time; defaults to the current UTC time. Both windows
            are measured against this single instant.

    Returns:
        A dict with ``recent_7d`` and ``previous_7d`` statistics plus
        ``velocity_change``, ``success_rate_change`` and ``duration_change``
        (recent minus previous). A change is ``None`` when either window
        lacks the data needed to compute it.
    """
    if now is None:
        now = datetime.now(timezone.utc)
    recent_cutoff = now - timedelta(days=7)
    older_cutoff = now - timedelta(days=14)

    # Bucket each entry once; entries without a usable timestamp are ignored.
    recent = []
    older = []
    for entry in cycles:
        ts = parse_ts(entry.get("timestamp", ""))
        if ts is None:
            continue
        if ts >= recent_cutoff:
            recent.append(entry)
        elif ts >= older_cutoff:
            older.append(entry)

    recent_stats = _window_stats(recent)
    older_stats = _window_stats(older)

    trend = {
        "recent_7d": recent_stats,
        "previous_7d": older_stats,
        "velocity_change": None,
        "success_rate_change": None,
        "duration_change": None,
    }

    if recent_stats["count"] and older_stats["count"]:
        trend["velocity_change"] = recent_stats["count"] - older_stats["count"]
    if recent_stats["success_rate"] is not None and older_stats["success_rate"] is not None:
        trend["success_rate_change"] = round(
            recent_stats["success_rate"] - older_stats["success_rate"], 3
        )
    if recent_stats["avg_duration"] is not None and older_stats["avg_duration"] is not None:
        trend["duration_change"] = recent_stats["avg_duration"] - older_stats["avg_duration"]

    return trend


def type_analysis(cycles: list[dict]) -> dict:
    """Per-type success rates and durations.

    Records whose ``type`` is missing or empty are grouped under
    ``"unknown"``; type keys are always strings so they can be formatted
    and serialised safely.

    Returns:
        ``{type: {"count", "success_rate", "avg_duration", "max_duration"}}``
        where the duration figures are 0 when no positive duration exists.
    """
    by_type: dict[str, list[dict]] = defaultdict(list)
    for c in cycles:
        by_type[str(c.get("type") or "unknown")].append(c)

    result = {}
    for t, entries in by_type.items():
        durations = [d for d in map(_duration, entries) if d > 0]
        successes = sum(1 for e in entries if e.get("success"))
        result[t] = {
            "count": len(entries),
            "success_rate": round(successes / len(entries), 3),
            "avg_duration": round(sum(durations) / len(durations)) if durations else 0,
            "max_duration": max(durations) if durations else 0,
        }
    return result


def repeat_failures(cycles: list[dict]) -> list[dict]:
    """Issues that have failed multiple times — quarantine candidates.

    Returns:
        One dict per issue with two or more failed cycles, most failures
        first, each carrying ``issue``, ``failure_count`` and the list of
        failed ``attempts`` (cycle, reason, duration).
    """
    failures: dict[int, list] = defaultdict(list)
    for c in cycles:
        if not c.get("success") and c.get("issue"):
            failures[c["issue"]].append({
                "cycle": c.get("cycle"),
                "reason": c.get("reason", ""),
                "duration": c.get("duration", 0),
            })
    # Only issues with 2+ failures
    return [
        {"issue": k, "failure_count": len(v), "attempts": v}
        for k, v in sorted(failures.items(), key=lambda x: -len(x[1]))
        if len(v) >= 2
    ]


def duration_outliers(cycles: list[dict], threshold_multiple: float = 3.0) -> list[dict]:
    """Cycles that took way longer than average — something went wrong.

    Args:
        cycles: Cycle records; only positive numeric durations are used.
        threshold_multiple: A cycle is an outlier when its duration exceeds
            the average duration times this factor.

    Returns:
        Outlier descriptions in input order, or an empty list when fewer
        than five cycles have a usable duration.
    """
    durations = [d for d in map(_duration, cycles) if d > 0]
    if len(durations) < 5:
        return []
    avg = sum(durations) / len(durations)
    threshold = avg * threshold_multiple

    outliers = []
    for c in cycles:
        dur = _duration(c)
        if dur > threshold:
            outliers.append({
                "cycle": c.get("cycle"),
                "issue": c.get("issue"),
                "type": c.get("type"),
                "duration": dur,
                "avg_duration": round(avg),
                "multiple": round(dur / avg, 1),
                "reason": c.get("reason", ""),
            })
    return outliers


def triage_effectiveness(deep_triages: list[dict]) -> dict:
    """How well is the deep triage performing?

    Aggregates the reviewed/refined/created/closed counts over all deep
    triage runs, the share of runs with ``timmy_available`` set, and the
    recorded ``timmy_feedback`` texts (each truncated to 200 characters).
    Fields that are missing or null count as zero/empty.
    """
    if not deep_triages:
        return {"runs": 0, "note": "No deep triage data yet"}

    def total_len(key: str) -> int:
        return sum(len(d.get(key) or []) for d in deep_triages)

    timmy_available = sum(1 for d in deep_triages if d.get("timmy_available"))

    # Extract Timmy's feedback themes
    timmy_themes = [
        str(d["timmy_feedback"])[:200]
        for d in deep_triages
        if d.get("timmy_feedback")
    ]

    return {
        "runs": len(deep_triages),
        "total_reviewed": sum(d.get("issues_reviewed") or 0 for d in deep_triages),
        "total_refined": total_len("issues_refined"),
        "total_created": total_len("issues_created"),
        "total_closed": total_len("issues_closed"),
        "timmy_consultation_rate": round(timmy_available / len(deep_triages), 2),
        "timmy_recent_feedback": timmy_themes[-1] if timmy_themes else "",
        "timmy_feedback_history": timmy_themes,
    }


def generate_recommendations(
    trends: dict,
    types: dict,
    repeats: list,
    outliers: list,
    triage_eff: dict,
) -> list[dict]:
    """Produce actionable recommendations from the analysis.

    Args:
        trends: Output of ``compute_trends``.
        types: Output of ``type_analysis``.
        repeats: Output of ``repeat_failures``.
        outliers: Output of ``duration_outliers``.
        triage_eff: Output of ``triage_effectiveness``.

    Returns:
        A list of dicts with ``severity``, ``category``, ``finding`` and
        ``recommendation``. When no rule fires, a single ``info`` entry
        reports that the system is healthy.
    """
    recs = []

    # 1. Success rate declining?
    src = trends.get("success_rate_change")
    if src is not None and src < -0.1:
        recs.append({
            "severity": "high",
            "category": "reliability",
            "finding": f"Success rate dropped {abs(src)*100:.0f}pp in the last 7 days",
            "recommendation": "Review recent failures. Are issues poorly scoped? "
                              "Is main unstable? Check if triage is producing bad work items.",
        })

    # 2. Velocity dropping?
    vc = trends.get("velocity_change")
    if vc is not None and vc < -5:
        recs.append({
            "severity": "medium",
            "category": "throughput",
            "finding": f"Velocity dropped by {abs(vc)} cycles vs previous week",
            "recommendation": "Check for loop stalls, long-running cycles, or queue starvation.",
        })

    # 3. Duration creep?
    dc = trends.get("duration_change")
    if dc is not None and dc > 120:  # 2+ minutes longer
        recs.append({
            "severity": "medium",
            "category": "efficiency",
            "finding": f"Average cycle duration increased by {dc}s vs previous week",
            "recommendation": "Issues may be growing in scope. Enforce tighter decomposition "
                              "in deep triage. Check if tests are getting slower.",
        })

    # 4. Type-specific problems
    for t, info in types.items():
        if info["count"] >= 3 and info["success_rate"] < 0.5:
            recs.append({
                "severity": "high",
                "category": "type_reliability",
                "finding": f"'{t}' issues fail {(1-info['success_rate'])*100:.0f}% of the time "
                           f"({info['count']} attempts)",
                "recommendation": f"'{t}' issues need better scoping or different approach. "
                                  f"Consider: tighter acceptance criteria, smaller scope, "
                                  f"or delegating to Kimi with more context.",
            })
        if info["avg_duration"] > 600 and info["count"] >= 3:  # >10 min avg
            # max_duration may be a float; whole minutes keep the text tidy.
            max_minutes = int(info["max_duration"]) // 60
            recs.append({
                "severity": "medium",
                "category": "type_efficiency",
                "finding": f"'{t}' issues average {info['avg_duration']//60}m{info['avg_duration']%60}s "
                           f"(max {max_minutes}m)",
                "recommendation": f"Break '{t}' issues into smaller pieces. Target <5 min per cycle.",
            })

    # 5. Repeat failures
    for rf in repeats[:3]:
        recs.append({
            "severity": "high",
            "category": "repeat_failure",
            "finding": f"Issue #{rf['issue']} has failed {rf['failure_count']} times",
            "recommendation": "Quarantine or rewrite this issue. Repeated failure = "
                              "bad scope or missing prerequisite.",
        })

    # 6. Outliers
    if len(outliers) > 2:
        # "x+" is a lower bound, so quote the smallest multiple among the
        # outliers rather than whichever one happens to come first.
        multiples = [o["multiple"] for o in outliers
                     if isinstance(o.get("multiple"), (int, float))]
        floor_multiple = min(multiples) if multiples else "?"
        recs.append({
            "severity": "medium",
            "category": "outliers",
            "finding": f"{len(outliers)} cycles took {floor_multiple}x+ "
                       f"longer than average",
            "recommendation": "Long cycles waste resources. Add timeout enforcement or "
                              "break complex issues earlier.",
        })

    # 7. Code growth
    recent = trends.get("recent_7d", {})
    net = recent.get("lines_net", 0)
    if net > 500:
        recs.append({
            "severity": "low",
            "category": "code_health",
            "finding": f"Net +{net} lines added in the last 7 days",
            "recommendation": "Lines of code is a liability. Balance feature work with "
                              "refactoring. Target net-zero or negative line growth.",
        })

    # 8. Triage health
    if triage_eff.get("runs", 0) == 0:
        recs.append({
            "severity": "high",
            "category": "triage",
            "finding": "Deep triage has never run",
            "recommendation": "Enable deep triage (every 20 cycles). The loop needs "
                              "LLM-driven issue refinement to stay effective.",
        })

    # No recommendations = things are healthy
    if not recs:
        recs.append({
            "severity": "info",
            "category": "health",
            "finding": "No significant issues detected",
            "recommendation": "System is healthy. Continue current patterns.",
        })

    return recs


# ── Main ─────────────────────────────────────────────────────────────────

def main() -> None:
    """Analyse the retro logs, write ``INSIGHTS_FILE`` and print a summary.

    Does nothing beyond printing a notice when ``CYCLES_FILE`` holds no
    usable entries.
    """
    cycles = load_jsonl(CYCLES_FILE)
    deep_triages = load_jsonl(DEEP_TRIAGE_FILE)

    if not cycles:
        print("[introspect] No cycle data found. Nothing to analyze.")
        return

    # Run all analyses
    trends = compute_trends(cycles)
    types = type_analysis(cycles)
    repeats = repeat_failures(cycles)
    outliers = duration_outliers(cycles)
    triage_eff = triage_effectiveness(deep_triages)
    recommendations = generate_recommendations(trends, types, repeats, outliers, triage_eff)

    insights = {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "total_cycles_analyzed": len(cycles),
        "trends": trends,
        "by_type": types,
        "repeat_failures": repeats[:5],
        "duration_outliers": outliers[:5],
        "triage_effectiveness": triage_eff,
        "recommendations": recommendations,
    }

    # Write insights
    INSIGHTS_FILE.parent.mkdir(parents=True, exist_ok=True)
    INSIGHTS_FILE.write_text(json.dumps(insights, indent=2) + "\n")

    # Current epoch from the latest entry that carries one
    latest_epoch = next((c["epoch"] for c in reversed(cycles) if c.get("epoch")), "")

    # Human-readable output
    header = f"[introspect] Analyzed {len(cycles)} cycles"
    if latest_epoch:
        header += f" · current epoch: {latest_epoch}"
    print(header)

    print("\n TRENDS (7d vs previous 7d):")
    r7 = trends["recent_7d"]
    p7 = trends["previous_7d"]
    print(f" Cycles: {r7['count']:>3d} (was {p7['count']})")
    if r7["success_rate"] is not None:
        rate_change = trends["success_rate_change"] or 0
        arrow = "↑" if rate_change > 0 else "↓" if rate_change < 0 else "→"
        print(f" Success rate: {r7['success_rate']*100:>4.0f}% {arrow}")
    if r7["avg_duration"] is not None:
        print(f" Avg duration: {r7['avg_duration']//60}m{r7['avg_duration']%60:02d}s")
    print(f" PRs merged: {r7['prs_merged']:>3d} (was {p7['prs_merged']})")
    print(f" Lines net: {r7['lines_net']:>+5d}")

    print("\n BY TYPE:")
    for t, info in sorted(types.items(), key=lambda x: -x[1]["count"]):
        print(f" {t:12s} n={info['count']:>2d} "
              f"ok={info['success_rate']*100:>3.0f}% "
              f"avg={info['avg_duration']//60}m{info['avg_duration']%60:02d}s")

    if repeats:
        print("\n REPEAT FAILURES:")
        for rf in repeats[:3]:
            print(f" #{rf['issue']} failed {rf['failure_count']}x")

    print(f"\n RECOMMENDATIONS ({len(recommendations)}):")
    severity_glyphs = {"high": "🔴", "medium": "🟡", "low": "🟢", "info": "ℹ️ "}
    for rec in recommendations:
        sev = severity_glyphs.get(rec["severity"], "?")
        print(f" {sev} {rec['finding']}")
        print(f" → {rec['recommendation']}")

    print(f"\n Written to: {INSIGHTS_FILE}")


if __name__ == "__main__":
    main()