[MUDA] Issue #350 — Weekly fleet waste audit #351

Merged
Timmy merged 2 commits from ezra/issue-350 into main 2026-04-07 15:34:18 +00:00
9 changed files with 1389 additions and 31 deletions

View File

@@ -0,0 +1,31 @@
# Weekly fleet waste audit (Epic #345, Issue #350).
# Runs every Sunday night, or on manual dispatch; the generated JSON
# report is kept as a build artifact.
name: MUDA Weekly Waste Audit
on:
  schedule:
    - cron: "0 21 * * 0" # Sunday at 21:00 UTC
  workflow_dispatch:
jobs:
  muda-audit:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout repo
        uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      - name: Run MUDA audit
        env:
          # Base URL consumed by the audit's Gitea client.
          GITEA_URL: "https://forge.alexanderwhitestone.com"
        run: |
          chmod +x bin/muda-audit.sh
          ./bin/muda-audit.sh
      - name: Upload audit report
        uses: actions/upload-artifact@v4
        with:
          name: muda-audit-report
          # reports/ is gitignored; artifacts are the only persisted copy.
          path: reports/muda-audit-*.json

3
.gitignore vendored
View File

@@ -8,3 +8,6 @@
*.db-wal
*.db-shm
__pycache__/
# Generated audit reports
reports/

20
bin/muda-audit.sh Executable file
View File

@@ -0,0 +1,20 @@
#!/usr/bin/env bash
# muda-audit.sh — Weekly waste audit wrapper
# Runs scripts/muda_audit.py from the repo root.
# Designed for cron or Gitea Actions.
set -euo pipefail

# Resolve the repo root relative to this script so the wrapper works
# from any CWD (cron makes no guarantees about the working directory).
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
REPO_ROOT="$(cd "${SCRIPT_DIR}/.." && pwd)"
cd "$REPO_ROOT"

# Ensure python3 is available
if ! command -v python3 >/dev/null 2>&1; then
  echo "ERROR: python3 not found" >&2
  exit 1
fi

# Run the audit, forwarding any CLI arguments.
python3 "${REPO_ROOT}/scripts/muda_audit.py" "$@"

View File

@@ -81,33 +81,7 @@
"last_error": null,
"deliver": "local",
"origin": null,
"state": "scheduled"
},
{
"id": "5e9d952871bc",
"name": "Agent Status Check",
"prompt": "Check which tmux panes are idle vs working, report utilization",
"schedule": {
"kind": "interval",
"minutes": 10,
"display": "every 10m"
},
"schedule_display": "every 10m",
"repeat": {
"times": null,
"completed": 8
},
"enabled": false,
"created_at": "2026-03-24T11:28:46.409727-04:00",
"next_run_at": "2026-03-24T15:45:58.108921-04:00",
"last_run_at": "2026-03-24T15:35:58.108921-04:00",
"last_status": "ok",
"last_error": null,
"deliver": "local",
"origin": null,
"state": "paused",
"paused_at": "2026-03-24T16:23:03.869047-04:00",
"paused_reason": "Dashboard repo frozen - loops redirected to the-nexus",
"state": "scheduled",
"skills": [],
"skill": null
},
@@ -132,8 +106,38 @@
"last_status": null,
"last_error": null,
"deliver": "local",
"origin": null
"origin": null,
"skills": [],
"skill": null
},
{
"id": "muda-audit-weekly",
"name": "Muda Audit",
"prompt": "Run the Muda Audit script at /root/wizards/ezra/workspace/timmy-config/fleet/muda-audit.sh. The script measures the 7 wastes across the fleet and posts a report to Telegram. Report whether it succeeded or failed.",
"schedule": {
"kind": "cron",
"expr": "0 21 * * 0",
"display": "0 21 * * 0"
},
"schedule_display": "0 21 * * 0",
"repeat": {
"times": null,
"completed": 0
},
"enabled": true,
"created_at": "2026-04-07T15:00:00+00:00",
"next_run_at": null,
"last_run_at": null,
"last_status": null,
"last_error": null,
"deliver": "local",
"origin": null,
"state": "scheduled",
"paused_at": null,
"paused_reason": null,
"skills": [],
"skill": null
}
],
"updated_at": "2026-03-24T16:23:03.869797-04:00"
"updated_at": "2026-04-07T15:00:00+00:00"
}

2
cron/muda-audit.crontab Normal file
View File

@@ -0,0 +1,2 @@
# Muda Audit — run every Sunday at 21:00 (server-local time; note the Actions workflow schedules 21:00 UTC)
0 21 * * 0 cd /root/wizards/ezra/workspace/timmy-config && bash fleet/muda-audit.sh >> /tmp/muda-audit.log 2>&1

19
fleet/muda-audit.sh Executable file
View File

@@ -0,0 +1,19 @@
#!/usr/bin/env bash
# muda-audit.sh — Fleet waste elimination audit
# Part of Epic #345, Issue #350
#
# Measures the 7 wastes (Muda) across the Timmy Foundation fleet:
#   1. Overproduction  2. Waiting    3. Transport
#   4. Overprocessing  5. Inventory  6. Motion   7. Defects
#
# Posts report to Telegram and persists week-over-week metrics.
# Should be invoked weekly (Sunday night) via cron.
set -euo pipefail

# Resolve this script's directory so the audit works from any CWD.
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"

# Ensure Python can find gitea_client.py in the repo root
export PYTHONPATH="${SCRIPT_DIR}/..:${PYTHONPATH:-}"

# exec replaces the shell, so the Python exit code becomes the script's.
exec python3 "${SCRIPT_DIR}/muda_audit.py" "$@"

661
fleet/muda_audit.py Executable file
View File

