#!/usr/bin/env python3
"""Loop introspection — the self-improvement engine.

Analyzes retro data across time windows to detect trends, extract patterns,
and produce structured recommendations. Output is consumed by deep_triage
and injected into the loop prompt context.

This is the piece that closes the feedback loop:
    cycle_retro → introspect → deep_triage → loop behavior changes

Run: python3 scripts/loop_introspect.py
Output: .loop/retro/insights.json (structured insights + recommendations)
        Prints human-readable summary to stdout.

Called by: deep_triage.sh (before the LLM triage), timmy-loop.sh (every 50 cycles)
"""

from __future__ import annotations

import json
import sys
from collections import defaultdict
from datetime import datetime, timezone, timedelta
from pathlib import Path

REPO_ROOT = Path(__file__).resolve().parent.parent
CYCLES_FILE = REPO_ROOT / ".loop" / "retro" / "cycles.jsonl"
DEEP_TRIAGE_FILE = REPO_ROOT / ".loop" / "retro" / "deep-triage.jsonl"
TRIAGE_FILE = REPO_ROOT / ".loop" / "retro" / "triage.jsonl"
QUARANTINE_FILE = REPO_ROOT / ".loop" / "quarantine.json"
INSIGHTS_FILE = REPO_ROOT / ".loop" / "retro" / "insights.json"


# ── Helpers ──────────────────────────────────────────────────────────────

def _is_number(value: object) -> bool:
    """Return True when *value* is an int or float (JSON null/strings are not)."""
    return isinstance(value, (int, float))


def _positive_durations(entries: list[dict]) -> list[float]:
    """Collect the numeric ``duration`` values of *entries* that are > 0.

    Missing, null or non-numeric durations are ignored instead of raising.
    """
    return [
        e["duration"]
        for e in entries
        if _is_number(e.get("duration")) and e["duration"] > 0
    ]


def load_jsonl(path: Path) -> list[dict]:
    """Load a JSONL file, skipping bad lines.

    Returns an empty list when *path* does not exist. Lines that are blank,
    fail to parse, or parse to something other than a JSON object are
    dropped, because every consumer in this module calls ``.get`` on the
    entries.
    """
    if not path.exists():
        return []
    entries = []
    for line in path.read_text(encoding="utf-8").splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            record = json.loads(line)
        except (json.JSONDecodeError, ValueError):
            continue
        if isinstance(record, dict):
            entries.append(record)
    return entries


def parse_ts(ts_str: str) -> datetime | None:
    """Parse an ISO timestamp, tolerating missing tz.

    A trailing ``Z`` is accepted as UTC and naive timestamps are treated as
    UTC. Returns ``None`` for empty, non-string or unparsable input.
    """
    if not ts_str or not isinstance(ts_str, str):
        return None
    try:
        dt = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
    except (ValueError, TypeError):
        return None
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return dt


def window(entries: list[dict], days: int, now: datetime | None = None) -> list[dict]:
    """Filter entries to the last N days.

    Args:
        entries: Records carrying an ISO ``timestamp`` field.
        days: Size of the look-back window in days.
        now: Reference instant (timezone-aware); defaults to the current UTC
            time. Pass it explicitly to evaluate several windows against the
            same instant.

    Returns:
        The entries whose timestamp parses and is at or after the cutoff,
        in their original order.
    """
    if now is None:
        now = datetime.now(timezone.utc)
    cutoff = now - timedelta(days=days)
    result = []
    for e in entries:
        ts = parse_ts(e.get("timestamp", ""))
        if ts is not None and ts >= cutoff:
            result.append(e)
    return result


# ── Analysis functions ───────────────────────────────────────────────────

def _window_stats(entries: list[dict]) -> dict:
    """Summarize one time window: count, success rate, duration, lines, PRs."""
    if not entries:
        return {"count": 0, "success_rate": None, "avg_duration": None,
                "lines_net": 0, "prs_merged": 0}
    successes = sum(1 for e in entries if e.get("success"))
    durations = _positive_durations(entries)
    return {
        "count": len(entries),
        "success_rate": round(successes / len(entries), 3),
        "avg_duration": round(sum(durations) / len(durations)) if durations else None,
        # ``or 0`` so an explicit JSON null counts as zero lines.
        "lines_net": sum(
            (e.get("lines_added") or 0) - (e.get("lines_removed") or 0)
            for e in entries
        ),
        "prs_merged": sum(1 for e in entries if e.get("pr")),
    }


