Files
timmy-config/scripts/kaizen_retro.py
Ezra 2e64b160b5 [KAIZEN] Harden retro scheduling, chunking, and tests (#349)
- Add Kaizen Retro to cron/jobs.json with explicit local model/provider
- Add Telegram message chunking for reports approaching the 4096-char limit
- Fix classify_issue_type false positives on short substrings (ci in cleanup)
- Add 28 unit tests covering classification, max-attempts detection,
  suggestion generation, report formatting, and Telegram chunking
2026-04-07 15:58:58 +00:00

527 lines
21 KiB
Python

#!/usr/bin/env python3
"""
Kaizen Retro — Automated retrospective after every burn cycle.
Reads overnight Gitea activity, fleet state, and loop logs.
Generates ONE concrete improvement suggestion and posts it.
Usage:
python3 scripts/kaizen_retro.py [--dry-run]
"""
from __future__ import annotations
import argparse
import json
import os
import sys
import urllib.error
import urllib.request
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Optional
# Ensure repo root is on path so we can import gitea_client
REPO_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(REPO_ROOT))
from gitea_client import GiteaClient, GiteaError
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Repositories scanned for overnight activity ("owner/name" form).
REPOS = [
    "Timmy_Foundation/the-nexus",
    "Timmy_Foundation/timmy-config",
    "Timmy_Foundation/timmy-home",
    "Timmy_Foundation/the-door",
    "Timmy_Foundation/turboquant",
    "Timmy_Foundation/hermes-agent",
    "Timmy_Foundation/.profile",
]
# Local state written by the agent fleet; read-only from this script.
HERMES_HOME = Path.home() / ".hermes"
TIMMY_HOME = Path.home() / ".timmy"
WORKFORCE_STATE_PATH = HERMES_HOME / "workforce-state.json"
FLEET_ROUTING_PATH = HERMES_HOME / "fleet-routing.json"
CHANNEL_DIR_PATH = REPO_ROOT / "channel_directory.json"
# Where the daily JSON snapshot of each retro is saved.
REPORTS_DIR = REPO_ROOT / "reports"
# Repo whose newest "morning report" issue receives the retro as a comment.
MORNING_REPORT_REPO = "Timmy_Foundation/timmy-config"
# Telegram delivery settings; chat id falls back to the home channel.
TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN")
TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_HOME_CHANNEL", "-1003664764329")
TELEGRAM_MAX_LEN = 4000  # leave headroom below Telegram's 4096-char hard limit
# Open issues older than this many days count as stale.
STALE_DAYS = 7
# An open issue with at least this many comments looks like repeated attempts.
MAX_ATTEMPT_COMMENT_THRESHOLD = 5
# Keyword buckets for classify_issue_type(); keywords of <=3 chars are matched
# as whole words only (avoids e.g. "ci" matching inside "cleanup").
ISSUE_TYPE_KEYWORDS = {
    "bug": ["bug", "fix", "crash", "error", "regression", "broken"],
    "feature": ["feature", "implement", "add", "support", "enable"],
    "docs": ["doc", "readme", "wiki", "guide", "documentation"],
    "kaizen": ["kaizen", "retro", "improvement", "continuous"],
    "devops": ["deploy", "ci", "cd", "docker", "server", "infra"],
}
# Label names that immediately mark an issue as blocked/stale.
BLOCKER_LABELS = {"blocked", "timeout", "stale", "help wanted", "wontfix", "duplicate"}
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def load_json(path: Path) -> Any:
    """Parse *path* as JSON and return the result, or None if the file is absent.

    Uses EAFP (catch FileNotFoundError) instead of an exists() pre-check so
    there is no race between the check and the open. Malformed JSON still
    raises, which is intentional: corrupt state files should be loud.
    """
    try:
        with open(path, encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        return None
def iso_day_ago(days: int = 1) -> str:
    """Return an ISO-8601 UTC timestamp *days* days in the past."""
    cutoff = datetime.now(timezone.utc) - timedelta(days=days)
    return cutoff.isoformat()
def classify_issue_type(issue: dict) -> str:
    """Bucket an issue dict into one ISSUE_TYPE_KEYWORDS category, or "other".

    Scores each category by keyword hits in title+body+labels. Keywords of
    three characters or fewer must match whole words (so "ci" does not hit
    inside "cleanup"); longer keywords match as substrings. A keyword hit
    inside a label name is worth three extra points. Ties keep the earlier
    category in ISSUE_TYPE_KEYWORDS order (strictly-greater comparison).
    """
    title = (issue.get("title", "") or "").lower()
    body = (issue.get("body", "") or "").lower()
    labels = [lbl.get("name", "").lower() for lbl in issue.get("labels", []) or []]
    haystack = f"{title} {body} {' '.join(labels)}"
    tokens = set(haystack.split())

    def hit(kw: str, text: str, words: set) -> bool:
        # Short keywords: whole-word match; long keywords: substring match.
        return (kw in words) if len(kw) <= 3 else (kw in text)

    winner = "other"
    winner_score = 0
    for kind, keywords in ISSUE_TYPE_KEYWORDS.items():
        score = sum(1 for kw in keywords if hit(kw, haystack, tokens))
        for label in labels:
            # Label hits are a stronger signal than free text.
            if any(hit(kw, label, set(label.split())) for kw in keywords):
                score += 3
        if score > winner_score:
            winner, winner_score = kind, score
    return winner
def is_max_attempts_candidate(issue: dict) -> bool:
    """Heuristic for issues that consumed excessive attempts.

    True when the issue carries a blocker label, has accumulated at least
    MAX_ATTEMPT_COMMENT_THRESHOLD comments, or has been open for more than
    STALE_DAYS days. An unparseable or missing created_at is ignored
    (best-effort: such issues are not flagged on age alone).
    """
    label_names = {lbl.get("name", "").lower() for lbl in issue.get("labels", []) or []}
    if not label_names.isdisjoint(BLOCKER_LABELS):
        return True
    if issue.get("comments", 0) >= MAX_ATTEMPT_COMMENT_THRESHOLD:
        return True
    created = issue.get("created_at")
    if not created:
        return False
    try:
        opened = datetime.fromisoformat(created.replace("Z", "+00:00"))
    except Exception:
        return False
    return datetime.now(timezone.utc) - opened > timedelta(days=STALE_DAYS)
def telegram_send(text: str, bot_token: str, chat_id: str) -> list[dict]:
    """Post *text* to Telegram, chunking if it exceeds the message limit.

    Splits on newline boundaries where possible for readability. Fix over the
    previous version: a single line longer than TELEGRAM_MAX_LEN is now
    hard-split, so no chunk can ever exceed the limit (previously such a line
    became one oversized chunk that Telegram would reject). Multi-part posts
    get a "(part i/n)" prefix; headroom below the 4096 hard limit absorbs it.

    Returns the decoded JSON API response for each chunk posted.
    Raises urllib.error.HTTPError / URLError on delivery failure.
    """
    url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
    chunks: list[str] = []
    if len(text) <= TELEGRAM_MAX_LEN:
        chunks = [text]
    else:
        # Split on newlines to preserve readability
        current = ""
        for line in text.splitlines(keepends=True):
            # Hard-split any single line that alone exceeds the limit.
            while len(line) > TELEGRAM_MAX_LEN:
                if current:
                    chunks.append(current)
                    current = ""
                chunks.append(line[:TELEGRAM_MAX_LEN])
                line = line[TELEGRAM_MAX_LEN:]
            if len(current) + len(line) > TELEGRAM_MAX_LEN:
                if current:
                    chunks.append(current)
                current = line
            else:
                current += line
        if current:
            chunks.append(current)
    results = []
    for i, chunk in enumerate(chunks):
        prefix = f"*(part {i + 1}/{len(chunks)})*\n" if len(chunks) > 1 else ""
        payload = {"chat_id": chat_id, "text": prefix + chunk, "parse_mode": "Markdown"}
        data = json.dumps(payload).encode()
        req = urllib.request.Request(url, data=data, headers={"Content-Type": "application/json"})
        with urllib.request.urlopen(req, timeout=30) as resp:
            results.append(json.loads(resp.read().decode()))
    return results
def find_latest_morning_report_issue(client: GiteaClient) -> Optional[int]:
    """Return the number of the newest issue titled like a morning report.

    Open issues are checked first, closed ones as a fallback. Best-effort:
    returns None when nothing matches or any Gitea query fails.
    """
    try:
        for state in ("open", "closed"):
            issues = client.list_issues(
                MORNING_REPORT_REPO, state=state, sort="created", direction="desc", limit=20
            )
            for issue in issues:
                title = issue.title.lower()
                if "good morning report" in title or "morning report" in title:
                    return issue.number
    except Exception:
        pass
    return None
def fmt_pct(num: float, den: float) -> str:
    """Format num/den as a whole-number percentage; "N/A" when den is zero."""
    return "N/A" if den == 0 else f"{num / den:.0%}"
# ---------------------------------------------------------------------------
# Analysis
# ---------------------------------------------------------------------------
def gather_metrics(client: GiteaClient, since: str) -> dict:
    """Collect overnight metrics from Gitea.

    For every repo in REPOS, gathers: issues closed since *since* (counted as
    successes), open issues that look blocked/stale per
    is_max_attempts_candidate (counted as failures), and PRs merged/closed in
    the window. Results are returned both as flat lists and as per-agent,
    per-repo, and per-type rollups. Each Gitea call is best-effort: a failure
    is logged to stderr and that slice of data is simply missing.
    """
    metrics = {
        "closed_issues": [],
        "merged_prs": [],
        "closed_prs": [],
        "open_issues": [],
        "max_attempts_issues": [],
        "by_agent": {},
        "by_repo": {},
        "by_type": {},
    }
    for repo in REPOS:
        repo_short = repo.split("/")[1]
        metrics["by_repo"][repo_short] = {
            "closed": 0,
            "merged_prs": 0,
            "closed_prs": 0,
            "open": 0,
            "max_attempts": 0,
            "successes": 0,
            "failures": 0,
        }
        # Closed issues since window (treated as successes)
        try:
            closed = client.list_issues(repo, state="closed", since=since, sort="updated", direction="desc", limit=100)
            for issue in closed:
                issue_dict = {
                    "number": issue.number,
                    "title": issue.title,
                    "repo": repo_short,
                    "type": classify_issue_type({"title": issue.title, "body": issue.body, "labels": [{"name": lb.name} for lb in issue.labels]}),
                    # First assignee stands in for "the agent that did the work".
                    "assignee": issue.assignees[0].login if issue.assignees else "unassigned",
                }
                metrics["closed_issues"].append(issue_dict)
                metrics["by_repo"][repo_short]["closed"] += 1
                metrics["by_repo"][repo_short]["successes"] += 1
                agent = issue_dict["assignee"]
                if agent not in metrics["by_agent"]:
                    # "repos" is a set here; converted to a sorted list below
                    # so the metrics dict is JSON-serializable.
                    metrics["by_agent"][agent] = {"successes": 0, "failures": 0, "closed": 0, "repos": set()}
                metrics["by_agent"][agent]["successes"] += 1
                metrics["by_agent"][agent]["closed"] += 1
                metrics["by_agent"][agent]["repos"].add(repo_short)
                t = issue_dict["type"]
                if t not in metrics["by_type"]:
                    metrics["by_type"][t] = {"successes": 0, "failures": 0, "total": 0}
                metrics["by_type"][t]["successes"] += 1
                metrics["by_type"][t]["total"] += 1
        except Exception as exc:
            print(f"Warning: could not load closed issues for {repo}: {exc}", file=sys.stderr)
        # Open issues (for stale / max-attempts detection; these count as failures)
        try:
            open_issues = client.list_issues(repo, state="open", sort="created", direction="desc", limit=100)
            metrics["by_repo"][repo_short]["open"] = len(open_issues)
            for issue in open_issues:
                issue_raw = {
                    "number": issue.number,
                    "title": issue.title,
                    "labels": [{"name": lb.name} for lb in issue.labels],
                    "comments": issue.comments,
                    "created_at": issue.created_at,
                }
                if is_max_attempts_candidate(issue_raw):
                    metrics["max_attempts_issues"].append({
                        "number": issue.number,
                        "title": issue.title,
                        "repo": repo_short,
                        "type": classify_issue_type({"title": issue.title, "body": issue.body, "labels": issue_raw["labels"]}),
                        "assignee": issue.assignees[0].login if issue.assignees else "unassigned",
                    })
                    metrics["by_repo"][repo_short]["max_attempts"] += 1
                    metrics["by_repo"][repo_short]["failures"] += 1
                    agent = issue.assignees[0].login if issue.assignees else "unassigned"
                    if agent not in metrics["by_agent"]:
                        metrics["by_agent"][agent] = {"successes": 0, "failures": 0, "closed": 0, "repos": set()}
                    metrics["by_agent"][agent]["failures"] += 1
                    metrics["by_agent"][agent]["repos"].add(repo_short)
                    t = classify_issue_type({"title": issue.title, "body": issue.body, "labels": issue_raw["labels"]})
                    if t not in metrics["by_type"]:
                        metrics["by_type"][t] = {"successes": 0, "failures": 0, "total": 0}
                    metrics["by_type"][t]["failures"] += 1
                    metrics["by_type"][t]["total"] += 1
        except Exception as exc:
            print(f"Warning: could not load open issues for {repo}: {exc}", file=sys.stderr)
        # PRs merged / closed since window (filter client-side; Gitea PR API ignores since)
        try:
            prs = client.list_pulls(repo, state="closed", sort="updated", limit=100)
            since_dt = datetime.fromisoformat(since.replace("Z", "+00:00"))
            for pr in prs:
                updated = pr.updated_at or pr.created_at or ""
                try:
                    updated_dt = datetime.fromisoformat(updated.replace("Z", "+00:00"))
                    if updated_dt < since_dt:
                        continue
                except Exception:
                    # Unparseable timestamp: keep the PR rather than drop it.
                    pass
                if pr.merged:
                    metrics["merged_prs"].append({
                        "number": pr.number,
                        "title": pr.title,
                        "repo": repo_short,
                        "user": pr.user.login if pr.user else "unknown",
                    })
                    metrics["by_repo"][repo_short]["merged_prs"] += 1
                else:
                    metrics["closed_prs"].append({
                        "number": pr.number,
                        "title": pr.title,
                        "repo": repo_short,
                        "user": pr.user.login if pr.user else "unknown",
                    })
                    metrics["by_repo"][repo_short]["closed_prs"] += 1
        except Exception as exc:
            print(f"Warning: could not load PRs for {repo}: {exc}", file=sys.stderr)
    # Convert sets to lists for JSON serialization
    for agent in metrics["by_agent"].values():
        agent["repos"] = sorted(agent["repos"])
    return metrics
def load_workforce_state() -> dict:
    """Return the parsed workforce state file, or {} when missing/empty."""
    state = load_json(WORKFORCE_STATE_PATH)
    return state if state else {}
def load_fleet_routing() -> list[dict]:
    """Return the fleet's agent routing entries, or [] when unavailable."""
    data = load_json(FLEET_ROUTING_PATH)
    if not data or "agents" not in data:
        return []
    return data["agents"]
def generate_suggestion(metrics: dict, fleet: list[dict]) -> str:
    """Generate ONE concrete improvement suggestion based on the data.

    Evaluates a prioritized sequence of heuristics and returns the first that
    fires: (1) an agent with zero verified closes over 3+ attempts, (2) the
    repo with the most friction, (3) a pattern among max-attempts issues,
    (4) an issue type that never closes, (5) idle active agents. When nothing
    fires, falls back to a celebration or a low-activity nudge.
    """
    by_agent = metrics["by_agent"]
    by_repo = metrics["by_repo"]
    by_type = metrics["by_type"]
    max_attempts = metrics["max_attempts_issues"]

    # 1. Agent with poor repo performance
    for agent, stats in by_agent.items():
        attempts = stats["successes"] + stats["failures"]
        if attempts >= 3 and stats["successes"] == 0:
            repo_list = ", ".join(stats["repos"])
            return (
                f"🎯 **{agent}** has a 0% verify rate over the last cycle (0/{attempts}) on repos: {repo_list}. "
                f"Consider removing these repos from {agent}'s routing or providing targeted onboarding."
            )

    # 2. Repo with highest failure concentration
    friction = [(name, stats) for name, stats in by_repo.items() if stats["failures"] > 0]
    if friction:
        worst_repo, worst_stats = max(friction, key=lambda item: item[1]["failures"])
        attempts = worst_stats["successes"] + worst_stats["failures"]
        if worst_stats["failures"] >= 2:
            return (
                f"🎯 **{worst_repo}** has the most friction ({worst_stats['failures']} blocked/stale issues, "
                f"{fmt_pct(worst_stats['successes'], attempts)} success). "
                f"Consider splitting issues in {worst_repo} into smaller chunks or assigning a stronger agent."
            )

    # 3. Max-attempts pattern
    if len(max_attempts) >= 3:
        tally: dict[str, int] = {}
        for issue in max_attempts:
            tally[issue["type"]] = tally.get(issue["type"], 0) + 1
        top_type = max(tally, key=tally.get) if tally else "unknown"
        return (
            f"🎯 **{len(max_attempts)} issues** hit max-attempts or went stale. "
            f"The dominant type is **{top_type}**. "
            f"Consider adding acceptance criteria templates or pre-flight checklists for {top_type} issues."
        )

    # 4. Issue type disparity
    for kind, stats in by_type.items():
        if stats["total"] >= 3 and stats["successes"] == 0:
            return (
                f"🎯 **{kind}** issues have a 0% closure rate ({stats['failures']} stale). "
                f"Consider routing all {kind} issues to a specialist agent or creating a dedicated playbook."
            )

    # 5. Fleet routing gap (if fleet data exists)
    active = {entry["name"] for entry in fleet if entry.get("active")}
    idle = active - set(by_agent) - {"unassigned"}
    if len(idle) >= 2:
        return (
            f"🎯 **{len(idle)} active agents** have no assignments this cycle: {', '.join(idle)}. "
            f"Consider expanding their repo lists or investigating why they aren't receiving work."
        )

    # Fallback: celebrate or nudge
    closed_count = len(metrics["closed_issues"])
    merged_count = len(metrics["merged_prs"])
    if closed_count >= 5 or merged_count >= 3:
        return (
            f"🎯 Strong cycle: {closed_count} issues closed, {merged_count} PRs merged. "
            f"Next improvement: write down the top 3 patterns that made this cycle successful so we can replicate them."
        )
    return (
        "🎯 Low activity this cycle. Next improvement: ensure at least one agent loop is actively polling "
        "for unassigned issues so work doesn't sit idle."
    )
def build_report(metrics: dict, suggestion: str, since: str) -> str:
    """Render the retro as Markdown.

    Sections: headline numbers, per-agent / per-repo / per-type breakdowns
    (sorted by activity), up to ten max-attempts issues, and the single
    concrete improvement suggestion.
    """
    now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
    period = since[:10]

    def activity(stats: dict) -> int:
        # Total attempts attributed to an agent/repo this cycle.
        return stats["successes"] + stats["failures"]

    out: list[str] = [
        f"# 🌀 Kaizen Retro — {now}",
        f"*Period: {period} → now*\n",
        "## Numbers",
        f"- **Issues closed:** {len(metrics['closed_issues'])}",
        f"- **PRs merged:** {len(metrics['merged_prs'])}",
        f"- **PRs closed without merge:** {len(metrics['closed_prs'])}",
        f"- **Max-attempts / stale issues:** {len(metrics['max_attempts_issues'])}",
        "",
        "## By Agent",
    ]
    for agent, stats in sorted(metrics["by_agent"].items(), key=lambda kv: activity(kv[1]), reverse=True):
        rate = fmt_pct(stats["successes"], activity(stats))
        out.append(f"- **{agent}**: {stats['successes']} closed, {stats['failures']} stale / max-attempts — verify rate {rate}")
    out += ["", "## By Repo"]
    for repo, stats in sorted(metrics["by_repo"].items(), key=lambda kv: activity(kv[1]), reverse=True):
        # Skip repos with no activity and nothing open.
        if activity(stats) == 0 and stats["open"] == 0:
            continue
        rate = fmt_pct(stats["successes"], activity(stats))
        out.append(
            f"- **{repo}**: {stats['successes']} closed, {stats['failures']} stale, {stats['open']} open — verify rate {rate}"
        )
    out += ["", "## By Issue Type"]
    for kind, stats in sorted(metrics["by_type"].items(), key=lambda kv: kv[1]["total"], reverse=True):
        rate = fmt_pct(stats["successes"], stats["total"])
        out.append(f"- **{kind}**: {stats['successes']} closed, {stats['failures']} stale — verify rate {rate}")
    stuck = metrics["max_attempts_issues"]
    if stuck:
        out += ["", "## Max-Attempts / Stale Issues"]
        for issue in stuck[:10]:
            out.append(f"- {issue['repo']}#{issue['number']} ({issue['type']}, assignee: {issue['assignee']}) — {issue['title']}")
        if len(stuck) > 10:
            out.append(f"- … and {len(stuck) - 10} more")
    out += ["", "## One Concrete Improvement", suggestion, ""]
    return "\n".join(out)
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main() -> int:
    """CLI entry point for the Kaizen retro.

    Gathers metrics since the lookback window, builds the Markdown report,
    always saves a JSON snapshot under REPORTS_DIR, then (unless --dry-run)
    posts the report to Telegram and comments it on the latest morning-report
    issue. Returns 0 in all cases; delivery failures are logged, not fatal.
    """
    parser = argparse.ArgumentParser(description="Kaizen Retro — automated burn-cycle retrospective")
    parser.add_argument("--dry-run", action="store_true", help="Print report but do not post")
    parser.add_argument("--since", type=str, help="ISO timestamp for lookback window (default: 24h ago)")
    parser.add_argument("--post-to", type=str, help="Override Telegram chat ID")
    args = parser.parse_args()
    since = args.since or iso_day_ago(1)
    client = GiteaClient()
    print("Gathering metrics since", since)
    metrics = gather_metrics(client, since)
    fleet = load_fleet_routing()
    suggestion = generate_suggestion(metrics, fleet)
    report = build_report(metrics, suggestion, since)
    print(report)
    # Save JSON snapshot (one file per UTC day; a rerun overwrites the same day)
    REPORTS_DIR.mkdir(parents=True, exist_ok=True)
    snapshot_path = REPORTS_DIR / f"kaizen-retro-{datetime.now(timezone.utc).strftime('%Y%m%d')}.json"
    snapshot = {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "since": since,
        "metrics": metrics,
        "suggestion": suggestion,
        "report_markdown": report,
    }
    with open(snapshot_path, "w") as f:
        json.dump(snapshot, f, indent=2)
    print(f"\nSnapshot saved to {snapshot_path}")
    if args.dry_run:
        return 0
    # Post to Telegram (best-effort; failures are logged to stderr)
    chat_id = args.post_to or TELEGRAM_CHAT_ID
    bot_token = TELEGRAM_BOT_TOKEN
    if bot_token and chat_id:
        try:
            telegram_send(report, bot_token, chat_id)
            print("Posted to Telegram.")
        except Exception as exc:
            print(f"Failed to post to Telegram: {exc}", file=sys.stderr)
    else:
        print("Telegram not configured (set TELEGRAM_BOT_TOKEN and TELEGRAM_HOME_CHANNEL).", file=sys.stderr)
    # Comment on latest morning report issue (also best-effort)
    morning_issue = find_latest_morning_report_issue(client)
    if morning_issue:
        try:
            client.create_comment(MORNING_REPORT_REPO, morning_issue, report)
            print(f"Commented on morning report issue #{morning_issue}.")
        except Exception as exc:
            print(f"Failed to comment on morning report issue: {exc}", file=sys.stderr)
    else:
        print("No morning report issue found to comment on.", file=sys.stderr)
    return 0
# Script entry point: propagate main()'s return code as the process exit status.
if __name__ == "__main__":
    sys.exit(main())