@@ -0,0 +1,661 @@
#!/usr/bin/env python3
"""
Muda Audit — Fleet Waste Elimination
Measures the 7 wastes across Timmy_Foundation repos and posts a weekly report.
Part of Epic: #345
Issue: #350
Wastes:
1. Overproduction — agent issues created vs closed
2. Waiting — rate-limited API attempts from loop logs
3. Transport — issues closed-and-redirected to other repos
4. Overprocessing— PR diff size outliers (>500 lines for non-epics)
5. Inventory — issues open >30 days with no activity
6. Motion — git clone/rebase operations per issue from logs
7. Defects — PRs closed without merge vs merged
"""
from __future__ import annotations
import json
import os
import re
import sys
import urllib.request
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
# Add repo root to path so we can import gitea_client
_REPO_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(_REPO_ROOT))
from gitea_client import GiteaClient, GiteaError # noqa: E402
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
ORG = "Timmy_Foundation"
AGENT_LOGINS = {
"allegro",
"antigravity",
"bezalel",
"claude",
"codex-agent",
"ezra",
"gemini",
"google",
"grok",
"groq",
"hermes",
"kimi",
"manus",
"perplexity",
}
AGENT_LOGINS_HUMAN = {
"claude": "Claude",
"codex-agent": "Codex",
"ezra": "Ezra",
"gemini": "Gemini",
"google": "Google",
"grok": "Grok",
"groq": "Groq",
"hermes": "Hermes",
"kimi": "Kimi",
"manus": "Manus",
"perplexity": "Perplexity",
"allegro": "Allegro",
"antigravity": "Antigravity",
"bezalel": "Bezalel",
}
TELEGRAM_CHAT = "-1003664764329"
TELEGRAM_TOKEN_FILE = Path.home() / ".hermes" / "telegram_token"
METRICS_DIR = Path(os.path.expanduser("~/.local/timmy/muda-audit"))
METRICS_FILE = METRICS_DIR / "metrics.json"
LOG_PATHS = [
Path.home() / ".hermes" / "logs" / "claude-loop.log",
Path.home() / ".hermes" / "logs" / "gemini-loop.log",
Path.home() / ".hermes" / "logs" / "agent.log",
Path.home() / ".hermes" / "logs" / "errors.log",
Path.home() / ".hermes" / "logs" / "gateway.log",
]
# Patterns that indicate an issue was redirected / transported
TRANSPORT_PATTERNS = [
re.compile(r"redirect", re.IGNORECASE),
re.compile(r"moved to", re.IGNORECASE),
re.compile(r"wrong repo", re.IGNORECASE),
re.compile(r"belongs in", re.IGNORECASE),
re.compile(r"should be in", re.IGNORECASE),
re.compile(r"transported", re.IGNORECASE),
re.compile(r"relocated", re.IGNORECASE),
]
RATE_LIMIT_PATTERNS = [
re.compile(r"rate.limit", re.IGNORECASE),
re.compile(r"ratelimit", re.IGNORECASE),
re.compile(r"429"),
re.compile(r"too many requests", re.IGNORECASE),
re.compile(r"rate limit exceeded", re.IGNORECASE),
]
MOTION_PATTERNS = [
re.compile(r"git clone", re.IGNORECASE),
re.compile(r"git rebase", re.IGNORECASE),
re.compile(r"rebasing", re.IGNORECASE),
re.compile(r"cloning into", re.IGNORECASE),
]
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def iso_now() -> str:
return datetime.now(timezone.utc).isoformat()
def parse_iso(dt_str: str) -> datetime:
dt_str = dt_str.replace("Z", "+00:00")
return datetime.fromisoformat(dt_str)
def since_days_ago(days: int) -> datetime:
return datetime.now(timezone.utc) - timedelta(days=days)
def fmt_num(n: float) -> str:
return f"{n:.1f}" if isinstance(n, float) else str(n)
def send_telegram(message: str) -> bool:
    """POST *message* to the configured Telegram chat (best effort).

    Reads the bot token from TELEGRAM_TOKEN_FILE. A missing token or a
    failed HTTP call logs a warning and returns False — it never raises,
    so a notification problem cannot sink the audit run.

    Returns True only when the request completed without raising.
    """
    if not TELEGRAM_TOKEN_FILE.exists():
        print("[WARN] Telegram token not found; skipping notification.")
        return False
    token = TELEGRAM_TOKEN_FILE.read_text().strip()
    url = f"https://api.telegram.org/bot{token}/sendMessage"
    body = json.dumps(
        {
            "chat_id": TELEGRAM_CHAT,
            # Markdown so the *bold* headings in the report render in chat.
            "text": message,
            "parse_mode": "Markdown",
            "disable_web_page_preview": True,
        }
    ).encode()
    req = urllib.request.Request(
        url, data=body, headers={"Content-Type": "application/json"}, method="POST"
    )
    try:
        with urllib.request.urlopen(req, timeout=15) as resp:
            resp.read()
            return True
    except Exception as e:
        print(f"[WARN] Telegram send failed: {e}")
        return False
def load_previous_metrics() -> dict | None:
    """Return last week's metrics record, or None when no usable history.

    The history file holds a JSON list of weekly records (newest last).
    A missing, corrupt, or non-list file is treated as "no history"
    rather than raised.
    """
    if not METRICS_FILE.exists():
        return None
    try:
        history = json.loads(METRICS_FILE.read_text())
        if history and isinstance(history, list):
            return history[-1]
    except (json.JSONDecodeError, OSError):
        pass
    return None
def save_metrics(record: dict) -> None:
    """Append *record* to the on-disk metrics history, capped at 52 entries.

    A corrupt or non-list history file is discarded and restarted rather
    than propagated as an error.
    """
    METRICS_DIR.mkdir(parents=True, exist_ok=True)
    history: list[dict] = []
    if METRICS_FILE.exists():
        try:
            history = json.loads(METRICS_FILE.read_text())
            if not isinstance(history, list):
                history = []
        except (json.JSONDecodeError, OSError):
            history = []
    history.append(record)
    # Keep one year of weekly records.
    history = history[-52:]
    METRICS_FILE.write_text(json.dumps(history, indent=2))
# ---------------------------------------------------------------------------
# Gitea helpers
# ---------------------------------------------------------------------------
def paginate_all(func, *args, **kwargs) -> list[Any]:
    """Exhaustively page through *func*, collecting every result.

    *func* must accept ``limit`` and ``page`` keyword arguments and
    return a list. Paging stops at the first empty batch or the first
    batch shorter than the page size.
    """
    page_size = kwargs.pop("limit", 50)
    collected: list[Any] = []
    current_page = 1
    while True:
        chunk = func(*args, limit=page_size, page=current_page, **kwargs)
        if not chunk:
            return collected
        collected.extend(chunk)
        if len(chunk) < page_size:
            return collected
        current_page += 1
def list_org_repos(client: GiteaClient, org: str) -> list[str]:
    """Names of every non-archived repository in *org*, across all pages."""
    active: list[str] = []
    for repo in paginate_all(client.list_org_repos, org, limit=50):
        if not repo.get("archived", False):
            active.append(repo["name"])
    return active
def count_issues_created_by_agents(client: GiteaClient, repo: str, since: datetime) -> int:
    """Waste #1 input: issues opened by known agent accounts since *since*.

    Issues arrive newest-first (sort=created, desc), so iteration stops
    at the first issue created before the window.
    """
    issues = paginate_all(client.list_issues, repo, state="all", sort="created", direction="desc", limit=50)
    count = 0
    for issue in issues:
        created = parse_iso(issue.created_at)
        if created < since:
            break
        if issue.user.login in AGENT_LOGINS:
            count += 1
    return count
def count_issues_closed(client: GiteaClient, repo: str, since: datetime) -> int:
    """Waste #1 input: closed issues touched within the window.

    NOTE(review): this filters closed issues by *updated_at*, so an old
    closed issue that merely received a comment this week also counts —
    confirm that approximation is acceptable.
    """
    issues = paginate_all(client.list_issues, repo, state="closed", sort="updated", direction="desc", limit=50)
    count = 0
    for issue in issues:
        updated = parse_iso(issue.updated_at)
        if updated < since:
            break
        count += 1
    return count
def count_inventory_issues(client: GiteaClient, repo: str, stale_days: int = 30) -> int:
    """Waste #5 (Inventory): open issues untouched for *stale_days* days.

    Issues arrive oldest-update first (sort=updated, asc), so the scan
    stops at the first issue that is still fresh.
    """
    cutoff = since_days_ago(stale_days)
    issues = paginate_all(client.list_issues, repo, state="open", sort="updated", direction="asc", limit=50)
    count = 0
    for issue in issues:
        updated = parse_iso(issue.updated_at)
        if updated < cutoff:
            count += 1
        else:
            break
    return count