def compute_trends(cycles: list[dict]) -> dict:
    """Compare recent window (last 7d) vs older window (7-14d ago).

    Returns a dict with ``recent_7d`` and ``previous_7d`` stats plus
    ``velocity_change``, ``success_rate_change`` and ``duration_change``
    (recent minus previous). A change is ``None`` when either window lacks
    the data needed to compute it.
    """
    # One reference instant so both windows share the same boundaries.
    now = datetime.now(timezone.utc)
    recent = window(cycles, 7, now=now)
    # Remove recent from the 14d window to get the 7-14d window. Identity is
    # used so entries never need to be hashable.
    recent_ids = {id(e) for e in recent}
    older = [e for e in window(cycles, 14, now=now) if id(e) not in recent_ids]

    recent_stats = _window_stats(recent)
    older_stats = _window_stats(older)

    trend = {
        "recent_7d": recent_stats,
        "previous_7d": older_stats,
        "velocity_change": None,
        "success_rate_change": None,
        "duration_change": None,
    }

    if recent_stats["count"] and older_stats["count"]:
        trend["velocity_change"] = recent_stats["count"] - older_stats["count"]
    if recent_stats["success_rate"] is not None and older_stats["success_rate"] is not None:
        trend["success_rate_change"] = round(
            recent_stats["success_rate"] - older_stats["success_rate"], 3
        )
    if recent_stats["avg_duration"] is not None and older_stats["avg_duration"] is not None:
        trend["duration_change"] = recent_stats["avg_duration"] - older_stats["avg_duration"]

    return trend


def type_analysis(cycles: list[dict]) -> dict:
    """Per-type success rates and durations.

    Cycles with a missing, null or empty ``type`` are grouped under
    ``"unknown"``; keys are always strings so they can be formatted safely.
    """
    by_type: dict[str, list[dict]] = defaultdict(list)
    for c in cycles:
        by_type[str(c.get("type") or "unknown")].append(c)

    result = {}
    for t, entries in by_type.items():
        durations = _positive_durations(entries)
        successes = sum(1 for e in entries if e.get("success"))
        result[t] = {
            "count": len(entries),
            "success_rate": round(successes / len(entries), 3) if entries else 0,
            "avg_duration": round(sum(durations) / len(durations)) if durations else 0,
            "max_duration": max(durations) if durations else 0,
        }
    return result


def repeat_failures(cycles: list[dict]) -> list[dict]:
    """Issues that have failed multiple times — quarantine candidates.

    Returns one record per issue with 2+ failed cycles, most failures first.
    """
    failures: dict[int, list] = defaultdict(list)
    for c in cycles:
        if not c.get("success") and c.get("issue"):
            failures[c["issue"]].append({
                "cycle": c.get("cycle"),
                "reason": c.get("reason", ""),
                "duration": c.get("duration", 0),
            })
    # Only issues with 2+ failures
    return [
        {"issue": k, "failure_count": len(v), "attempts": v}
        for k, v in sorted(failures.items(), key=lambda x: -len(x[1]))
        if len(v) >= 2
    ]


def duration_outliers(cycles: list[dict], threshold_multiple: float = 3.0) -> list[dict]:
    """Cycles that took way longer than average — something went wrong.

    Returns an empty list when fewer than 5 cycles have a positive numeric
    duration. Otherwise returns, in input order, every cycle whose duration
    exceeds ``threshold_multiple`` times the average.
    """
    durations = _positive_durations(cycles)
    if len(durations) < 5:
        return []
    avg = sum(durations) / len(durations)  # > 0: every duration is positive
    threshold = avg * threshold_multiple

    outliers = []
    for c in cycles:
        dur = c.get("duration", 0)
        if _is_number(dur) and dur > threshold:
            outliers.append({
                "cycle": c.get("cycle"),
                "issue": c.get("issue"),
                "type": c.get("type"),
                "duration": dur,
                "avg_duration": round(avg),
                "multiple": round(dur / avg, 1),
                "reason": c.get("reason", ""),
            })
    return outliers


