#!/usr/bin/env python3 """ Kaizen Retro — Automated retrospective after every burn cycle. Reads overnight Gitea activity, fleet state, and loop logs. Generates ONE concrete improvement suggestion and posts it. Usage: python3 scripts/kaizen_retro.py [--dry-run] """ from __future__ import annotations import argparse import json import os import sys import urllib.error import urllib.request from datetime import datetime, timedelta, timezone from pathlib import Path from typing import Any, Optional # Ensure repo root is on path so we can import gitea_client REPO_ROOT = Path(__file__).resolve().parent.parent sys.path.insert(0, str(REPO_ROOT)) from gitea_client import GiteaClient, GiteaError # --------------------------------------------------------------------------- # Configuration # --------------------------------------------------------------------------- REPOS = [ "Timmy_Foundation/the-nexus", "Timmy_Foundation/timmy-config", "Timmy_Foundation/timmy-home", "Timmy_Foundation/the-door", "Timmy_Foundation/turboquant", "Timmy_Foundation/hermes-agent", "Timmy_Foundation/.profile", ] HERMES_HOME = Path.home() / ".hermes" TIMMY_HOME = Path.home() / ".timmy" WORKFORCE_STATE_PATH = HERMES_HOME / "workforce-state.json" FLEET_ROUTING_PATH = HERMES_HOME / "fleet-routing.json" CHANNEL_DIR_PATH = REPO_ROOT / "channel_directory.json" REPORTS_DIR = REPO_ROOT / "reports" MORNING_REPORT_REPO = "Timmy_Foundation/timmy-config" TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN") TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_HOME_CHANNEL", "-1003664764329") TELEGRAM_MAX_LEN = 4000 # leave headroom below the 4096 hard limit STALE_DAYS = 7 MAX_ATTEMPT_COMMENT_THRESHOLD = 5 ISSUE_TYPE_KEYWORDS = { "bug": ["bug", "fix", "crash", "error", "regression", "broken"], "feature": ["feature", "implement", "add", "support", "enable"], "docs": ["doc", "readme", "wiki", "guide", "documentation"], "kaizen": ["kaizen", "retro", "improvement", "continuous"], "devops": ["deploy", "ci", "cd", "docker", "server", "infra"], } BLOCKER_LABELS = {"blocked", "timeout", "stale", "help wanted", "wontfix", "duplicate"} # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def load_json(path: Path) -> Any: if not path.exists(): return None with open(path) as f: return json.load(f) def iso_day_ago(days: int = 1) -> str: return (datetime.now(timezone.utc) - timedelta(days=days)).isoformat() def classify_issue_type(issue: dict) -> str: title = (issue.get("title", "") or "").lower() body = (issue.get("body", "") or "").lower() labels = [l.get("name", "").lower() for l in issue.get("labels", []) or []] text = f"{title} {body} {' '.join(labels)}" words = set(text.split()) best = "other" best_score = 0 for kind, keywords in ISSUE_TYPE_KEYWORDS.items(): # Short keywords (<=3 chars) require whole-word match to avoid false positives like # "ci" inside "cleanup" or "cd" inside "abcde". score = sum( 1 for kw in keywords if (len(kw) <= 3 and kw in words) or (len(kw) > 3 and kw in text) ) # label match is stronger for label in labels: label_words = set(label.split()) if any( (len(kw) <= 3 and kw in label_words) or (len(kw) > 3 and kw in label) for kw in keywords ): score += 3 if score > best_score: best_score = score best = kind return best def is_max_attempts_candidate(issue: dict) -> bool: """Heuristic for issues that consumed excessive attempts.""" labels = {l.get("name", "").lower() for l in issue.get("labels", []) or []} if labels & BLOCKER_LABELS: return True if issue.get("comments", 0) >= MAX_ATTEMPT_COMMENT_THRESHOLD: return True created = issue.get("created_at") if created: try: created_dt = datetime.fromisoformat(created.replace("Z", "+00:00")) if datetime.now(timezone.utc) - created_dt > timedelta(days=STALE_DAYS): return True except Exception: pass return False def telegram_send(text: str, bot_token: str, chat_id: str) -> list[dict]: """Post text to Telegram, chunking if it exceeds the message limit.""" url = f"https://api.telegram.org/bot{bot_token}/sendMessage" chunks = [] if len(text) <= TELEGRAM_MAX_LEN: chunks = [text] else: # Split on newlines to preserve readability lines = text.splitlines(keepends=True) current = "" for line in lines: if len(current) + len(line) > TELEGRAM_MAX_LEN: if current: chunks.append(current) current = line else: current += line if current: chunks.append(current) results = [] for i, chunk in enumerate(chunks): prefix = f"*(part {i + 1}/{len(chunks)})*\n" if len(chunks) > 1 else "" payload = {"chat_id": chat_id, "text": prefix + chunk, "parse_mode": "Markdown"} data = json.dumps(payload).encode() req = urllib.request.Request(url, data=data, headers={"Content-Type": "application/json"}) with urllib.request.urlopen(req, timeout=30) as resp: results.append(json.loads(resp.read().decode())) return results def find_latest_morning_report_issue(client: GiteaClient) -> Optional[int]: try: issues = client.list_issues(MORNING_REPORT_REPO, state="open", sort="created", direction="desc", limit=20) for issue in issues: if "good morning report" in issue.title.lower() or "morning report" in issue.title.lower(): return issue.number # fallback to closed issues = client.list_issues(MORNING_REPORT_REPO, state="closed", sort="created", direction="desc", limit=20) for issue in issues: if "good morning report" in issue.title.lower() or "morning report" in issue.title.lower(): return issue.number except Exception: pass return None def fmt_pct(num: float, den: float) -> str: if den == 0: return "N/A" return f"{num/den:.0%}" # --------------------------------------------------------------------------- # Analysis # --------------------------------------------------------------------------- def gather_metrics(client: GiteaClient, since: str) -> dict: """Collect overnight metrics from Gitea.""" metrics = { "closed_issues": [], "merged_prs": [], "closed_prs": [], "open_issues": [], "max_attempts_issues": [], "by_agent": {}, "by_repo": {}, "by_type": {}, } for repo in REPOS: repo_short = repo.split("/")[1] metrics["by_repo"][repo_short] = { "closed": 0, "merged_prs": 0, "closed_prs": 0, "open": 0, "max_attempts": 0, "successes": 0, "failures": 0, } # Closed issues since window try: closed = client.list_issues(repo, state="closed", since=since, sort="updated", direction="desc", limit=100) for issue in closed: issue_dict = { "number": issue.number, "title": issue.title, "repo": repo_short, "type": classify_issue_type({"title": issue.title, "body": issue.body, "labels": [{"name": lb.name} for lb in issue.labels]}), "assignee": issue.assignees[0].login if issue.assignees else "unassigned", } metrics["closed_issues"].append(issue_dict) metrics["by_repo"][repo_short]["closed"] += 1 metrics["by_repo"][repo_short]["successes"] += 1 agent = issue_dict["assignee"] if agent not in metrics["by_agent"]: metrics["by_agent"][agent] = {"successes": 0, "failures": 0, "closed": 0, "repos": set()} metrics["by_agent"][agent]["successes"] += 1 metrics["by_agent"][agent]["closed"] += 1 metrics["by_agent"][agent]["repos"].add(repo_short) t = issue_dict["type"] if t not in metrics["by_type"]: metrics["by_type"][t] = {"successes": 0, "failures": 0, "total": 0} metrics["by_type"][t]["successes"] += 1 metrics["by_type"][t]["total"] += 1 except Exception as exc: print(f"Warning: could not load closed issues for {repo}: {exc}", file=sys.stderr) # Open issues (for stale / max-attempts detection) try: open_issues = client.list_issues(repo, state="open", sort="created", direction="desc", limit=100) metrics["by_repo"][repo_short]["open"] = len(open_issues) for issue in open_issues: issue_raw = { "number": issue.number, "title": issue.title, "labels": [{"name": lb.name} for lb in issue.labels], "comments": issue.comments, "created_at": issue.created_at, } if is_max_attempts_candidate(issue_raw): metrics["max_attempts_issues"].append({ "number": issue.number, "title": issue.title, "repo": repo_short, "type": classify_issue_type({"title": issue.title, "body": issue.body, "labels": issue_raw["labels"]}), "assignee": issue.assignees[0].login if issue.assignees else "unassigned", }) metrics["by_repo"][repo_short]["max_attempts"] += 1 metrics["by_repo"][repo_short]["failures"] += 1 agent = issue.assignees[0].login if issue.assignees else "unassigned" if agent not in metrics["by_agent"]: metrics["by_agent"][agent] = {"successes": 0, "failures": 0, "closed": 0, "repos": set()} metrics["by_agent"][agent]["failures"] += 1 metrics["by_agent"][agent]["repos"].add(repo_short) t = classify_issue_type({"title": issue.title, "body": issue.body, "labels": issue_raw["labels"]}) if t not in metrics["by_type"]: metrics["by_type"][t] = {"successes": 0, "failures": 0, "total": 0} metrics["by_type"][t]["failures"] += 1 metrics["by_type"][t]["total"] += 1 except Exception as exc: print(f"Warning: could not load open issues for {repo}: {exc}", file=sys.stderr) # PRs merged / closed since window (filter client-side; Gitea PR API ignores since) try: prs = client.list_pulls(repo, state="closed", sort="updated", limit=100) since_dt = datetime.fromisoformat(since.replace("Z", "+00:00")) for pr in prs: updated = pr.updated_at or pr.created_at or "" try: updated_dt = datetime.fromisoformat(updated.replace("Z", "+00:00")) if updated_dt < since_dt: continue except Exception: pass if pr.merged: metrics["merged_prs"].append({ "number": pr.number, "title": pr.title, "repo": repo_short, "user": pr.user.login if pr.user else "unknown", }) metrics["by_repo"][repo_short]["merged_prs"] += 1 else: metrics["closed_prs"].append({ "number": pr.number, "title": pr.title, "repo": repo_short, "user": pr.user.login if pr.user else "unknown", }) metrics["by_repo"][repo_short]["closed_prs"] += 1 except Exception as exc: print(f"Warning: could not load PRs for {repo}: {exc}", file=sys.stderr) # Convert sets to lists for JSON serialization for agent in metrics["by_agent"].values(): agent["repos"] = sorted(agent["repos"]) return metrics def load_workforce_state() -> dict: return load_json(WORKFORCE_STATE_PATH) or {} def load_fleet_routing() -> list[dict]: data = load_json(FLEET_ROUTING_PATH) if data and "agents" in data: return data["agents"] return [] def generate_suggestion(metrics: dict, fleet: list[dict]) -> str: """Generate ONE concrete improvement suggestion based on the data.""" by_agent = metrics["by_agent"] by_repo = metrics["by_repo"] by_type = metrics["by_type"] max_attempts = metrics["max_attempts_issues"] suggestions: list[str] = [] # 1. Agent with poor repo performance for agent, stats in by_agent.items(): total = stats["successes"] + stats["failures"] if total >= 3 and stats["successes"] == 0: repos = ", ".join(stats["repos"]) suggestions.append( f"🎯 **{agent}** has a 0% verify rate over the last cycle (0/{total}) on repos: {repos}. " f"Consider removing these repos from {agent}'s routing or providing targeted onboarding." ) # 2. Repo with highest failure concentration repo_failures = [(r, s) for r, s in by_repo.items() if s["failures"] > 0] if repo_failures: repo_failures.sort(key=lambda x: x[1]["failures"], reverse=True) worst_repo, worst_stats = repo_failures[0] total_repo = worst_stats["successes"] + worst_stats["failures"] if worst_stats["failures"] >= 2: suggestions.append( f"🎯 **{worst_repo}** has the most friction ({worst_stats['failures']} blocked/stale issues, " f"{fmt_pct(worst_stats['successes'], total_repo)} success). " f"Consider splitting issues in {worst_repo} into smaller chunks or assigning a stronger agent." ) # 3. Max-attempts pattern if len(max_attempts) >= 3: type_counts: dict[str, int] = {} for issue in max_attempts: type_counts[issue["type"]] = type_counts.get(issue["type"], 0) + 1 top_type = max(type_counts, key=type_counts.get) if type_counts else "unknown" suggestions.append( f"🎯 **{len(max_attempts)} issues** hit max-attempts or went stale. " f"The dominant type is **{top_type}**. " f"Consider adding acceptance criteria templates or pre-flight checklists for {top_type} issues." ) # 4. Issue type disparity for t, stats in by_type.items(): total = stats["total"] if total >= 3 and stats["successes"] == 0: suggestions.append( f"🎯 **{t}** issues have a 0% closure rate ({stats['failures']} stale). " f"Consider routing all {t} issues to a specialist agent or creating a dedicated playbook." ) # 5. Fleet routing gap (if fleet data exists) active_agents = {a["name"] for a in fleet if a.get("active")} assigned_agents = set(by_agent.keys()) idle_agents = active_agents - assigned_agents - {"unassigned"} if len(idle_agents) >= 2: suggestions.append( f"🎯 **{len(idle_agents)} active agents** have no assignments this cycle: {', '.join(idle_agents)}. " f"Consider expanding their repo lists or investigating why they aren't receiving work." ) if suggestions: return suggestions[0] # Fallback: celebrate or nudge total_closed = len(metrics["closed_issues"]) total_merged = len(metrics["merged_prs"]) if total_closed >= 5 or total_merged >= 3: return ( f"🎯 Strong cycle: {total_closed} issues closed, {total_merged} PRs merged. " f"Next improvement: write down the top 3 patterns that made this cycle successful so we can replicate them." ) return ( "🎯 Low activity this cycle. Next improvement: ensure at least one agent loop is actively polling " "for unassigned issues so work doesn't sit idle." ) def build_report(metrics: dict, suggestion: str, since: str) -> str: now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC") period = since[:10] lines = [ f"# 🌀 Kaizen Retro — {now}", f"*Period: {period} → now*\n", "## Numbers", f"- **Issues closed:** {len(metrics['closed_issues'])}", f"- **PRs merged:** {len(metrics['merged_prs'])}", f"- **PRs closed without merge:** {len(metrics['closed_prs'])}", f"- **Max-attempts / stale issues:** {len(metrics['max_attempts_issues'])}", "", "## By Agent", ] for agent, stats in sorted(metrics["by_agent"].items(), key=lambda x: x[1]["successes"] + x[1]["failures"], reverse=True): total = stats["successes"] + stats["failures"] rate = fmt_pct(stats["successes"], total) lines.append(f"- **{agent}**: {stats['successes']} closed, {stats['failures']} stale / max-attempts — verify rate {rate}") lines.extend(["", "## By Repo"]) for repo, stats in sorted(metrics["by_repo"].items(), key=lambda x: x[1]["successes"] + x[1]["failures"], reverse=True): total = stats["successes"] + stats["failures"] if total == 0 and stats["open"] == 0: continue rate = fmt_pct(stats["successes"], total) lines.append( f"- **{repo}**: {stats['successes']} closed, {stats['failures']} stale, {stats['open']} open — verify rate {rate}" ) lines.extend(["", "## By Issue Type"]) for t, stats in sorted(metrics["by_type"].items(), key=lambda x: x[1]["total"], reverse=True): total = stats["total"] rate = fmt_pct(stats["successes"], total) lines.append(f"- **{t}**: {stats['successes']} closed, {stats['failures']} stale — verify rate {rate}") if metrics["max_attempts_issues"]: lines.extend(["", "## Max-Attempts / Stale Issues"]) for issue in metrics["max_attempts_issues"][:10]: lines.append(f"- {issue['repo']}#{issue['number']} ({issue['type']}, assignee: {issue['assignee']}) — {issue['title']}") if len(metrics["max_attempts_issues"]) > 10: lines.append(f"- … and {len(metrics['max_attempts_issues']) - 10} more") lines.extend(["", "## One Concrete Improvement", suggestion, ""]) return "\n".join(lines) # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- def main() -> int: parser = argparse.ArgumentParser(description="Kaizen Retro — automated burn-cycle retrospective") parser.add_argument("--dry-run", action="store_true", help="Print report but do not post") parser.add_argument("--since", type=str, help="ISO timestamp for lookback window (default: 24h ago)") parser.add_argument("--post-to", type=str, help="Override Telegram chat ID") args = parser.parse_args() since = args.since or iso_day_ago(1) client = GiteaClient() print("Gathering metrics since", since) metrics = gather_metrics(client, since) fleet = load_fleet_routing() suggestion = generate_suggestion(metrics, fleet) report = build_report(metrics, suggestion, since) print(report) # Save JSON snapshot REPORTS_DIR.mkdir(parents=True, exist_ok=True) snapshot_path = REPORTS_DIR / f"kaizen-retro-{datetime.now(timezone.utc).strftime('%Y%m%d')}.json" snapshot = { "generated_at": datetime.now(timezone.utc).isoformat(), "since": since, "metrics": metrics, "suggestion": suggestion, "report_markdown": report, } with open(snapshot_path, "w") as f: json.dump(snapshot, f, indent=2) print(f"\nSnapshot saved to {snapshot_path}") if args.dry_run: return 0 # Post to Telegram chat_id = args.post_to or TELEGRAM_CHAT_ID bot_token = TELEGRAM_BOT_TOKEN if bot_token and chat_id: try: telegram_send(report, bot_token, chat_id) print("Posted to Telegram.") except Exception as exc: print(f"Failed to post to Telegram: {exc}", file=sys.stderr) else: print("Telegram not configured (set TELEGRAM_BOT_TOKEN and TELEGRAM_HOME_CHANNEL).", file=sys.stderr) # Comment on latest morning report issue morning_issue = find_latest_morning_report_issue(client) if morning_issue: try: client.create_comment(MORNING_REPORT_REPO, morning_issue, report) print(f"Commented on morning report issue #{morning_issue}.") except Exception as exc: print(f"Failed to comment on morning report issue: {exc}", file=sys.stderr) else: print("No morning report issue found to comment on.", file=sys.stderr) return 0 if __name__ == "__main__": sys.exit(main())