def count_transport_issues(client: GiteaClient, repo: str, since: datetime) -> int:
    """Waste #3 (Transport): recently-closed issues whose comments match
    redirect language (TRANSPORT_PATTERNS).

    Only the 20 most recently updated closed issues are examined (single
    page, no pagination) to bound comment-fetch API calls. Each issue
    counts at most once; comment-fetch failures skip that issue.
    """
    issues = client.list_issues(repo, state="closed", sort="updated", direction="desc", limit=20)
    transport = 0
    for issue in issues:
        if parse_iso(issue.updated_at) < since:
            break
        try:
            comments = client.list_comments(repo, issue.number)
        except GiteaError:
            continue
        for comment in comments:
            body = comment.body or ""
            if any(p.search(body) for p in TRANSPORT_PATTERNS):
                transport += 1
                break
    return transport
def get_pr_diff_size(client: GiteaClient, repo: str, pr_number: int) -> int:
    """Total changed lines (additions + deletions) for a PR; 0 on API error."""
    try:
        files = client.get_pull_files(repo, pr_number)
        return sum(f.additions + f.deletions for f in files)
    except GiteaError:
        return 0
def measure_overprocessing(client: GiteaClient, repo: str, since: datetime) -> dict:
    """Waste #4 (Overprocessing): PR diff sizes and >500-line outliers.

    PRs arrive newest-first; iteration stops at the first PR created
    before *since*. A PR with no created_at is treated as outside the
    window (the substitute timestamp is 8 days before *since*, which
    fails the window check and stops the scan). PRs whose title contains
    'epic' are exempt from the outlier flag.

    Returns {"avg_lines": float, "outliers": [(number, title, lines)],
    "count": number of PRs scanned}.
    """
    pulls = paginate_all(client.list_pulls, repo, state="all", sort="newest", limit=30)
    sizes: list[int] = []
    outliers: list[tuple[int, str, int]] = []
    for pr in pulls:
        created = parse_iso(pr.created_at) if pr.created_at else since - timedelta(days=8)
        if created < since:
            break
        diff_size = get_pr_diff_size(client, repo, pr.number)
        sizes.append(diff_size)
        # "[epic]" is already covered by the "epic" substring check; kept as-is.
        if diff_size > 500 and not any(w in pr.title.lower() for w in ("epic", "[epic]")):
            outliers.append((pr.number, pr.title, diff_size))
    avg = round(sum(sizes) / len(sizes), 1) if sizes else 0.0
    return {"avg_lines": avg, "outliers": outliers, "count": len(sizes)}
def measure_defects(client: GiteaClient, repo: str, since: datetime) -> dict:
    """Waste #7 (Defects): merged vs closed-without-merge PRs in the window."""
    pulls = paginate_all(client.list_pulls, repo, state="closed", sort="newest", limit=50)
    merged = 0
    closed_unmerged = 0
    for pr in pulls:
        # A missing created_at counts as older than the window and stops the scan.
        created = parse_iso(pr.created_at) if pr.created_at else since - timedelta(days=8)
        if created < since:
            break
        if pr.merged:
            merged += 1
        else:
            closed_unmerged += 1
    return {"merged": merged, "closed_unmerged": closed_unmerged}