def triage_effectiveness(deep_triages: list[dict]) -> dict:
    """How well is the deep triage performing?

    Aggregates counts over all deep-triage runs. Null or missing counters
    and lists count as zero; feedback is kept only when it is a non-empty
    string, truncated to 200 characters per run.
    """
    if not deep_triages:
        return {"runs": 0, "note": "No deep triage data yet"}

    total_reviewed = sum(d.get("issues_reviewed") or 0 for d in deep_triages)
    total_refined = sum(len(d.get("issues_refined") or []) for d in deep_triages)
    total_created = sum(len(d.get("issues_created") or []) for d in deep_triages)
    total_closed = sum(len(d.get("issues_closed") or []) for d in deep_triages)
    timmy_available = sum(1 for d in deep_triages if d.get("timmy_available"))

    # Extract Timmy's feedback themes
    timmy_themes = []
    for d in deep_triages:
        fb = d.get("timmy_feedback", "")
        if fb and isinstance(fb, str):
            timmy_themes.append(fb[:200])

    return {
        "runs": len(deep_triages),
        "total_reviewed": total_reviewed,
        "total_refined": total_refined,
        "total_created": total_created,
        "total_closed": total_closed,
        "timmy_consultation_rate": round(timmy_available / len(deep_triages), 2),
        "timmy_recent_feedback": timmy_themes[-1] if timmy_themes else "",
        "timmy_feedback_history": timmy_themes,
    }


def generate_recommendations(
    trends: dict,
    types: dict,
    repeats: list,
    outliers: list,
    triage_eff: dict,
) -> list[dict]:
    """Produce actionable recommendations from the analysis.

    Each recommendation is a dict with ``severity``, ``category``,
    ``finding`` and ``recommendation``. When no rule fires, a single
    ``info`` entry is returned so the list is never empty.
    """
    recs = []

    # 1. Success rate declining?
    src = trends.get("success_rate_change")
    if src is not None and src < -0.1:
        recs.append({
            "severity": "high",
            "category": "reliability",
            "finding": f"Success rate dropped {abs(src)*100:.0f}pp in the last 7 days",
            "recommendation": "Review recent failures. Are issues poorly scoped? "
                              "Is main unstable? Check if triage is producing bad work items.",
        })

    # 2. Velocity dropping?
    vc = trends.get("velocity_change")
    if vc is not None and vc < -5:
        recs.append({
            "severity": "medium",
            "category": "throughput",
            "finding": f"Velocity dropped by {abs(vc)} cycles vs previous week",
            "recommendation": "Check for loop stalls, long-running cycles, or queue starvation.",
        })

    # 3. Duration creep?
    dc = trends.get("duration_change")
    if dc is not None and dc > 120:  # 2+ minutes longer
        recs.append({
            "severity": "medium",
            "category": "efficiency",
            "finding": f"Average cycle duration increased by {dc}s vs previous week",
            "recommendation": "Issues may be growing in scope. Enforce tighter decomposition "
                              "in deep triage. Check if tests are getting slower.",
        })

    # 4. Type-specific problems
    for t, info in types.items():
        if info["count"] >= 3 and info["success_rate"] < 0.5:
            recs.append({
                "severity": "high",
                "category": "type_reliability",
                "finding": f"'{t}' issues fail {(1-info['success_rate'])*100:.0f}% of the time "
                           f"({info['count']} attempts)",
                "recommendation": f"'{t}' issues need better scoping or different approach. "
                                  f"Consider: tighter acceptance criteria, smaller scope, "
                                  f"or delegating to Kimi with more context.",
            })
        if info["avg_duration"] > 600 and info["count"] >= 3:  # >10 min avg
            # int() so a float max duration renders as "12m", not "12.0m".
            max_minutes = int(info["max_duration"]) // 60
            recs.append({
                "severity": "medium",
                "category": "type_efficiency",
                "finding": f"'{t}' issues average {info['avg_duration']//60}m{info['avg_duration']%60}s "
                           f"(max {max_minutes}m)",
                "recommendation": f"Break '{t}' issues into smaller pieces. Target <5 min per cycle.",
            })

    # 5. Repeat failures
    for rf in repeats[:3]:
        recs.append({
            "severity": "high",
            "category": "repeat_failure",
            "finding": f"Issue #{rf['issue']} has failed {rf['failure_count']} times",
            "recommendation": "Quarantine or rewrite this issue. Repeated failure = "
                              "bad scope or missing prerequisite.",
        })

    # 6. Outliers
    if len(outliers) > 2:
        # "Nx+" is a lower bound, so report the smallest multiple rather than
        # whichever outlier happens to be listed first.
        multiples = [o["multiple"] for o in outliers if _is_number(o.get("multiple"))]
        floor_multiple = min(multiples) if multiples else "?"
        recs.append({
            "severity": "medium",
            "category": "outliers",
            "finding": f"{len(outliers)} cycles took {floor_multiple}x+ "
                       f"longer than average",
            "recommendation": "Long cycles waste resources. Add timeout enforcement or "
                              "break complex issues earlier.",
        })

    # 7. Code growth
    recent = trends.get("recent_7d", {})
    net = recent.get("lines_net", 0)
    if net > 500:
        recs.append({
            "severity": "low",
            "category": "code_health",
            "finding": f"Net +{net} lines added in the last 7 days",
            "recommendation": "Lines of code is a liability. Balance feature work with "
                              "refactoring. Target net-zero or negative line growth.",
        })

    # 8. Triage health
    if triage_eff.get("runs", 0) == 0:
        recs.append({
            "severity": "high",
            "category": "triage",
            "finding": "Deep triage has never run",
            "recommendation": "Enable deep triage (every 20 cycles). The loop needs "
                              "LLM-driven issue refinement to stay effective.",
        })

    # No recommendations = things are healthy
    if not recs:
        recs.append({
            "severity": "info",
            "category": "health",
            "finding": "No significant issues detected",
            "recommendation": "System is healthy. Continue current patterns.",
        })

    return recs


