#!/usr/bin/env python3 """ muda_audit.py — Weekly waste audit for the Timmy Foundation fleet. Measures 7 wastes (Muda) across Gitea repos and agent logs: 1. Overproduction — issues created vs closed (ratio > 1.0 = waste) 2. Waiting — rate-limit hits from agent logs 3. Transport — issues closed with redirect keywords 4. Overprocessing — PR diff size outliers (>500 lines) 5. Inventory — open issues stale >30 days 6. Motion — git clone/rebase churn from logs 7. Defects — PRs closed without merge vs merged Outputs JSON report, persists week-over-week metrics, and optionally posts to Telegram. Part of Epic #345, Issue #350. """ from __future__ import annotations import glob import json import os import sys import urllib.request from collections import defaultdict from datetime import datetime, timedelta, timezone from pathlib import Path # Add repo root to path so we can import gitea_client REPO_ROOT = Path(__file__).resolve().parent.parent sys.path.insert(0, str(REPO_ROOT)) from gitea_client import GiteaClient, GiteaError # --------------------------------------------------------------------------- # Configuration # --------------------------------------------------------------------------- ORG = "Timmy_Foundation" REPOS = [ "the-nexus", ".profile", "timmy-config", "timmy-home", "the-door", "turboquant", "hermes-agent", "timmy-academy", "wolf", "the-testament", "the-beacon", ] AGENT_LOG_PATHS = [ "/root/wizards/*/home/logs/*.log", "/root/wizards/*/logs/*.log", "/root/wizards/*/.hermes/logs/*.log", ] REDIRECT_KEYWORDS = [ "moved to", "belongs in", "redirected to", "closing in favor of", "wrong repo", "should be in", "transfer to", "repost to", ] TELEGRAM_CHAT = "-1003664764329" TELEGRAM_TOKEN_PATHS = [ Path.home() / ".config" / "telegram" / "special_bot", Path.home() / ".hermes" / "telegram_bot_token", ] METRICS_DIR = Path.home() / ".local" / "timmy" / "muda-audit" METRICS_FILE = METRICS_DIR / "metrics.json" DAYS_BACK = 7 STALE_DAYS = 30 OVERPROCESSING_THRESHOLD = 500 # 
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def now_utc() -> datetime:
    """Current time as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)


def parse_iso(ts: str) -> datetime:
    """Parse an ISO-8601 timestamp, accepting a trailing 'Z' for UTC."""
    normalized = ts[:-1] + "+00:00" if ts.endswith("Z") else ts
    return datetime.fromisoformat(normalized)


def within_days(ts: str, days: int) -> bool:
    """True when *ts* is at most *days* old; False for unparseable input."""
    try:
        age = now_utc() - parse_iso(ts)
    except Exception:
        return False
    return age <= timedelta(days=days)


def older_than_days(ts: str, days: int) -> bool:
    """True when *ts* is at least *days* old; False for unparseable input."""
    try:
        age = now_utc() - parse_iso(ts)
    except Exception:
        return False
    return age >= timedelta(days=days)


def paginate_issues(client: GiteaClient, repo: str, state: str,
                    limit_per_page: int = 50, max_pages: int = 4):
    """Yield all issues across pages."""
    target = f"{ORG}/{repo}"
    page = 1
    while page <= max_pages:
        batch = client.list_issues(target, state=state, limit=limit_per_page,
                                   page=page, sort="created", direction="desc")
        if not batch:
            return
        yield from batch
        if len(batch) < limit_per_page:
            return  # short page: nothing left to fetch
        page += 1


def paginate_prs(client: GiteaClient, repo: str, state: str,
                 limit_per_page: int = 50, max_pages: int = 3):
    """Yield all PRs across pages."""
    target = f"{ORG}/{repo}"
    page = 1
    while page <= max_pages:
        batch = client.list_pulls(target, state=state, limit=limit_per_page,
                                  page=page, sort="newest")
        if not batch:
            return
        yield from batch
        if len(batch) < limit_per_page:
            return  # short page: nothing left to fetch
        page += 1


def read_telegram_token() -> str | None:
    """Return the bot token from the first existing token file, else the env var."""
    existing = next((p for p in TELEGRAM_TOKEN_PATHS if p.exists()), None)
    if existing is not None:
        return existing.read_text().strip()
    return os.environ.get("TELEGRAM_BOT_TOKEN") or None


def send_telegram(message: str) -> bool:
    """POST *message* (Markdown) to the configured chat; True on HTTP 200."""
    token = read_telegram_token()
    if not token:
        print("[WARN] No Telegram token found; skipping notification.")
        return False
    body = {
        "chat_id": TELEGRAM_CHAT,
        "text": message,
        "parse_mode": "Markdown",
        "disable_web_page_preview": True,
    }
    request = urllib.request.Request(
        f"https://api.telegram.org/bot{token}/sendMessage",
        data=json.dumps(body).encode(),
        method="POST",
        headers={"Content-Type": "application/json"},
    )
    try:
        with urllib.request.urlopen(request, timeout=15) as response:
            return response.status == 200
    except Exception as e:
        print(f"[WARN] Telegram send failed: {e}")
        return False


def find_log_files() -> list[Path]:
    """Collect non-empty agent log files matching the configured glob patterns."""
    found: list[Path] = []
    for pattern in AGENT_LOG_PATHS:
        for match in glob.glob(pattern):
            candidate = Path(match)
            try:
                nonempty = candidate.stat().st_size > 0
            except OSError:
                continue  # file vanished or unreadable; skip it
            if nonempty:
                found.append(candidate)
    return found


def _agent_name(logfile: Path) -> str:
    """Derive the agent name from the path component after 'wizards'."""
    parts = logfile.parts
    try:
        return parts[parts.index("wizards") + 1]
    except (ValueError, IndexError):
        return "unknown"


def grep_logs(pattern: str, files: list[Path]) -> dict[str, int]:
    """Return count of matches per agent (derived from path)."""
    hits: dict[str, int] = defaultdict(int)
    for logfile in files:
        agent = _agent_name(logfile)
        try:
            with open(logfile, "r", errors="ignore") as handle:
                for row in handle:
                    if pattern in row.lower():
                        hits[agent] += 1
        except Exception:
            pass  # best effort: an unreadable log never aborts the audit
    return dict(hits)


def summarize_counts(counts: dict[str, int]) -> str:
    """Render up to the five largest counts as 'agent: n' pairs."""
    if not counts:
        return "none detected"
    ranked = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)[:5]
    return ", ".join(f"{agent}: {n}" for agent, n in ranked)


# ---------------------------------------------------------------------------
# Week-over-week persistence
# ---------------------------------------------------------------------------


def load_previous_metrics() -> dict | None:
    """Return last week's saved record, or None when absent/corrupt."""
    if not METRICS_FILE.exists():
        return None
    try:
        stored = json.loads(METRICS_FILE.read_text())
    except (json.JSONDecodeError, OSError):
        return None
    if isinstance(stored, list) and stored:
        return stored[-1]
    return None