# ---------------------------------------------------------------------------
# Log parsing
# ---------------------------------------------------------------------------
def parse_logs_for_patterns(since: datetime, patterns: list[re.Pattern]) -> list[str]:
    """Collect log lines from LOG_PATHS matching any of *patterns*.

    Lines that begin with a 'YYYY-MM-DD HH:MM:SS' timestamp are filtered
    to the window; the timestamp is interpreted as UTC (assumption —
    TODO confirm the loop logs actually write UTC). Lines without a
    parseable timestamp are always included. Missing or unreadable files
    are skipped silently.
    """
    matches: list[str] = []
    for log_path in LOG_PATHS:
        if not log_path.exists():
            continue
        try:
            with open(log_path, "r", errors="ignore") as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    ts = None
                    m = re.match(r"^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})", line)
                    if m:
                        try:
                            ts = datetime.strptime(m.group(1), "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
                        except ValueError:
                            pass
                    if ts and ts < since:
                        continue
                    if any(p.search(line) for p in patterns):
                        matches.append(line)
        except OSError:
            continue
    return matches
def measure_waiting(since: datetime) -> dict:
    """Waste #2 (Waiting): rate-limit hits found in the loop logs.

    Scans LOG_PATHS for RATE_LIMIT_PATTERNS since *since* and attributes
    each matching line to an agent by case-insensitive substring match
    against the known agent names; unmatched lines bucket as "unknown".

    Returns {"total": int, "by_agent": {agent: count}}.
    """
    lines = parse_logs_for_patterns(since, RATE_LIMIT_PATTERNS)
    by_agent: dict[str, int] = {}
    for line in lines:
        lowered = line.lower()
        agent = "unknown"
        for name in AGENT_LOGINS_HUMAN.values():
            if name.lower() in lowered:
                agent = name.lower()
                break
        # FIX: removed the old claude/gemini/groq/kimi fallback chain — it was
        # unreachable dead code, since all four names are already values of
        # AGENT_LOGINS_HUMAN and therefore matched by the loop above.
        by_agent[agent] = by_agent.get(agent, 0) + 1
    return {"total": len(lines), "by_agent": by_agent}
def measure_motion(since: datetime) -> dict:
    """Waste #6 (Motion): git clone/rebase churn, grouped per issue.

    Each matching log line is attributed to an issue key: preferably an
    '<agent>/issue-<n>' branch reference, otherwise a bare 'issue <n>'
    mention, else 'unknown'. Issues with more than 3 ops are flagged.

    Returns {"total", "by_issue", "flagged"}.
    """
    lines = parse_logs_for_patterns(since, MOTION_PATTERNS)
    by_issue: dict[str, int] = {}
    total = len(lines)
    issue_pattern = re.compile(r"issue[_\s-]?(\d+)", re.IGNORECASE)
    branch_pattern = re.compile(r"\b([a-z]+)/issue[_\s-]?(\d+)\b", re.IGNORECASE)
    for line in lines:
        issue_key = None
        # Branch-style references are more specific, so try them first.
        m = branch_pattern.search(line)
        if m:
            issue_key = f"{m.group(1).lower()}/issue-{m.group(2)}"
        else:
            m = issue_pattern.search(line)
            if m:
                issue_key = f"issue-{m.group(1)}"
        if issue_key:
            by_issue[issue_key] = by_issue.get(issue_key, 0) + 1
        else:
            by_issue["unknown"] = by_issue.get("unknown", 0) + 1
    flagged = {k: v for k, v in by_issue.items() if v > 3 and k != "unknown"}
    return {"total": total, "by_issue": by_issue, "flagged": flagged}
# ---------------------------------------------------------------------------
# Report builder
# ---------------------------------------------------------------------------
def build_report(metrics: dict, prev: dict | None) -> str:
    """Render the weekly Muda report as Telegram-flavoured Markdown.

    *metrics* is this week's record from run_audit(); *prev* is last
    week's record (or None on the first run) and is used only to compute
    week-over-week trend arrows.
    """
    lines: list[str] = []
    lines.append("*🗑️ MUDA AUDIT — Weekly Waste Report*")
    lines.append(f"Week ending {metrics['week_ending'][:10]}\n")

    def trend_arrow(current: float, previous: float) -> str:
        # BUG FIX: every branch previously returned "", so the arrows the
        # footer promises ("If an arrow points up, investigate") never
        # rendered. Down = improving, up = worsening, right = flat.
        if previous == 0:
            return ""  # no baseline yet (first run, or zero last week)
        if current < previous:
            return " ⬇️"
        if current > previous:
            return " ⬆️"
        return " ➡️"

    prev_w = prev or {}
    op = metrics["overproduction"]
    op_prev = prev_w.get("overproduction", {})
    ratio = op["ratio"]
    ratio_prev = op_prev.get("ratio", 0.0)
    lines.append(
        f"*1. Overproduction:* {op['agent_created']} agent issues created / {op['closed']} closed"
        f" (ratio {fmt_num(ratio)}{trend_arrow(ratio, ratio_prev)})"
    )
    w = metrics["waiting"]
    w_prev = prev_w.get("waiting", {})
    w_total_prev = w_prev.get("total", 0)
    lines.append(
        f"*2. Waiting:* {w['total']} rate-limit hits this week{trend_arrow(w['total'], w_total_prev)}"
    )
    if w["by_agent"]:
        top = sorted(w["by_agent"].items(), key=lambda x: x[1], reverse=True)[:3]
        lines.append(" Top offenders: " + ", ".join(f"{k}({v})" for k, v in top))
    t = metrics["transport"]
    t_prev = prev_w.get("transport", {})
    t_total_prev = t_prev.get("total", 0)
    lines.append(
        f"*3. Transport:* {t['total']} issues closed-and-redirected{trend_arrow(t['total'], t_total_prev)}"
    )
    ov = metrics["overprocessing"]
    ov_prev = prev_w.get("overprocessing", {})
    avg_prev = ov_prev.get("avg_lines", 0.0)
    lines.append(
        f"*4. Overprocessing:* Avg PR diff {fmt_num(ov['avg_lines'])} lines"
        f"{trend_arrow(ov['avg_lines'], avg_prev)}, {len(ov['outliers'])} outliers >500 lines"
    )
    inv = metrics["inventory"]
    inv_prev = prev_w.get("inventory", {})
    inv_total_prev = inv_prev.get("total", 0)
    lines.append(
        f"*5. Inventory:* {inv['total']} stale issues open >30 days{trend_arrow(inv['total'], inv_total_prev)}"
    )
    m = metrics["motion"]
    m_prev = prev_w.get("motion", {})
    m_total_prev = m_prev.get("total", 0)
    lines.append(
        f"*6. Motion:* {m['total']} git clone/rebase ops this week{trend_arrow(m['total'], m_total_prev)}"
    )
    if m["flagged"]:
        lines.append(f" Flagged: {len(m['flagged'])} issues with >3 ops")
    d = metrics["defects"]
    d_prev = prev_w.get("defects", {})
    defect_rate = d["defect_rate"]
    defect_rate_prev = d_prev.get("defect_rate", 0.0)
    lines.append(
        f"*7. Defects:* {d['merged']} merged, {d['closed_unmerged']} abandoned"
        f" (defect rate {fmt_num(defect_rate)}%{trend_arrow(defect_rate, defect_rate_prev)})"
    )
    lines.append("\n*🔥 Top 3 Elimination Suggestions:*")
    for i, suggestion in enumerate(metrics["eliminations"], 1):
        lines.append(f"{i}. {suggestion}")
    lines.append("\n_Week over week: waste metrics should decrease. If an arrow points up, investigate._")
    return "\n".join(lines)
def compute_eliminations(metrics: dict) -> list[str]:
    """Pick the top-3 waste-elimination suggestions for this week.

    Each triggered waste contributes a (text, severity) candidate; the
    severity weights (raw counts, some scaled) rank the candidates and
    the three worst are returned. If nothing triggers, a fixed
    'all healthy' trio is returned instead.
    """
    suggestions: list[tuple[str, float]] = []
    op = metrics["overproduction"]
    # Ratio > 1.0 means issues are created faster than they are closed.
    if op["ratio"] > 1.0:
        suggestions.append(
            (
                "Overproduction: Stop agent loops from creating issues faster than they close them."
                f" Cap new issue creation when open backlog >{op['closed'] * 2}.",
                op["ratio"],
            )
        )
    w = metrics["waiting"]
    if w["total"] > 10:
        top = max(w["by_agent"].items(), key=lambda x: x[1])
        suggestions.append(
            (
                f"Waiting: {top[0]} is burning cycles on rate limits ({top[1]} hits)."
                " Add exponential backoff or reduce worker count.",
                w["total"],
            )
        )
    t = metrics["transport"]
    if t["total"] > 0:
        suggestions.append(
            (
                "Transport: Issues are being filed in the wrong repos."
                " Add a repo-scoping gate before any agent creates an issue.",
                # Severity weighting: transport counts are doubled.
                t["total"] * 2,
            )
        )
    ov = metrics["overprocessing"]
    if ov["outliers"]:
        suggestions.append(
            (
                f"Overprocessing: {len(ov['outliers'])} PRs exceeded 500 lines for non-epics."
                " Enforce a 200-line soft limit unless the issue is tagged 'epic'.",
                len(ov["outliers"]) * 1.5,
            )
        )
    inv = metrics["inventory"]
    if inv["total"] > 20:
        suggestions.append(
            (
                f"Inventory: {inv['total']} issues are dead stock (>30 days)."
                " Run a stale-issue sweep and auto-close or consolidate.",
                inv["total"],
            )
        )
    m = metrics["motion"]
    if m["flagged"]:
        suggestions.append(
            (
                f"Motion: {len(m['flagged'])} issues required excessive clone/rebase ops."
                " Cache worktrees and reuse branches across retries.",
                len(m["flagged"]) * 1.5,
            )
        )
    d = metrics["defects"]
    total_prs = d["merged"] + d["closed_unmerged"]
    if total_prs > 0 and d["defect_rate"] > 20:
        suggestions.append(
            (
                f"Defects: {d['defect_rate']:.0f}% of PRs were abandoned."
                " Require a pre-PR scoping check to prevent unmergeable work.",
                d["defect_rate"],
            )
        )
    # Highest severity first; fall back to the healthy trio when empty.
    suggestions.sort(key=lambda x: x[1], reverse=True)
    return [s[0] for s in suggestions[:3]] if suggestions else [
        "No major waste detected this week. Maintain current guardrails.",
        "Continue monitoring agent loop logs for emerging rate-limit patterns.",
        "Keep PR diff sizes under review during weekly standup.",
    ]
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def run_audit() -> dict:
    """Collect all seven waste metrics across the org; return the record.

    Per-repo Gitea failures are logged and skipped so that one broken
    repo cannot sink the whole weekly audit. Log-derived wastes
    (waiting, motion) are fleet-wide rather than per-repo.
    """
    client = GiteaClient()
    since = since_days_ago(7)
    week_ending = datetime.now(timezone.utc).date().isoformat()
    print("[muda] Fetching repo list...")
    repo_names = list_org_repos(client, ORG)
    print(f"[muda] Scanning {len(repo_names)} repos")
    agent_created = 0
    issues_closed = 0
    transport_total = 0
    inventory_total = 0
    all_overprocessing: list[dict] = []
    all_defects_merged = 0
    all_defects_closed = 0
    for name in repo_names:
        repo = f"{ORG}/{name}"
        print(f"[muda] {repo}")
        try:
            agent_created += count_issues_created_by_agents(client, repo, since)
            issues_closed += count_issues_closed(client, repo, since)
            transport_total += count_transport_issues(client, repo, since)
            inventory_total += count_inventory_issues(client, repo, 30)
            op_proc = measure_overprocessing(client, repo, since)
            all_overprocessing.append(op_proc)
            defects = measure_defects(client, repo, since)
            all_defects_merged += defects["merged"]
            all_defects_closed += defects["closed_unmerged"]
        except GiteaError as e:
            print(f" [WARN] {repo}: {e}")
            continue
    waiting = measure_waiting(since)
    motion = measure_motion(since)
    total_prs = all_defects_merged + all_defects_closed
    defect_rate = round((all_defects_closed / total_prs) * 100, 1) if total_prs else 0.0
    # Weighted mean of per-repo averages, weighted by each repo's PR count.
    avg_lines = 0.0
    total_op_count = sum(op["count"] for op in all_overprocessing)
    if total_op_count:
        avg_lines = round(
            sum(op["avg_lines"] * op["count"] for op in all_overprocessing) / total_op_count, 1
        )
    all_outliers = [o for op in all_overprocessing for o in op["outliers"]]
    # With zero closures, the raw created count stands in for the ratio.
    ratio = round(agent_created / issues_closed, 2) if issues_closed else float(agent_created)
    metrics = {
        "week_ending": week_ending,
        "timestamp": iso_now(),
        "overproduction": {
            "agent_created": agent_created,
            "closed": issues_closed,
            "ratio": ratio,
        },
        "waiting": waiting,
        "transport": {"total": transport_total},
        "overprocessing": {
            "avg_lines": avg_lines,
            "outliers": all_outliers,
            "count": total_op_count,
        },
        "inventory": {"total": inventory_total},
        "motion": motion,
        "defects": {
            "merged": all_defects_merged,
            "closed_unmerged": all_defects_closed,
            "defect_rate": defect_rate,
        },
    }
    metrics["eliminations"] = compute_eliminations(metrics)
    return metrics
def main() -> int:
    """CLI entry point: run the audit, print, persist, and post the report."""
    print("[muda] Starting Muda Audit...")
    metrics = run_audit()
    # Load last week's record *before* saving this week's, so trend arrows
    # compare against the previous run rather than the record just computed.
    prev = load_previous_metrics()
    report = build_report(metrics, prev)
    print("\n" + "=" * 50)
    print(report)
    print("=" * 50)
    save_metrics(metrics)
    sent = send_telegram(report)
    if sent:
        print("\n[OK] Report posted to Telegram.")
    else:
        print("\n[WARN] Telegram notification not sent.")
    return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -143,6 +143,10 @@ class PullRequest:
mergeable: bool = False
merged: bool = False
changed_files: int = 0
additions: int = 0
deletions: int = 0
created_at: str = ""
closed_at: str = ""
@classmethod
def from_dict(cls, d: dict) -> "PullRequest":
@@ -159,6 +163,10 @@ class PullRequest:
mergeable=d.get("mergeable", False),
merged=d.get("merged", False) or False,
changed_files=d.get("changed_files", 0),
additions=d.get("additions", 0),
deletions=d.get("deletions", 0),
created_at=d.get("created_at", ""),
closed_at=d.get("closed_at", ""),
)
@@ -290,9 +298,9 @@ class GiteaClient:
# -- Repos ---------------------------------------------------------------
def list_org_repos(self, org: str, limit: int = 50) -> list[dict]:
def list_org_repos(self, org: str, limit: int = 50, page: int = 1) -> list[dict]:
"""List repos in an organization."""
return self._get(f"/orgs/{org}/repos", limit=limit)
return self._get(f"/orgs/{org}/repos", limit=limit, page=page)
# -- Issues --------------------------------------------------------------