# ── Main ─────────────────────────────────────────────────────────────────

def main() -> None:
    """Run every analysis, write ``insights.json`` and print a summary."""
    cycles = load_jsonl(CYCLES_FILE)
    deep_triages = load_jsonl(DEEP_TRIAGE_FILE)

    if not cycles:
        print("[introspect] No cycle data found. Nothing to analyze.")
        return

    # Run all analyses
    trends = compute_trends(cycles)
    types = type_analysis(cycles)
    repeats = repeat_failures(cycles)
    outliers = duration_outliers(cycles)
    triage_eff = triage_effectiveness(deep_triages)
    recommendations = generate_recommendations(trends, types, repeats, outliers, triage_eff)

    insights = {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "total_cycles_analyzed": len(cycles),
        "trends": trends,
        "by_type": types,
        "repeat_failures": repeats[:5],
        "duration_outliers": outliers[:5],
        "triage_effectiveness": triage_eff,
        "recommendations": recommendations,
    }

    # Write insights
    INSIGHTS_FILE.parent.mkdir(parents=True, exist_ok=True)
    INSIGHTS_FILE.write_text(json.dumps(insights, indent=2) + "\n", encoding="utf-8")

    # Current epoch from latest entry
    latest_epoch = ""
    for c in reversed(cycles):
        if c.get("epoch"):
            latest_epoch = c["epoch"]
            break

    # Human-readable output
    header = f"[introspect] Analyzed {len(cycles)} cycles"
    if latest_epoch:
        header += f" · current epoch: {latest_epoch}"
    print(header)

    print("\n TRENDS (7d vs previous 7d):")
    r7 = trends["recent_7d"]
    p7 = trends["previous_7d"]
    print(f" Cycles: {r7['count']:>3d} (was {p7['count']})")
    if r7["success_rate"] is not None:
        change = trends["success_rate_change"] or 0
        arrow = "↑" if change > 0 else "↓" if change < 0 else "→"
        print(f" Success rate: {r7['success_rate']*100:>4.0f}% {arrow}")
    if r7["avg_duration"] is not None:
        print(f" Avg duration: {r7['avg_duration']//60}m{r7['avg_duration']%60:02d}s")
    print(f" PRs merged: {r7['prs_merged']:>3d} (was {p7['prs_merged']})")
    print(f" Lines net: {r7['lines_net']:>+5d}")

    print("\n BY TYPE:")
    for t, info in sorted(types.items(), key=lambda x: -x[1]["count"]):
        print(f" {t:12s} n={info['count']:>2d} "
              f"ok={info['success_rate']*100:>3.0f}% "
              f"avg={info['avg_duration']//60}m{info['avg_duration']%60:02d}s")

    if repeats:
        print("\n REPEAT FAILURES:")
        for rf in repeats[:3]:
            print(f" #{rf['issue']} failed {rf['failure_count']}x")

    print(f"\n RECOMMENDATIONS ({len(recommendations)}):")
    for rec in recommendations:
        sev = {"high": "🔴", "medium": "🟡", "low": "🟢", "info": "ℹ️ "}.get(rec["severity"], "?")
        print(f" {sev} {rec['finding']}")
        print(f" → {rec['recommendation']}")

    print(f"\n Written to: {INSIGHTS_FILE}")


if __name__ == "__main__":
    main()