def save_metrics(record: dict) -> None:
    """Append *record* to the metrics history, capped at 52 weekly entries."""
    METRICS_DIR.mkdir(parents=True, exist_ok=True)
    history: list[dict] = []
    if METRICS_FILE.exists():
        try:
            loaded = json.loads(METRICS_FILE.read_text())
            history = loaded if isinstance(loaded, list) else []
        except (json.JSONDecodeError, OSError):
            history = []
    history.append(record)
    # keep one year of weekly reports
    METRICS_FILE.write_text(json.dumps(history[-52:], indent=2))
def trend_arrow(current: float, previous: float) -> str:
    """Return a week-over-week trend marker for a metric value.

    " ↓" = lower than last week, " ↑" = higher, " →" = unchanged.
    Returns "" when previous == 0, which suppresses the arrow both when
    there is no prior record and when last week's value was genuinely zero.
    """
    if previous == 0:
        return ""
    if current < previous:
        return " ↓"
    if current > previous:
        return " ↑"
    return " →"

# ---------------------------------------------------------------------------
# Waste metrics
# ---------------------------------------------------------------------------
# Each measure_* function returns a dict carrying at least "waste" (display
# name) and "healthy" (bool); the remaining keys are metric-specific and are
# consumed by format_telegram() and compute_top_eliminations().