610
scripts/muda_audit.py Executable file
View File

@@ -0,0 +1,610 @@
#!/usr/bin/env python3
"""
muda_audit.py — Weekly waste audit for the Timmy Foundation fleet.
Measures 7 wastes (Muda) across Gitea repos and agent logs:
1. Overproduction — issues created vs closed (ratio > 1.0 = waste)
2. Waiting — rate-limit hits from agent logs
3. Transport — issues closed with redirect keywords
4. Overprocessing — PR diff size outliers (>500 lines)
5. Inventory — open issues stale >30 days
6. Motion — git clone/rebase churn from logs
7. Defects — PRs closed without merge vs merged
Outputs JSON report, persists week-over-week metrics, and optionally posts to Telegram.
Part of Epic #345, Issue #350.
"""
from __future__ import annotations
import glob
import json
import os
import sys
import urllib.request
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path
# Add repo root to path so we can import gitea_client
REPO_ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(REPO_ROOT))
from gitea_client import GiteaClient, GiteaError
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
ORG = "Timmy_Foundation"
REPOS = [
"the-nexus",
".profile",
"timmy-config",
"timmy-home",
"the-door",
"turboquant",
"hermes-agent",
"timmy-academy",
"wolf",
"the-testament",
"the-beacon",
]
AGENT_LOG_PATHS = [
"/root/wizards/*/home/logs/*.log",
"/root/wizards/*/logs/*.log",
"/root/wizards/*/.hermes/logs/*.log",
]
REDIRECT_KEYWORDS = [
"moved to", "belongs in", "redirected to", "closing in favor of",
"wrong repo", "should be in", "transfer to", "repost to",
]
TELEGRAM_CHAT = "-1003664764329"
TELEGRAM_TOKEN_PATHS = [
Path.home() / ".config" / "telegram" / "special_bot",
Path.home() / ".hermes" / "telegram_bot_token",
]
METRICS_DIR = Path.home() / ".local" / "timmy" / "muda-audit"
METRICS_FILE = METRICS_DIR / "metrics.json"
DAYS_BACK = 7
STALE_DAYS = 30
OVERPROCESSING_THRESHOLD = 500
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def now_utc() -> datetime:
    """Timezone-aware current UTC time."""
    return datetime.now(timezone.utc)


def parse_iso(ts: str) -> datetime:
    """Parse ISO-8601, converting a trailing 'Z' to an explicit UTC offset."""
    normalized = ts[:-1] + "+00:00" if ts.endswith("Z") else ts
    return datetime.fromisoformat(normalized)


