Files
timmy-config/fleet/muda_audit.py
Ezra 6210e74af9 feat: Muda Audit — fleet waste elimination (#350)
Implements the Muda audit (fleet/muda_audit.py, invoked via the fleet/muda-audit.sh wrapper) to measure the 7 wastes across the fleet:
1. Overproduction — agent issues created vs closed
2. Waiting — rate-limited API attempts from loop logs
3. Transport — issues closed-and-redirected
4. Overprocessing — PR diff size outliers (>500 lines for non-epics)
5. Inventory — issues open >30 days with no activity
6. Motion — git clone/rebase operations per issue from logs
7. Defects — PRs closed without merge vs merged

- fleet/muda_audit.py: core audit logic using gitea_client.py
- fleet/muda-audit.sh: thin bash wrapper
- cron/jobs.json: add Hermes cron job for weekly Sunday 21:00 runs
- cron/muda-audit.crontab: raw crontab snippet for host-level scheduling

Posts waste report to Telegram with week-over-week trends and top 3
elimination suggestions.

Part of Epic: #345
Closes: #350
2026-04-07 15:13:03 +00:00

662 lines
21 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Muda Audit — Fleet Waste Elimination
Measures the 7 wastes across Timmy_Foundation repos and posts a weekly report.
Part of Epic: #345
Issue: #350
Wastes:
1. Overproduction — agent issues created vs closed
2. Waiting — rate-limited API attempts from loop logs
3. Transport — issues closed-and-redirected to other repos
4. Overprocessing — PR diff size outliers (>500 lines for non-epics)
5. Inventory — issues open >30 days with no activity
6. Motion — git clone/rebase operations per issue from logs
7. Defects — PRs closed without merge vs merged
"""
from __future__ import annotations
import json
import os
import re
import sys
import urllib.request
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
# Add repo root to path so we can import gitea_client
_REPO_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(_REPO_ROOT))
from gitea_client import GiteaClient, GiteaError # noqa: E402
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
ORG = "Timmy_Foundation"

# Gitea login -> display name for every fleet agent. Dict order matters:
# measure_waiting() attributes a log line to the first name found in it.
AGENT_LOGINS_HUMAN = {
    "claude": "Claude",
    "codex-agent": "Codex",
    "ezra": "Ezra",
    "gemini": "Gemini",
    "google": "Google",
    "grok": "Grok",
    "groq": "Groq",
    "hermes": "Hermes",
    "kimi": "Kimi",
    "manus": "Manus",
    "perplexity": "Perplexity",
    "allegro": "Allegro",
    "antigravity": "Antigravity",
    "bezalel": "Bezalel",
}
# Logins whose issues count as agent-created. Derived from the mapping above so
# adding an agent in one place cannot leave the two collections out of sync.
AGENT_LOGINS = set(AGENT_LOGINS_HUMAN)

# Telegram destination for the weekly report; the bot token is read from disk.
TELEGRAM_CHAT = "-1003664764329"
TELEGRAM_TOKEN_FILE = Path.home() / ".hermes" / "telegram_token"

# Week-over-week history: a JSON list of metric records.
METRICS_DIR = Path(os.path.expanduser("~/.local/timmy/muda-audit"))
METRICS_FILE = METRICS_DIR / "metrics.json"

# Agent logs scanned for rate-limit (waiting) and clone/rebase (motion) lines.
_HERMES_LOGS = Path.home() / ".hermes" / "logs"
LOG_PATHS = [
    _HERMES_LOGS / "claude-loop.log",
    _HERMES_LOGS / "gemini-loop.log",
    _HERMES_LOGS / "agent.log",
    _HERMES_LOGS / "errors.log",
    _HERMES_LOGS / "gateway.log",
]

# Comment phrases indicating an issue was redirected / transported elsewhere.
TRANSPORT_PATTERNS = [
    re.compile(r"redirect", re.IGNORECASE),
    re.compile(r"moved to", re.IGNORECASE),
    re.compile(r"wrong repo", re.IGNORECASE),
    re.compile(r"belongs in", re.IGNORECASE),
    re.compile(r"should be in", re.IGNORECASE),
    re.compile(r"transported", re.IGNORECASE),
    re.compile(r"relocated", re.IGNORECASE),
]

# Log text marking a rate-limited API attempt (the "waiting" waste).
RATE_LIMIT_PATTERNS = [
    re.compile(r"rate.limit", re.IGNORECASE),
    re.compile(r"ratelimit", re.IGNORECASE),
    # The HTTP status must stand alone, so numbers such as 1429, #4290 or
    # 0.429s are not counted as rate-limit hits.
    re.compile(r"(?<![\d.])429(?!\d)"),
    re.compile(r"too many requests", re.IGNORECASE),
    re.compile(r"rate limit exceeded", re.IGNORECASE),
]

# Log text marking a git clone/rebase operation (the "motion" waste).
MOTION_PATTERNS = [
    re.compile(r"git clone", re.IGNORECASE),
    re.compile(r"git rebase", re.IGNORECASE),
    re.compile(r"rebasing", re.IGNORECASE),
    re.compile(r"cloning into", re.IGNORECASE),
]
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def iso_now() -> str:
    """Return the current instant as an ISO-8601 string in UTC (``+00:00`` offset)."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
def parse_iso(dt_str: str) -> datetime:
    """Parse an ISO-8601 timestamp into a timezone-aware datetime.

    A trailing ``Z`` is rewritten to ``+00:00`` because ``datetime.fromisoformat``
    only accepts ``Z`` from Python 3.11 on.

    A timestamp without any UTC offset is assumed to be UTC. The callers in this
    module compare the result against aware UTC cutoffs, and comparing a naive
    datetime with an aware one raises ``TypeError``.
    """
    text = dt_str.strip()
    if text.endswith(("Z", "z")):
        text = text[:-1] + "+00:00"
    parsed = datetime.fromisoformat(text)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed
def since_days_ago(days: int) -> datetime:
    """Return the aware UTC instant that lies ``days`` days before now."""
    window = timedelta(days=days)
    now_utc = datetime.now(timezone.utc)
    return now_utc - window
def fmt_num(n: float) -> str:
    """Format a metric for display: floats get one decimal place, anything else uses ``str``."""
    if not isinstance(n, float):
        return str(n)
    return f"{n:.1f}"
def send_telegram(message: str) -> bool:
    """Post ``message`` to TELEGRAM_CHAT using the bot token in TELEGRAM_TOKEN_FILE.

    Best effort: returns True if a send succeeded and False otherwise, printing a
    warning instead of raising. A missing, unreadable or empty token file skips
    the send entirely.

    The message is first sent with ``parse_mode="Markdown"``. If Telegram answers
    HTTP 400 (Bad Request, which is what it returns e.g. for unparsable Markdown
    entities), the same text is re-sent once without a parse mode so the report
    still arrives.
    """
    import urllib.error  # HTTPError is only needed here

    try:
        token = TELEGRAM_TOKEN_FILE.read_text().strip()
    except FileNotFoundError:
        print("[WARN] Telegram token not found; skipping notification.")
        return False
    except OSError as e:
        # e.g. permission denied: a notification problem must not abort the run.
        print(f"[WARN] Telegram token unreadable ({e}); skipping notification.")
        return False
    if not token:
        print("[WARN] Telegram token file is empty; skipping notification.")
        return False
    url = f"https://api.telegram.org/bot{token}/sendMessage"

    def post(parse_mode: str | None) -> None:
        payload = {"chat_id": TELEGRAM_CHAT, "text": message}
        if parse_mode is not None:
            payload["parse_mode"] = parse_mode
        payload["disable_web_page_preview"] = True
        req = urllib.request.Request(
            url,
            data=json.dumps(payload).encode(),
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        with urllib.request.urlopen(req, timeout=15) as resp:
            resp.read()

    try:
        post("Markdown")
        return True
    except urllib.error.HTTPError as e:
        if e.code != 400:
            print(f"[WARN] Telegram send failed: {e}")
            return False
        print(f"[WARN] Telegram rejected the Markdown message ({e}); retrying as plain text.")
    except Exception as e:  # best-effort boundary: never let notification crash the audit
        print(f"[WARN] Telegram send failed: {e}")
        return False

    try:
        post(None)
        return True
    except Exception as e:  # best-effort boundary, as above
        print(f"[WARN] Telegram send failed: {e}")
        return False
def load_previous_metrics(path: Path | None = None) -> dict | None:
    """Return the most recent saved metrics record, or None if there is none.

    Args:
        path: history file to read; defaults to METRICS_FILE.

    Returns:
        The last entry of the JSON list stored in the file. Returns None when the
        file is missing, unreadable, not valid JSON/UTF-8, not a non-empty list,
        or when its last entry is not a dict. Callers treat the result as a
        mapping (``.get``), so any other type would crash them.
    """
    history_file = METRICS_FILE if path is None else path
    try:
        history = json.loads(history_file.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError covers a missing/unreadable file; ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError.
        return None
    if isinstance(history, list) and history and isinstance(history[-1], dict):
        return history[-1]
    return None
def save_metrics(record: dict, path: Path | None = None) -> None:
    """Append ``record`` to the JSON metrics history, keeping the newest 52 entries.

    Args:
        record: the metrics record to append (must be JSON-serialisable).
        path: history file; defaults to METRICS_FILE. Its parent directory is
            created if needed.

    An unreadable or malformed existing history is replaced by a fresh list.
    The new history is written to a sibling ``.tmp`` file and moved into place
    with ``os.replace``. An interrupted run therefore cannot leave a truncated
    file, which the next run would parse as garbage and discard along with
    every earlier week.
    """
    history_file = METRICS_FILE if path is None else path
    history_file.parent.mkdir(parents=True, exist_ok=True)
    history: list[dict] = []
    try:
        loaded = json.loads(history_file.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        loaded = None
    if isinstance(loaded, list):
        history = loaded
    history.append(record)
    history = history[-52:]
    tmp_file = history_file.with_name(history_file.name + ".tmp")
    tmp_file.write_text(json.dumps(history, indent=2), encoding="utf-8")
    os.replace(tmp_file, history_file)
# ---------------------------------------------------------------------------
# Gitea helpers
# ---------------------------------------------------------------------------
def paginate_all(func, *args, **kwargs) -> list[Any]:
    """Call a paged list endpoint repeatedly and return every item from every page.

    ``func`` is called as ``func(*args, limit=..., page=N, **kwargs)`` for
    N = 1, 2, ... (``limit`` defaults to 50). Paging stops at the first empty
    page or the first page shorter than ``limit``.

    NOTE(review): if the server caps page size below ``limit``, the first page
    comes back short and paging stops early — keep ``limit`` within the
    server's maximum.
    """
    page_size = kwargs.pop("limit", 50)
    collected: list[Any] = []
    page_no = 0
    while True:
        page_no += 1
        chunk = func(*args, limit=page_size, page=page_no, **kwargs)
        if not chunk:
            return collected
        collected.extend(chunk)
        if len(chunk) < page_size:
            return collected
def list_org_repos(client: GiteaClient, org: str) -> list[str]:
    """Return the names of every non-archived repository in ``org``."""
    every_repo = paginate_all(client.list_org_repos, org, limit=50)
    names: list[str] = []
    for entry in every_repo:
        if entry.get("archived", False):
            continue
        names.append(entry["name"])
    return names
def count_issues_created_by_agents(client: GiteaClient, repo: str, since: datetime) -> int:
    """Count issues in ``repo`` opened by a login in AGENT_LOGINS at or after ``since``.

    paginate_all() downloads every page before this loop runs, so the time
    window is applied as a filter instead of stopping at the first older issue.
    Stopping early saved no requests, and it was only correct if the server
    honoured ``sort="created"``/``direction="desc"``.

    NOTE(review): Gitea's issue listing may also return pull requests unless
    filtered by type — confirm whether GiteaClient.list_issues excludes them.
    """
    issues = paginate_all(
        client.list_issues, repo, state="all", sort="created", direction="desc", limit=50
    )
    return sum(
        1
        for issue in issues
        if issue.user.login in AGENT_LOGINS and parse_iso(issue.created_at) >= since
    )
def count_issues_closed(client: GiteaClient, repo: str, since: datetime) -> int:
    """Count issues in ``repo`` that were closed at or after ``since``.

    The closure time is read from ``closed_at`` when the issue object has a
    non-empty one, and from ``updated_at`` otherwise. ``updated_at`` on its own
    over-counts: an issue closed months ago that got a comment this week would
    be reported as closed this week.

    Every page is already fetched by paginate_all(), so the window is a filter
    rather than an early break that would depend on the server's sort order.

    NOTE(review): assumes the client's issue objects may expose Gitea's
    ``closed_at`` field; getattr falls back to ``updated_at`` if they do not —
    confirm against gitea_client.
    """
    issues = paginate_all(
        client.list_issues, repo, state="closed", sort="updated", direction="desc", limit=50
    )
    count = 0
    for issue in issues:
        closed_stamp = getattr(issue, "closed_at", None) or issue.updated_at
        if parse_iso(closed_stamp) >= since:
            count += 1
    return count
def count_inventory_issues(client: GiteaClient, repo: str, stale_days: int = 30) -> int:
    """Count open issues in ``repo`` whose ``updated_at`` is more than ``stale_days`` days ago.

    Every page is already fetched by paginate_all(), so each issue is tested
    individually. The previous early break was correct only if the server
    honoured ``sort="updated"``/``direction="asc"``.
    """
    cutoff = since_days_ago(stale_days)
    issues = paginate_all(
        client.list_issues, repo, state="open", sort="updated", direction="asc", limit=50
    )
    return sum(1 for issue in issues if parse_iso(issue.updated_at) < cutoff)
def count_transport_issues(client: GiteaClient, repo: str, since: datetime) -> int:
    """Count recently closed issues whose comments suggest they were redirected.

    An issue counts once if any of its comments matches any TRANSPORT_PATTERNS
    entry. Only the single page of 20 closed issues returned by ``list_issues``
    is examined, because each candidate costs an extra ``list_comments`` call;
    busy repos may therefore be under-counted.

    Issues updated before ``since`` are skipped one by one rather than ending
    the scan, so the result does not depend on the server honouring the
    requested sort order. If fetching comments fails (GiteaError), that issue
    is skipped.
    """
    candidates = client.list_issues(
        repo, state="closed", sort="updated", direction="desc", limit=20
    )
    transport = 0
    for issue in candidates:
        if parse_iso(issue.updated_at) < since:
            continue
        try:
            comments = client.list_comments(repo, issue.number)
        except GiteaError:
            continue
        if any(
            pattern.search(comment.body or "")
            for comment in comments
            for pattern in TRANSPORT_PATTERNS
        ):
            transport += 1
    return transport
def get_pr_diff_size(client: GiteaClient, repo: str, pr_number: int) -> int:
    """Return additions + deletions summed over every file changed by a pull request.

    Returns 0 if listing the PR's files raises GiteaError, so a failed lookup
    is indistinguishable from an empty PR.
    """
    try:
        changed = client.get_pull_files(repo, pr_number)
        total = 0
        for entry in changed:
            total += entry.additions + entry.deletions
    except GiteaError:
        return 0
    return total
def measure_overprocessing(client: GiteaClient, repo: str, since: datetime) -> dict:
    """Measure diff sizes of pull requests opened in ``repo`` at or after ``since``.

    Returns:
        ``{"avg_lines": float, "outliers": [(number, title, lines), ...], "count": int}``.
        ``avg_lines`` is the mean additions+deletions per measured PR (0.0 when
        none were measured). ``outliers`` are PRs over 500 changed lines whose
        title does not contain "epic".

    PRs without a ``created_at`` are skipped. Out-of-window PRs are skipped
    individually rather than ending the loop. This way a single PR with no
    timestamp, or a server that ignores ``sort="newest"``, no longer hides
    every PR after it. get_pr_diff_size() reports 0 when it cannot fetch a PR's
    files, so such PRs pull the average down.
    """
    pulls = paginate_all(client.list_pulls, repo, state="all", sort="newest", limit=30)
    sizes: list[int] = []
    outliers: list[tuple[int, str, int]] = []
    for pr in pulls:
        if not pr.created_at or parse_iso(pr.created_at) < since:
            continue
        diff_size = get_pr_diff_size(client, repo, pr.number)
        sizes.append(diff_size)
        title = pr.title or ""
        # "[epic]" contains "epic", so a single substring test covers both forms.
        if diff_size > 500 and "epic" not in title.lower():
            outliers.append((pr.number, title, diff_size))
    avg = round(sum(sizes) / len(sizes), 1) if sizes else 0.0
    return {"avg_lines": avg, "outliers": outliers, "count": len(sizes)}
def measure_defects(client: GiteaClient, repo: str, since: datetime) -> dict:
    """Count closed pull requests in ``repo`` that were merged vs. closed unmerged.

    A PR is in the window when its closure time is at or after ``since``. The
    closure time is read from ``closed_at`` when the PR object has a non-empty
    one, falling back to ``created_at`` otherwise. PRs with neither timestamp
    are treated as outside the window. Each PR is tested individually, so the
    result does not depend on the server honouring ``sort="newest"``.

    Returns:
        ``{"merged": int, "closed_unmerged": int}``.

    NOTE(review): assumes the client's PR objects may expose Gitea's
    ``closed_at`` field; getattr falls back to ``created_at`` if they do not —
    confirm against gitea_client.
    """
    pulls = paginate_all(client.list_pulls, repo, state="closed", sort="newest", limit=50)
    merged = 0
    closed_unmerged = 0
    for pr in pulls:
        stamp = getattr(pr, "closed_at", None) or pr.created_at
        if not stamp or parse_iso(stamp) < since:
            continue
        if pr.merged:
            merged += 1
        else:
            closed_unmerged += 1
    return {"merged": merged, "closed_unmerged": closed_unmerged}
# ---------------------------------------------------------------------------
# Log parsing
# ---------------------------------------------------------------------------
def parse_logs_for_patterns(
    since: datetime,
    patterns: list[re.Pattern],
    log_paths: list[Path] | None = None,
) -> list[str]:
    """Return stripped, non-empty log lines within the window that match any pattern.

    Args:
        since: aware cutoff; lines stamped earlier are ignored.
        patterns: compiled regexes; a line is kept if any of them matches.
        log_paths: files to scan, in order; defaults to LOG_PATHS. Missing or
            unreadable files are skipped.

    A line's timestamp comes from a leading ``YYYY-MM-DD HH:MM:SS``; a ``T``
    separator and an opening ``[`` are also accepted. A line without its own
    timestamp (traceback, wrapped output) inherits the most recent timestamp
    seen earlier in the same file. Continuation lines of old entries therefore
    stop being counted every week. Lines that appear before any timestamp in a
    file are kept, since their age is unknown.

    NOTE(review): timestamps are interpreted as UTC; if the agents log local
    time, the window edge shifts by the UTC offset — confirm.
    """
    ts_re = re.compile(r"^\[?(\d{4}-\d{2}-\d{2})[T\s]+(\d{2}:\d{2}:\d{2})")
    paths = LOG_PATHS if log_paths is None else log_paths
    matches: list[str] = []
    for log_path in paths:
        if not log_path.exists():
            continue
        last_ts: datetime | None = None  # reset per file
        try:
            with open(log_path, "r", encoding="utf-8", errors="ignore") as f:
                for raw in f:
                    line = raw.strip()
                    if not line:
                        continue
                    m = ts_re.match(line)
                    if m:
                        try:
                            last_ts = datetime.strptime(
                                f"{m.group(1)} {m.group(2)}", "%Y-%m-%d %H:%M:%S"
                            ).replace(tzinfo=timezone.utc)
                        except ValueError:
                            pass  # impossible date/time: keep the previous stamp
                    if last_ts is not None and last_ts < since:
                        continue
                    if any(p.search(line) for p in patterns):
                        matches.append(line)
        except OSError:
            continue
    return matches
def measure_waiting(since: datetime) -> dict:
    """Count rate-limit log lines since ``since`` and attribute each to an agent.

    Returns:
        ``{"total": int, "by_agent": {name: hits}}``. A line is attributed to the
        first agent, in AGENT_LOGINS_HUMAN order, whose lower-cased display name
        occurs anywhere in the lower-cased line; otherwise to "unknown".
    """
    lines = parse_logs_for_patterns(since, RATE_LIMIT_PATTERNS)
    # Lower-case the names once; list order preserves the dict's priority order.
    agent_names = [name.lower() for name in AGENT_LOGINS_HUMAN.values()]
    by_agent: dict[str, int] = {}
    for line in lines:
        lowered = line.lower()
        agent = next((name for name in agent_names if name in lowered), "unknown")
        by_agent[agent] = by_agent.get(agent, 0) + 1
    return {"total": len(lines), "by_agent": by_agent}
def measure_motion(since: datetime) -> dict:
    """Count git clone/rebase log lines since ``since``, grouped by the issue they mention.

    Each matching line is keyed by ``"<prefix>/issue-<n>"`` when it names a
    branch like ``claude/issue-12``, by ``"issue-<n>"`` when it only mentions an
    issue number, and by ``"unknown"`` otherwise.

    Returns:
        ``{"total": int, "by_issue": {key: ops}, "flagged": {key: ops}}``, where
        ``flagged`` holds the non-"unknown" keys with more than 3 operations.
    """
    motion_lines = parse_logs_for_patterns(since, MOTION_PATTERNS)
    branch_re = re.compile(r"\b([a-z]+)/issue[_\s-]?(\d+)\b", re.IGNORECASE)
    issue_re = re.compile(r"issue[_\s-]?(\d+)", re.IGNORECASE)
    per_issue: dict[str, int] = {}
    for entry in motion_lines:
        branch_hit = branch_re.search(entry)
        if branch_hit is not None:
            key = f"{branch_hit.group(1).lower()}/issue-{branch_hit.group(2)}"
        else:
            plain_hit = issue_re.search(entry)
            key = f"issue-{plain_hit.group(1)}" if plain_hit is not None else "unknown"
        per_issue[key] = per_issue.get(key, 0) + 1
    flagged: dict[str, int] = {}
    for key, ops in per_issue.items():
        if key != "unknown" and ops > 3:
            flagged[key] = ops
    return {"total": len(motion_lines), "by_issue": per_issue, "flagged": flagged}
# ---------------------------------------------------------------------------
# Report builder
# ---------------------------------------------------------------------------
def trend_arrow(current: float, previous: float) -> str:
    """Return a week-over-week arrow suffix for a waste metric.

    Returns " ↓" when the metric fell, " ↑" when it rose and " →" when it is
    unchanged. Returns "" when ``previous`` is 0, which is also what missing
    history defaults to, so no trend is claimed without a baseline.
    """
    if previous == 0:
        return ""
    if current < previous:
        return " ↓"
    if current > previous:
        return " ↑"
    return " →"


def build_report(metrics: dict, prev: dict | None) -> str:
    """Render the weekly waste report as Telegram (legacy) Markdown text.

    Args:
        metrics: the current record as built by run_audit().
        prev: the previously saved record, or None on the first run. Missing
            sections/keys read as 0, which suppresses the trend arrow.

    Returns:
        The report: a header, one line per waste with a trend arrow, the
        suggestions from ``metrics["eliminations"]``, and a closing note.
    """
    prev_w = prev or {}

    def prev_value(section: str, key: str) -> float:
        return prev_w.get(section, {}).get(key, 0)

    lines: list[str] = [
        "*🗑️ MUDA AUDIT — Weekly Waste Report*",
        f"Week ending {metrics['week_ending'][:10]}\n",
    ]

    op = metrics["overproduction"]
    ratio = op["ratio"]
    arrow = trend_arrow(ratio, prev_value("overproduction", "ratio"))
    lines.append(
        f"*1. Overproduction:* {op['agent_created']} agent issues created / {op['closed']} closed"
        f" (ratio {fmt_num(ratio)}{arrow})"
    )

    w = metrics["waiting"]
    arrow = trend_arrow(w["total"], prev_value("waiting", "total"))
    lines.append(f"*2. Waiting:* {w['total']} rate-limit hits this week{arrow}")
    if w["by_agent"]:
        top = sorted(w["by_agent"].items(), key=lambda x: x[1], reverse=True)[:3]
        lines.append(" Top offenders: " + ", ".join(f"{k}({v})" for k, v in top))

    t = metrics["transport"]
    arrow = trend_arrow(t["total"], prev_value("transport", "total"))
    lines.append(f"*3. Transport:* {t['total']} issues closed-and-redirected{arrow}")

    ov = metrics["overprocessing"]
    arrow = trend_arrow(ov["avg_lines"], prev_value("overprocessing", "avg_lines"))
    lines.append(
        f"*4. Overprocessing:* Avg PR diff {fmt_num(ov['avg_lines'])} lines"
        f"{arrow}, {len(ov['outliers'])} outliers >500 lines"
    )

    inv = metrics["inventory"]
    arrow = trend_arrow(inv["total"], prev_value("inventory", "total"))
    lines.append(f"*5. Inventory:* {inv['total']} stale issues open >30 days{arrow}")

    m = metrics["motion"]
    arrow = trend_arrow(m["total"], prev_value("motion", "total"))
    lines.append(f"*6. Motion:* {m['total']} git clone/rebase ops this week{arrow}")
    if m["flagged"]:
        lines.append(f" Flagged: {len(m['flagged'])} issues with >3 ops")

    d = metrics["defects"]
    defect_rate = d["defect_rate"]
    arrow = trend_arrow(defect_rate, prev_value("defects", "defect_rate"))
    lines.append(
        f"*7. Defects:* {d['merged']} merged, {d['closed_unmerged']} abandoned"
        f" (defect rate {fmt_num(defect_rate)}%{arrow})"
    )

    lines.append("\n*🔥 Top 3 Elimination Suggestions:*")
    for i, suggestion in enumerate(metrics["eliminations"], 1):
        lines.append(f"{i}. {suggestion}")
    lines.append("\n_Week over week: waste metrics should decrease. If an arrow points up, investigate._")
    return "\n".join(lines)
def compute_eliminations(metrics: dict) -> list[str]:
    """Choose up to three waste-elimination suggestions, most severe first.

    Each waste whose metric crosses its threshold contributes one
    (suggestion, score) candidate. Candidates are ranked by score, descending;
    ties keep the order in which the wastes are listed below. The top three
    are returned. If no waste crosses its threshold, three generic maintenance
    reminders are returned instead.
    """
    over = metrics["overproduction"]
    wait = metrics["waiting"]
    trans = metrics["transport"]
    proc = metrics["overprocessing"]
    stock = metrics["inventory"]
    move = metrics["motion"]
    defect = metrics["defects"]

    candidates: list[tuple[str, float]] = []
    if over["ratio"] > 1.0:
        candidates.append((
            "Overproduction: Stop agent loops from creating issues faster than they close them."
            f" Cap new issue creation when open backlog >{over['closed'] * 2}.",
            over["ratio"],
        ))
    if wait["total"] > 10:
        worst_agent, worst_hits = max(wait["by_agent"].items(), key=lambda kv: kv[1])
        candidates.append((
            f"Waiting: {worst_agent} is burning cycles on rate limits ({worst_hits} hits)."
            " Add exponential backoff or reduce worker count.",
            wait["total"],
        ))
    if trans["total"] > 0:
        candidates.append((
            "Transport: Issues are being filed in the wrong repos."
            " Add a repo-scoping gate before any agent creates an issue.",
            trans["total"] * 2,
        ))
    if proc["outliers"]:
        outlier_count = len(proc["outliers"])
        candidates.append((
            f"Overprocessing: {outlier_count} PRs exceeded 500 lines for non-epics."
            " Enforce a 200-line soft limit unless the issue is tagged 'epic'.",
            outlier_count * 1.5,
        ))
    if stock["total"] > 20:
        candidates.append((
            f"Inventory: {stock['total']} issues are dead stock (>30 days)."
            " Run a stale-issue sweep and auto-close or consolidate.",
            stock["total"],
        ))
    if move["flagged"]:
        flagged_count = len(move["flagged"])
        candidates.append((
            f"Motion: {flagged_count} issues required excessive clone/rebase ops."
            " Cache worktrees and reuse branches across retries.",
            flagged_count * 1.5,
        ))
    if defect["merged"] + defect["closed_unmerged"] > 0 and defect["defect_rate"] > 20:
        candidates.append((
            f"Defects: {defect['defect_rate']:.0f}% of PRs were abandoned."
            " Require a pre-PR scoping check to prevent unmergeable work.",
            defect["defect_rate"],
        ))

    if not candidates:
        return [
            "No major waste detected this week. Maintain current guardrails.",
            "Continue monitoring agent loop logs for emerging rate-limit patterns.",
            "Keep PR diff sizes under review during weekly standup.",
        ]
    ranked = sorted(candidates, key=lambda cand: cand[1], reverse=True)
    return [text for text, _score in ranked[:3]]
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def run_audit() -> dict:
    """Collect this week's seven-waste metrics for every non-archived repo in ORG.

    Repo metrics cover the last 7 days (stale inventory uses a 30-day
    threshold). The waiting and motion wastes come from the local agent logs.
    A repo whose Gitea calls raise GiteaError is reported and skipped as a
    whole.

    Returns:
        A metrics record with ``week_ending``, ``timestamp``, one entry per waste
        and ``eliminations`` (from compute_eliminations()).
    """
    client = GiteaClient()
    since = since_days_ago(7)
    week_ending = datetime.now(timezone.utc).date().isoformat()
    print("[muda] Fetching repo list...")
    repo_names = list_org_repos(client, ORG)
    print(f"[muda] Scanning {len(repo_names)} repos")

    agent_created = 0
    issues_closed = 0
    transport_total = 0
    inventory_total = 0
    all_overprocessing: list[dict] = []
    all_defects_merged = 0
    all_defects_closed = 0
    for name in repo_names:
        repo = f"{ORG}/{name}"
        print(f"[muda] {repo}")
        try:
            # Gather every per-repo figure before touching the running totals.
            # A GiteaError part-way through then drops the whole repo instead of
            # leaving, say, its created-issue count in without its closed count.
            repo_created = count_issues_created_by_agents(client, repo, since)
            repo_closed = count_issues_closed(client, repo, since)
            repo_transport = count_transport_issues(client, repo, since)
            repo_inventory = count_inventory_issues(client, repo, 30)
            op_proc = measure_overprocessing(client, repo, since)
            defects = measure_defects(client, repo, since)
        except GiteaError as e:
            print(f" [WARN] {repo}: {e}")
            continue
        agent_created += repo_created
        issues_closed += repo_closed
        transport_total += repo_transport
        inventory_total += repo_inventory
        all_overprocessing.append(op_proc)
        all_defects_merged += defects["merged"]
        all_defects_closed += defects["closed_unmerged"]

    waiting = measure_waiting(since)
    motion = measure_motion(since)

    total_prs = all_defects_merged + all_defects_closed
    defect_rate = round((all_defects_closed / total_prs) * 100, 1) if total_prs else 0.0
    # Count-weighted mean of the per-repo averages, i.e. the mean over all
    # measured PRs (approximate, since per-repo averages are already rounded).
    total_op_count = sum(op["count"] for op in all_overprocessing)
    avg_lines = 0.0
    if total_op_count:
        avg_lines = round(
            sum(op["avg_lines"] * op["count"] for op in all_overprocessing) / total_op_count, 1
        )
    all_outliers = [o for op in all_overprocessing for o in op["outliers"]]
    # With nothing closed, the ratio degenerates to the raw created count.
    ratio = round(agent_created / issues_closed, 2) if issues_closed else float(agent_created)

    metrics = {
        "week_ending": week_ending,
        "timestamp": iso_now(),
        "overproduction": {
            "agent_created": agent_created,
            "closed": issues_closed,
            "ratio": ratio,
        },
        "waiting": waiting,
        "transport": {"total": transport_total},
        "overprocessing": {
            "avg_lines": avg_lines,
            "outliers": all_outliers,
            "count": total_op_count,
        },
        "inventory": {"total": inventory_total},
        "motion": motion,
        "defects": {
            "merged": all_defects_merged,
            "closed_unmerged": all_defects_closed,
            "defect_rate": defect_rate,
        },
    }
    metrics["eliminations"] = compute_eliminations(metrics)
    return metrics
def main() -> int:
    """Run the audit, print and persist the report, then post it to Telegram.

    The previous record is loaded before the new one is saved, so the trend
    arrows compare against the previously saved run rather than this one.
    Always returns 0; a failed Telegram post is reported but is not an error.
    """
    print("[muda] Starting Muda Audit...")
    current = run_audit()
    previous = load_previous_metrics()
    report = build_report(current, previous)
    divider = "=" * 50
    print("\n" + divider)
    print(report)
    print(divider)
    save_metrics(current)
    if send_telegram(report):
        print("\n[OK] Report posted to Telegram.")
    else:
        print("\n[WARN] Telegram notification not sent.")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())