def measure_overproduction(client: GiteaClient) -> dict:
    """Waste 1 — Overproduction: issues created vs closed in the audit window."""
    created = 0
    closed = 0
    created_by_repo: dict[str, int] = defaultdict(int)
    # NOTE(review): closed_by_repo is tallied below but never used in the
    # returned dict — dead accounting, or a missing "top closer" field?
    closed_by_repo: dict[str, int] = defaultdict(int)
    for repo in REPOS:
        try:
            for issue in paginate_issues(client, repo, state="all", max_pages=3):
                if within_days(issue.created_at, DAYS_BACK):
                    created += 1
                    created_by_repo[repo] += 1
                # updated_at stands in for a close timestamp: any closed issue
                # touched this week counts as closed this week (no closed_at
                # field is read here — TODO confirm the issue model lacks one).
                if issue.state == "closed" and within_days(issue.updated_at, DAYS_BACK):
                    closed += 1
                    closed_by_repo[repo] += 1
        except GiteaError as e:
            print(f"[WARN] Overproduction fetch failed for {repo}: {e}")
    # created/0 is undefined, so with nothing closed the raw created count
    # doubles as the "ratio"; 0.0 when nothing happened at all.
    ratio = round(created / closed, 2) if closed > 0 else (created if created > 0 else 0.0)
    return {
        "waste": "Overproduction",
        "created": created,
        "closed": closed,
        "ratio": ratio,
        "top_repo": max(created_by_repo, key=created_by_repo.get) if created_by_repo else None,
        "healthy": ratio <= 1.0,  # closing at least as fast as creating
    }


def measure_waiting(_client: GiteaClient) -> dict:
    """Waste 2 — Waiting: rate-limit hits in agent logs (Gitea client unused)."""
    files = find_log_files()
    # NOTE(review): the bare "429" substring can match unrelated text (ids,
    # timestamps), and a line matching two patterns is counted twice.
    patterns = ["rate limit", "ratelimit", "429", "too many requests"]
    total_by_agent: dict[str, int] = defaultdict(int)
    for pat in patterns:
        counts = grep_logs(pat, files)
        for agent, cnt in counts.items():
            total_by_agent[agent] += cnt
    total_hits = sum(total_by_agent.values())
    return {
        "waste": "Waiting",
        "rate_limit_hits": dict(total_by_agent),
        "total_hits": total_hits,
        "log_files_scanned": len(files),
        "healthy": total_hits == 0,
    }


def measure_transport(client: GiteaClient) -> dict:
    """Waste 3 — Transport: recently closed issues whose body or comments
    contain a redirect keyword (issue filed in the wrong repo)."""
    redirected = 0
    examples: list[str] = []
    for repo in REPOS:
        checked = 0
        try:
            for issue in paginate_issues(client, repo, state="closed", max_pages=2):
                if not within_days(issue.updated_at, DAYS_BACK):
                    continue
                checked += 1
                if checked > 20:
                    break  # cap at 20 recent closed issues per repo
                text = (issue.body or "").lower()
                if any(kw in text for kw in REDIRECT_KEYWORDS):
                    redirected += 1
                    examples.append(f"{repo}#{issue.number}")
                    continue  # body matched; no need to fetch comments
                try:
                    comments = client.list_comments(f"{ORG}/{repo}", issue.number)
                    for c in comments:
                        if any(kw in (c.body or "").lower() for kw in REDIRECT_KEYWORDS):
                            redirected += 1
                            examples.append(f"{repo}#{issue.number}")
                            break  # count each issue at most once
                except GiteaError:
                    pass  # comment fetch is best-effort
        except GiteaError as e:
            print(f"[WARN] Transport fetch failed for {repo}: {e}")
    return {
        "waste": "Transport",
        "redirected_issues": redirected,
        "examples": examples[:5],
        "healthy": redirected == 0,
    }