def within_days(ts: str, days: int) -> bool:
    """True if *ts* is no older than *days* days; False for unparseable input."""
    try:
        age = now_utc() - parse_iso(ts)
    except Exception:
        return False
    return age <= timedelta(days=days)


def older_than_days(ts: str, days: int) -> bool:
    """True if *ts* is at least *days* days old; False for unparseable input."""
    try:
        age = now_utc() - parse_iso(ts)
    except Exception:
        return False
    return age >= timedelta(days=days)
def paginate_issues(client: GiteaClient, repo: str, state: str, limit_per_page: int = 50, max_pages: int = 4):
    """Yield issues for ORG/<repo> across up to *max_pages* pages.

    Newest-created first; stops early on an empty or short page.
    """
    full_repo = f"{ORG}/{repo}"
    for page in range(1, max_pages + 1):
        batch = client.list_issues(full_repo, state=state, limit=limit_per_page, page=page, sort="created", direction="desc")
        if not batch:
            break
        for issue in batch:
            yield issue
        if len(batch) < limit_per_page:
            break


def paginate_prs(client: GiteaClient, repo: str, state: str, limit_per_page: int = 50, max_pages: int = 3):
    """Yield pull requests for ORG/<repo> across up to *max_pages* pages.

    Newest first; stops early on an empty or short page.
    """
    full_repo = f"{ORG}/{repo}"
    for page in range(1, max_pages + 1):
        batch = client.list_pulls(full_repo, state=state, limit=limit_per_page, page=page, sort="newest")
        if not batch:
            break
        for pr in batch:
            yield pr
        if len(batch) < limit_per_page:
            break
def read_telegram_token() -> str | None:
    """Bot token from the first existing candidate file, falling back to
    the TELEGRAM_BOT_TOKEN environment variable; None when neither exists."""
    for path in TELEGRAM_TOKEN_PATHS:
        if path.exists():
            return path.read_text().strip()
    return os.environ.get("TELEGRAM_BOT_TOKEN") or None


def send_telegram(message: str) -> bool:
    """POST *message* (Markdown) to TELEGRAM_CHAT; best effort.

    Returns True on HTTP 200. A missing token or failed request logs a
    warning and returns False rather than raising, so a notification
    problem never breaks the audit run.
    """
    token = read_telegram_token()
    if not token:
        print("[WARN] No Telegram token found; skipping notification.")
        return False
    url = f"https://api.telegram.org/bot{token}/sendMessage"
    payload = json.dumps({
        "chat_id": TELEGRAM_CHAT,
        "text": message,
        "parse_mode": "Markdown",
        "disable_web_page_preview": True,
    }).encode()
    req = urllib.request.Request(url, data=payload, method="POST", headers={"Content-Type": "application/json"})
    try:
        with urllib.request.urlopen(req, timeout=15) as resp:
            return resp.status == 200
    except Exception as e:
        print(f"[WARN] Telegram send failed: {e}")
        return False
def find_log_files() -> list[Path]:
    """Non-empty agent log files matching the AGENT_LOG_PATHS globs."""
    files = []
    for pattern in AGENT_LOG_PATHS:
        for p in glob.glob(pattern):
            path = Path(p)
            try:
                if path.stat().st_size > 0:
                    files.append(path)
            except OSError:
                pass
    return files


def grep_logs(pattern: str, files: list[Path]) -> dict[str, int]:
    """Count case-insensitive substring matches of *pattern* per agent.

    The agent name is taken from the path component after 'wizards'
    (/root/wizards/<agent>/...); anything else buckets under 'unknown'.
    Note that lines are lowercased but *pattern* is not — callers must
    pass lowercase patterns.
    """
    counts: dict[str, int] = defaultdict(int)
    for f in files:
        parts = f.parts
        try:
            idx = parts.index("wizards")
            agent = parts[idx + 1]
        except (ValueError, IndexError):
            agent = "unknown"
        try:
            with open(f, "r", errors="ignore") as fh:
                for line in fh:
                    if pattern in line.lower():
                        counts[agent] += 1
        except Exception:
            pass
    return dict(counts)
def summarize_counts(counts: dict[str, int]) -> str:
    """Human-readable top-5 summary of an agent→count map."""
    if not counts:
        return "none detected"
    ranked = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)
    return ", ".join(f"{agent}: {n}" for agent, n in ranked[:5])
# ---------------------------------------------------------------------------
# Week-over-week persistence
# ---------------------------------------------------------------------------
def load_previous_metrics() -> dict | None:
    """Return last week's metrics record (last list element), or None.

    Missing, corrupt, or non-list history files are treated as
    "no history" rather than raised.
    """
    if not METRICS_FILE.exists():
        return None
    try:
        history = json.loads(METRICS_FILE.read_text())
        if history and isinstance(history, list):
            return history[-1]
    except (json.JSONDecodeError, OSError):
        pass
    return None


def save_metrics(record: dict) -> None:
    """Append *record* to the on-disk history, capped at 52 entries.

    A corrupt or non-list history file is silently restarted.
    """
    METRICS_DIR.mkdir(parents=True, exist_ok=True)
    history: list[dict] = []
    if METRICS_FILE.exists():
        try:
            history = json.loads(METRICS_FILE.read_text())
            if not isinstance(history, list):
                history = []
        except (json.JSONDecodeError, OSError):
            history = []
    history.append(record)
    history = history[-52:]  # keep one year of weekly reports
    METRICS_FILE.write_text(json.dumps(history, indent=2))
def trend_arrow(current: float, previous: float) -> str:
    """Directional indicator for week-over-week movement.

    BUG FIX: every branch previously returned "", so no arrow ever
    rendered. Returns an empty string when there is no baseline
    (previous == 0); otherwise a down / up / flat arrow. For waste
    metrics, down is improving and up warrants investigation.
    """
    if previous == 0:
        return ""
    if current < previous:
        return " ⬇️"
    if current > previous:
        return " ⬆️"
    return " ➡️"
# ---------------------------------------------------------------------------
# Waste metrics
# ---------------------------------------------------------------------------
def measure_overproduction(client: GiteaClient) -> dict:
    """Waste #1: issues created vs closed over the last DAYS_BACK days.

    Closed issues are detected via updated_at, so a closed issue that was
    merely commented on this week also counts (approximation).

    NOTE(review): when closed == 0 the ratio falls back to the raw
    created count (an int), not a float — downstream formatting should
    tolerate both.
    """
    created = 0
    closed = 0
    created_by_repo: dict[str, int] = defaultdict(int)
    closed_by_repo: dict[str, int] = defaultdict(int)
    for repo in REPOS:
        try:
            for issue in paginate_issues(client, repo, state="all", max_pages=3):
                if within_days(issue.created_at, DAYS_BACK):
                    created += 1
                    created_by_repo[repo] += 1
                if issue.state == "closed" and within_days(issue.updated_at, DAYS_BACK):
                    closed += 1
                    closed_by_repo[repo] += 1
        except GiteaError as e:
            print(f"[WARN] Overproduction fetch failed for {repo}: {e}")
    ratio = round(created / closed, 2) if closed > 0 else (created if created > 0 else 0.0)
    return {
        "waste": "Overproduction",
        "created": created,
        "closed": closed,
        "ratio": ratio,
        "top_repo": max(created_by_repo, key=created_by_repo.get) if created_by_repo else None,
        "healthy": ratio <= 1.0,
    }