def measure_overprocessing(client: GiteaClient) -> dict:
    """Waste 4 — Overprocessing: PR diff-size outliers in the audit window."""
    pr_details: list[dict] = []
    flagged: list[str] = []
    total_lines = 0
    for repo in REPOS:
        try:
            scanned = 0
            for pr in paginate_prs(client, repo, state="all", max_pages=2):
                # `or ""` guards a missing created_at; within_days("") is False.
                if not within_days(pr.created_at or "", DAYS_BACK):
                    continue
                scanned += 1
                if scanned > 10:
                    break  # cap at 10 recent PRs per repo
                full_repo = f"{ORG}/{repo}"
                try:
                    files = client.get_pull_files(full_repo, pr.number)
                except GiteaError:
                    files = []  # treat an unreadable diff as 0 lines
                lines = sum(f.additions + f.deletions for f in files)
                total_lines += lines
                pr_details.append({
                    "repo": repo,
                    "pr": pr.number,
                    "title": pr.title,
                    "lines": lines,
                })
                # Epic PRs are expected to be large; exempt them from flagging.
                is_epic = "epic" in (pr.title or "").lower()
                if lines > OVERPROCESSING_THRESHOLD and not is_epic:
                    flagged.append(f"{repo}#{pr.number} ({lines} lines)")
        except GiteaError as e:
            print(f"[WARN] Overprocessing fetch failed for {repo}: {e}")
    avg_lines = round(total_lines / len(pr_details), 1) if pr_details else 0.0
    return {
        "waste": "Overprocessing",
        "prs_scanned": len(pr_details),
        "avg_lines_changed": avg_lines,
        "flagged_outliers": flagged,
        "healthy": len(flagged) == 0,
    }


def measure_inventory(client: GiteaClient) -> dict:
    """Waste 5 — Inventory: open issues not updated for STALE_DAYS or more."""
    stale = 0
    by_repo: dict[str, int] = defaultdict(int)
    for repo in REPOS:
        try:
            for issue in paginate_issues(client, repo, state="open", max_pages=4):
                if older_than_days(issue.updated_at, STALE_DAYS):
                    stale += 1
                    by_repo[repo] += 1
        except GiteaError as e:
            print(f"[WARN] Inventory fetch failed for {repo}: {e}")
    top_repo = max(by_repo, key=by_repo.get) if by_repo else None
    return {
        "waste": "Inventory",
        "stale_issues": stale,
        "by_repo": dict(by_repo),
        "top_repo": top_repo,
        "healthy": stale == 0,
    }


def measure_motion(_client: GiteaClient) -> dict:
    """Waste 6 — Motion: git clone/rebase/fetch churn in agent logs
    (Gitea client unused)."""
    files = find_log_files()
    clone_counts = grep_logs("git clone", files)
    rebase_counts = grep_logs("git rebase", files)
    fetch_counts = grep_logs("git fetch", files)
    total_motion = sum(clone_counts.values()) + sum(rebase_counts.values()) + sum(fetch_counts.values())
    return {
        "waste": "Motion",
        "git_clones": clone_counts,
        "git_rebases": rebase_counts,
        "git_fetches": fetch_counts,
        "total_motion_events": total_motion,
        "log_files_scanned": len(files),
        "healthy": total_motion < 50,  # tolerance threshold, not zero
    }


def measure_defects(client: GiteaClient) -> dict:
    """Waste 7 — Defects: share of recent PRs closed without being merged."""
    merged = 0
    closed_without_merge = 0
    for repo in REPOS:
        try:
            for pr in paginate_prs(client, repo, state="closed", max_pages=2):
                if not within_days(pr.created_at or "", DAYS_BACK):
                    continue
                if pr.merged:
                    merged += 1
                else:
                    closed_without_merge += 1
        except GiteaError as e:
            print(f"[WARN] Defects fetch failed for {repo}: {e}")
    total = merged + closed_without_merge
    close_rate = round(closed_without_merge / total, 2) if total > 0 else 0.0
    return {
        "waste": "Defects",
        "merged": merged,
        "closed_without_merge": closed_without_merge,
        "close_rate": close_rate,
        "healthy": close_rate < 0.25,  # under a quarter abandoned is acceptable
    }