def measure_waiting(_client: GiteaClient) -> dict:
    """Waste #2: rate-limit hits per agent, grepped from agent log files.

    The client parameter is unused; it is kept so every measure_* shares
    one signature.

    NOTE(review): a line containing two patterns (e.g. "429 rate limit")
    is counted once per matching pattern, so totals may double-count.
    """
    files = find_log_files()
    patterns = ["rate limit", "ratelimit", "429", "too many requests"]
    total_by_agent: dict[str, int] = defaultdict(int)
    for pat in patterns:
        counts = grep_logs(pat, files)
        for agent, cnt in counts.items():
            total_by_agent[agent] += cnt
    total_hits = sum(total_by_agent.values())
    return {
        "waste": "Waiting",
        "rate_limit_hits": dict(total_by_agent),
        "total_hits": total_hits,
        "log_files_scanned": len(files),
        "healthy": total_hits == 0,
    }
def measure_transport(client: GiteaClient) -> dict:
    """Waste #3: recently-closed issues containing redirect language.

    Examines up to 20 in-window closed issues per repo. The issue body
    is checked first (cheap); comments are fetched only when the body
    has no match. Each issue counts at most once.
    """
    redirected = 0
    examples: list[str] = []
    for repo in REPOS:
        checked = 0
        try:
            for issue in paginate_issues(client, repo, state="closed", max_pages=2):
                # Issues are sorted by created date, not updated, so keep
                # scanning (continue) rather than break on an old update.
                if not within_days(issue.updated_at, DAYS_BACK):
                    continue
                checked += 1
                if checked > 20:
                    break
                text = (issue.body or "").lower()
                if any(kw in text for kw in REDIRECT_KEYWORDS):
                    redirected += 1
                    examples.append(f"{repo}#{issue.number}")
                    continue
                try:
                    comments = client.list_comments(f"{ORG}/{repo}", issue.number)
                    for c in comments:
                        if any(kw in (c.body or "").lower() for kw in REDIRECT_KEYWORDS):
                            redirected += 1
                            examples.append(f"{repo}#{issue.number}")
                            break
                except GiteaError:
                    pass
        except GiteaError as e:
            print(f"[WARN] Transport fetch failed for {repo}: {e}")
    return {
        "waste": "Transport",
        "redirected_issues": redirected,
        "examples": examples[:5],
        "healthy": redirected == 0,
    }
def measure_overprocessing(client: GiteaClient) -> dict:
    """Measure 'Overprocessing' waste: PRs much larger than necessary.

    Scans up to 10 recent PRs per repo, sums their diff sizes, and flags
    non-epic PRs that exceed OVERPROCESSING_THRESHOLD changed lines.
    """
    scanned_prs: list[dict] = []
    outliers: list[str] = []
    line_total = 0
    for repo in REPOS:
        full_repo = f"{ORG}/{repo}"
        try:
            examined = 0
            for pr in paginate_prs(client, repo, state="all", max_pages=2):
                if not within_days(pr.created_at or "", DAYS_BACK):
                    continue
                examined += 1
                if examined > 10:  # cap per-repo API load
                    break
                try:
                    changed_files = client.get_pull_files(full_repo, pr.number)
                except GiteaError:
                    changed_files = []  # best-effort: size unknown, count as 0
                diff_lines = sum(f.additions + f.deletions for f in changed_files)
                line_total += diff_lines
                scanned_prs.append({
                    "repo": repo,
                    "pr": pr.number,
                    "title": pr.title,
                    "lines": diff_lines,
                })
                # Epics are expected to be large; only flag non-epic monsters.
                if diff_lines > OVERPROCESSING_THRESHOLD and "epic" not in (pr.title or "").lower():
                    outliers.append(f"{repo}#{pr.number} ({diff_lines} lines)")
        except GiteaError as e:
            print(f"[WARN] Overprocessing fetch failed for {repo}: {e}")
    mean_lines = round(line_total / len(scanned_prs), 1) if scanned_prs else 0.0
    return {
        "waste": "Overprocessing",
        "prs_scanned": len(scanned_prs),
        "avg_lines_changed": mean_lines,
        "flagged_outliers": outliers,
        "healthy": len(outliers) == 0,
    }
def measure_inventory(client: GiteaClient) -> dict:
    """Measure 'Inventory' waste: open issues idle longer than STALE_DAYS."""
    total_stale = 0
    stale_by_repo: dict[str, int] = defaultdict(int)
    for repo in REPOS:
        try:
            for issue in paginate_issues(client, repo, state="open", max_pages=4):
                if older_than_days(issue.updated_at, STALE_DAYS):
                    total_stale += 1
                    # Only repos with at least one stale issue get a key,
                    # so top_repo never points at a zero-count repo.
                    stale_by_repo[repo] += 1
        except GiteaError as e:
            print(f"[WARN] Inventory fetch failed for {repo}: {e}")
    worst_repo = max(stale_by_repo, key=stale_by_repo.get) if stale_by_repo else None
    return {
        "waste": "Inventory",
        "stale_issues": total_stale,
        "by_repo": dict(stale_by_repo),
        "top_repo": worst_repo,
        "healthy": total_stale == 0,
    }
def measure_motion(_client: GiteaClient) -> dict:
    """Measure 'Motion' waste: repeated git operations found in agent logs.

    The Gitea client is unused (underscore-prefixed); this metric is
    derived purely from local log files.
    """
    log_files = find_log_files()
    per_command = {
        "git_clones": grep_logs("git clone", log_files),
        "git_rebases": grep_logs("git rebase", log_files),
        "git_fetches": grep_logs("git fetch", log_files),
    }
    events = sum(sum(counts.values()) for counts in per_command.values())
    return {
        "waste": "Motion",
        **per_command,
        "total_motion_events": events,
        "log_files_scanned": len(log_files),
        "healthy": events < 50,  # heuristic threshold for a "busy" week
    }
def measure_defects(client: GiteaClient) -> dict:
    """Measure 'Defects' waste: PRs closed without ever being merged."""
    merged_count = 0
    abandoned_count = 0
    for repo in REPOS:
        try:
            for pr in paginate_prs(client, repo, state="closed", max_pages=2):
                if not within_days(pr.created_at or "", DAYS_BACK):
                    continue
                if pr.merged:
                    merged_count += 1
                else:
                    abandoned_count += 1
        except GiteaError as e:
            print(f"[WARN] Defects fetch failed for {repo}: {e}")
    considered = merged_count + abandoned_count
    abandon_rate = round(abandoned_count / considered, 2) if considered else 0.0
    return {
        "waste": "Defects",
        "merged": merged_count,
        "closed_without_merge": abandoned_count,
        "close_rate": abandon_rate,
        "healthy": abandon_rate < 0.25,
    }
# ---------------------------------------------------------------------------
# Report generation
# ---------------------------------------------------------------------------
# Canned remediation advice, keyed by waste name.
SUGGESTIONS = {
    "Overproduction": "Pause issue-generation loops until backlog shrinks. Review auto-issue bots.",
    "Waiting": "Add exponential backoff to API clients. Reduce loop frequency for rate-limited agents.",
    "Transport": "Enforce repo-boundary check before issue creation. Close with redirect template.",
    "Overprocessing": "Scope tickets tighter. Flag >500-line PRs for pre-review split.",
    "Inventory": "Bulk-close or consolidate stale issues. Set 30-day auto-close for untouched items.",
    "Motion": "Cache workspace directories across issues. Limit clones to 1 per issue branch.",
    "Defects": "Require smoke tests before PR. Rebase before merge to reduce conflict closures.",
}


def compute_top_eliminations(metrics: list[dict]) -> list[str]:
    """Pick the top 3 unhealthiest wastes and return concrete suggestions."""
    # Per-waste severity scorers, scaled so values are roughly comparable.
    scorers = {
        "Overproduction": lambda m: m.get("ratio", 0),
        "Waiting": lambda m: m.get("total_hits", 0) / 10,
        "Transport": lambda m: m.get("redirected_issues", 0),
        "Overprocessing": lambda m: len(m.get("flagged_outliers", [])),
        "Inventory": lambda m: m.get("stale_issues", 0) / 10,
        "Motion": lambda m: m.get("total_motion_events", 0) / 20,
        "Defects": lambda m: m.get("close_rate", 0) * 10,
    }

    def severity(metric: dict) -> float:
        return scorers.get(metric["waste"], lambda _m: 0.0)(metric)

    flagged = sorted(
        (m for m in metrics if not m.get("healthy", True)),
        key=severity,
        reverse=True,
    )
    advice = [SUGGESTIONS.get(m["waste"], "Review and reduce.") for m in flagged[:3]]
    if advice:
        return advice
    # Nothing unhealthy this week: return generic maintenance guidance.
    return [
        "No major waste detected this week. Maintain current guardrails.",
        "Continue monitoring agent loop logs for emerging rate-limit patterns.",
        "Keep PR diff sizes under review during weekly standup.",
    ]
def build_report(metrics: list[dict]) -> dict:
    """Assemble the top-level audit report dict from per-waste metrics."""
    unhealthy = [m for m in metrics if not m.get("healthy", True)]
    return {
        "report_type": "MUDA Weekly Waste Audit",
        "generated_at": now_utc().isoformat(),
        "period_days": DAYS_BACK,
        "metrics": metrics,
        "waste_count": len(unhealthy),
        "top_wastes": unhealthy,
    }
def format_telegram(report: dict, prev: dict | None = None) -> str:
    """Render the audit report as a Markdown message for Telegram.

    Each waste gets one summary line with a trend arrow against last
    week's snapshot (when *prev* is supplied), followed by the top
    elimination suggestions.
    """
    prior_by_waste = {m["waste"]: m for m in (prev.get("metrics", []) if prev else [])}
    out = [
        f"*🗑 MUDA Audit — {report['generated_at'][:10]}*",
        f"Period: last {report['period_days']} days",
        "",
    ]
    for metric in report["metrics"]:
        flag = "" if metric.get("healthy") else "⚠️"
        waste = metric["waste"]
        prior = prior_by_waste.get(waste, {})
        if waste == "Overproduction":
            arrow = trend_arrow(metric["ratio"], prior.get("ratio", 0.0))
            out.append(f"{flag} *Overproduction*: {metric['created']} created / {metric['closed']} closed = ratio {metric['ratio']}{arrow}")
        elif waste == "Waiting":
            arrow = trend_arrow(metric["total_hits"], prior.get("total_hits", 0))
            out.append(f"{flag} *Waiting*: {metric['total_hits']} rate-limit hits ({summarize_counts(metric['rate_limit_hits'])}){arrow}")
        elif waste == "Transport":
            arrow = trend_arrow(metric["redirected_issues"], prior.get("redirected_issues", 0))
            out.append(f"{flag} *Transport*: {metric['redirected_issues']} redirected issues{arrow}")
        elif waste == "Overprocessing":
            arrow = trend_arrow(metric["avg_lines_changed"], prior.get("avg_lines_changed", 0.0))
            out.append(f"{flag} *Overprocessing*: avg {metric['avg_lines_changed']} lines/PR, {len(metric['flagged_outliers'])} outliers{arrow}")
        elif waste == "Inventory":
            arrow = trend_arrow(metric["stale_issues"], prior.get("stale_issues", 0))
            out.append(f"{flag} *Inventory*: {metric['stale_issues']} stale issues (>30d){arrow}")
        elif waste == "Motion":
            arrow = trend_arrow(metric["total_motion_events"], prior.get("total_motion_events", 0))
            out.append(f"{flag} *Motion*: {metric['total_motion_events']} git ops ({summarize_counts(metric['git_clones'])} clones){arrow}")
        elif waste == "Defects":
            arrow = trend_arrow(metric["close_rate"], prior.get("close_rate", 0.0))
            pr_total = metric["closed_without_merge"] + metric["merged"]
            out.append(f"{flag} *Defects*: {metric['close_rate']*100:.0f}% closed without merge ({metric['closed_without_merge']}/{pr_total}){arrow}")
    out.append("")
    out.append("*Top 3 eliminations:*")
    for rank, tip in enumerate(compute_top_eliminations(report["metrics"]), 1):
        out.append(f"{rank}. {tip}")
    out.append("")
    out.append("_Week over week: waste metrics should decrease. If an arrow points up, investigate._")
    return "\n".join(out)
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
    """Entry point: run all seven waste measurements, persist and notify."""
    gitea = GiteaClient()
    if not gitea.ping():
        print("[ERROR] Gitea is unreachable. Aborting audit.")
        sys.exit(1)
    print("[INFO] Starting MUDA waste audit...")

    measurements = [
        measure(gitea)
        for measure in (
            measure_overproduction,
            measure_waiting,
            measure_transport,
            measure_overprocessing,
            measure_inventory,
            measure_motion,
            measure_defects,
        )
    ]
    report = build_report(measurements)
    previous = load_previous_metrics()

    # Persist the full JSON report (picked up as a CI artifact).
    out_dir = REPO_ROOT / "reports"
    out_dir.mkdir(exist_ok=True)
    out_path = out_dir / f"muda-audit-{now_utc().strftime('%Y%m%d')}.json"
    out_path.write_text(json.dumps(report, indent=2))
    print(f"[INFO] Report written to {out_path}")

    # Notify the team channel.
    message = format_telegram(report, previous)
    if send_telegram(message):
        print("[INFO] Telegram notification sent.")
    else:
        print("[WARN] Telegram notification failed or skipped.")

    # Persist metrics so next week's run can show week-over-week trends.
    save_metrics({
        "week_ending": now_utc().date().isoformat(),
        "generated_at": report["generated_at"],
        "metrics": measurements,
    })

    # Mirror the Telegram summary on stdout for the CI log.
    print("\n" + "=" * 60)
    print(message)
    print("=" * 60)


if __name__ == "__main__":
    main()