# ---------------------------------------------------------------------------
# Report generation
# ---------------------------------------------------------------------------

# One remediation suggestion per waste, keyed by the "waste" display name.
SUGGESTIONS = {
    "Overproduction": "Pause issue-generation loops until backlog shrinks. Review auto-issue bots.",
    "Waiting": "Add exponential backoff to API clients. Reduce loop frequency for rate-limited agents.",
    "Transport": "Enforce repo-boundary check before issue creation. Close with redirect template.",
    "Overprocessing": "Scope tickets tighter. Flag >500-line PRs for pre-review split.",
    "Inventory": "Bulk-close or consolidate stale issues. Set 30-day auto-close for untouched items.",
    "Motion": "Cache workspace directories across issues. Limit clones to 1 per issue branch.",
    "Defects": "Require smoke tests before PR. Rebase before merge to reduce conflict closures.",
}


def compute_top_eliminations(metrics: list[dict]) -> list[str]:
    """Pick the top 3 unhealthiest wastes and return concrete suggestions."""
    unhealthies = [m for m in metrics if not m.get("healthy", True)]

    # Sort by severity heuristic: the divisors/multipliers normalise each
    # metric's raw value onto a roughly comparable scale so the wastes can
    # be ranked against each other.
    def severity(m: dict) -> float:
        if m["waste"] == "Overproduction":
            return m.get("ratio", 0)
        if m["waste"] == "Waiting":
            return m.get("total_hits", 0) / 10
        if m["waste"] == "Transport":
            return m.get("redirected_issues", 0)
        if m["waste"] == "Overprocessing":
            return len(m.get("flagged_outliers", []))
        if m["waste"] == "Inventory":
            return m.get("stale_issues", 0) / 10
        if m["waste"] == "Motion":
            return m.get("total_motion_events", 0) / 20
        if m["waste"] == "Defects":
            return m.get("close_rate", 0) * 10
        return 0.0

    unhealthies.sort(key=severity, reverse=True)
    suggestions = []
    for m in unhealthies[:3]:
        suggestions.append(SUGGESTIONS.get(m["waste"], "Review and reduce."))
    if not suggestions:
        # Everything healthy: emit three stock maintenance reminders instead.
        suggestions = [
            "No major waste detected this week. Maintain current guardrails.",
            "Continue monitoring agent loop logs for emerging rate-limit patterns.",
            "Keep PR diff sizes under review during weekly standup.",
        ]
    return suggestions


def build_report(metrics: list[dict]) -> dict:
    """Assemble the JSON-serialisable report envelope around *metrics*."""
    wastes = [m for m in metrics if not m.get("healthy", True)]
    report = {
        "report_type": "MUDA Weekly Waste Audit",
        "generated_at": now_utc().isoformat(),
        "period_days": DAYS_BACK,
        "metrics": metrics,
        "waste_count": len(wastes),
        "top_wastes": wastes,
    }
    return report


def format_telegram(report: dict, prev: dict | None = None) -> str:
    """Render the report as a Telegram Markdown message.

    *prev* is last week's saved record (as written by save_metrics); when
    present, its per-waste values drive the trend arrows.
    """
    lines = [
        f"*🗑 MUDA Audit — {report['generated_at'][:10]}*",
        f"Period: last {report['period_days']} days",
        "",
    ]
    # Index last week's metrics by waste name; empty dict when no history.
    prev_metrics = {m["waste"]: m for m in (prev.get("metrics", []) if prev else [])}
    for m in report["metrics"]:
        emoji = "✅" if m.get("healthy") else "⚠️"
        name = m["waste"]
        pm = prev_metrics.get(name, {})
        if name == "Overproduction":
            ratio_prev = pm.get("ratio", 0.0)
            arrow = trend_arrow(m["ratio"], ratio_prev)
            lines.append(f"{emoji} *Overproduction*: {m['created']} created / {m['closed']} closed = ratio {m['ratio']}{arrow}")
        elif name == "Waiting":
            hits_prev = pm.get("total_hits", 0)
            arrow = trend_arrow(m["total_hits"], hits_prev)
            lines.append(f"{emoji} *Waiting*: {m['total_hits']} rate-limit hits ({summarize_counts(m['rate_limit_hits'])}){arrow}")
        elif name == "Transport":
            trans_prev = pm.get("redirected_issues", 0)
            arrow = trend_arrow(m["redirected_issues"], trans_prev)
            lines.append(f"{emoji} *Transport*: {m['redirected_issues']} redirected issues{arrow}")
        elif name == "Overprocessing":
            avg_prev = pm.get("avg_lines_changed", 0.0)
            arrow = trend_arrow(m["avg_lines_changed"], avg_prev)
            lines.append(f"{emoji} *Overprocessing*: avg {m['avg_lines_changed']} lines/PR, {len(m['flagged_outliers'])} outliers{arrow}")
        elif name == "Inventory":
            inv_prev = pm.get("stale_issues", 0)
            arrow = trend_arrow(m["stale_issues"], inv_prev)
            lines.append(f"{emoji} *Inventory*: {m['stale_issues']} stale issues (>30d){arrow}")
        elif name == "Motion":
            motion_prev = pm.get("total_motion_events", 0)
            arrow = trend_arrow(m["total_motion_events"], motion_prev)
            lines.append(f"{emoji} *Motion*: {m['total_motion_events']} git ops ({summarize_counts(m['git_clones'])} clones){arrow}")
        elif name == "Defects":
            close_prev = pm.get("close_rate", 0.0)
            arrow = trend_arrow(m["close_rate"], close_prev)
            # NOTE(review): misnamed — this is the TOTAL closed-PR count
            # (merged + unmerged), i.e. the denominator shown below.
            total_abandoned = m["closed_without_merge"] + m["merged"]
            lines.append(f"{emoji} *Defects*: {m['close_rate']*100:.0f}% closed without merge ({m['closed_without_merge']}/{total_abandoned}){arrow}")
    lines.append("")
    eliminations = compute_top_eliminations(report["metrics"])
    lines.append("*Top 3 eliminations:*")
    for i, suggestion in enumerate(eliminations, 1):
        lines.append(f"{i}. {suggestion}")
    lines.append("")
    lines.append("_Week over week: waste metrics should decrease. If an arrow points up, investigate._")
    return "\n".join(lines)

# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------


def main():
    """Run all seven waste measurements, write the JSON report, notify
    Telegram, persist this week's metrics, and echo the summary to stdout."""
    client = GiteaClient()
    if not client.ping():
        print("[ERROR] Gitea is unreachable. Aborting audit.")
        sys.exit(1)
    print("[INFO] Starting MUDA waste audit...")
    metrics = [
        measure_overproduction(client),
        measure_waiting(client),
        measure_transport(client),
        measure_overprocessing(client),
        measure_inventory(client),
        measure_motion(client),
        measure_defects(client),
    ]
    report = build_report(metrics)
    # Load last week's record BEFORE saving this week's, so the trend
    # arrows compare against the previous run.
    prev = load_previous_metrics()

    # Write JSON report
    reports_dir = REPO_ROOT / "reports"
    reports_dir.mkdir(exist_ok=True)
    json_path = reports_dir / f"muda-audit-{now_utc().strftime('%Y%m%d')}.json"
    json_path.write_text(json.dumps(report, indent=2))
    print(f"[INFO] Report written to {json_path}")

    # Send Telegram
    telegram_msg = format_telegram(report, prev)
    if send_telegram(telegram_msg):
        print("[INFO] Telegram notification sent.")
    else:
        print("[WARN] Telegram notification failed or skipped.")

    # Persist metrics for week-over-week tracking
    save_metrics({
        "week_ending": now_utc().date().isoformat(),
        "generated_at": report["generated_at"],
        "metrics": metrics,
    })

    # Print summary to stdout
    print("\n" + "=" * 60)
    print(telegram_msg)
    print("=" * 60)


if __name__ == "__main__":
    main()