[MUDA] Issue #350 — Weekly fleet waste audit #351
31
.gitea/workflows/muda-audit.yml
Normal file
31
.gitea/workflows/muda-audit.yml
Normal file
@@ -0,0 +1,31 @@
|
||||
name: MUDA Weekly Waste Audit
|
||||
|
||||
on:
|
||||
schedule:
|
||||
- cron: "0 21 * * 0" # Sunday at 21:00 UTC
|
||||
workflow_dispatch:
|
||||
|
||||
jobs:
|
||||
muda-audit:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout repo
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: "3.11"
|
||||
|
||||
- name: Run MUDA audit
|
||||
env:
|
||||
GITEA_URL: "https://forge.alexanderwhitestone.com"
|
||||
run: |
|
||||
chmod +x bin/muda-audit.sh
|
||||
./bin/muda-audit.sh
|
||||
|
||||
- name: Upload audit report
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: muda-audit-report
|
||||
path: reports/muda-audit-*.json
|
||||
3
.gitignore
vendored
3
.gitignore
vendored
@@ -8,3 +8,6 @@
|
||||
*.db-wal
|
||||
*.db-shm
|
||||
__pycache__/
|
||||
|
||||
# Generated audit reports
|
||||
reports/
|
||||
|
||||
20
bin/muda-audit.sh
Executable file
20
bin/muda-audit.sh
Executable file
@@ -0,0 +1,20 @@
|
||||
#!/usr/bin/env bash
|
||||
# muda-audit.sh — Weekly waste audit wrapper
|
||||
# Runs scripts/muda_audit.py from the repo root.
|
||||
# Designed for cron or Gitea Actions.
|
||||
|
||||
set -euo pipefail
|
||||
|
||||
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
||||
REPO_ROOT="$(cd "${SCRIPT_DIR}/.." && pwd)"
|
||||
|
||||
cd "$REPO_ROOT"
|
||||
|
||||
# Ensure python3 is available
|
||||
if ! command -v python3 >/dev/null 2>&1; then
|
||||
echo "ERROR: python3 not found" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Run the audit
|
||||
python3 "${REPO_ROOT}/scripts/muda_audit.py" "$@"
|
||||
@@ -81,33 +81,7 @@
|
||||
"last_error": null,
|
||||
"deliver": "local",
|
||||
"origin": null,
|
||||
"state": "scheduled"
|
||||
},
|
||||
{
|
||||
"id": "5e9d952871bc",
|
||||
"name": "Agent Status Check",
|
||||
"prompt": "Check which tmux panes are idle vs working, report utilization",
|
||||
"schedule": {
|
||||
"kind": "interval",
|
||||
"minutes": 10,
|
||||
"display": "every 10m"
|
||||
},
|
||||
"schedule_display": "every 10m",
|
||||
"repeat": {
|
||||
"times": null,
|
||||
"completed": 8
|
||||
},
|
||||
"enabled": false,
|
||||
"created_at": "2026-03-24T11:28:46.409727-04:00",
|
||||
"next_run_at": "2026-03-24T15:45:58.108921-04:00",
|
||||
"last_run_at": "2026-03-24T15:35:58.108921-04:00",
|
||||
"last_status": "ok",
|
||||
"last_error": null,
|
||||
"deliver": "local",
|
||||
"origin": null,
|
||||
"state": "paused",
|
||||
"paused_at": "2026-03-24T16:23:03.869047-04:00",
|
||||
"paused_reason": "Dashboard repo frozen - loops redirected to the-nexus",
|
||||
"state": "scheduled",
|
||||
"skills": [],
|
||||
"skill": null
|
||||
},
|
||||
@@ -132,8 +106,38 @@
|
||||
"last_status": null,
|
||||
"last_error": null,
|
||||
"deliver": "local",
|
||||
"origin": null
|
||||
"origin": null,
|
||||
"skills": [],
|
||||
"skill": null
|
||||
},
|
||||
{
|
||||
"id": "muda-audit-weekly",
|
||||
"name": "Muda Audit",
|
||||
"prompt": "Run the Muda Audit script at /root/wizards/ezra/workspace/timmy-config/fleet/muda-audit.sh. The script measures the 7 wastes across the fleet and posts a report to Telegram. Report whether it succeeded or failed.",
|
||||
"schedule": {
|
||||
"kind": "cron",
|
||||
"expr": "0 21 * * 0",
|
||||
"display": "0 21 * * 0"
|
||||
},
|
||||
"schedule_display": "0 21 * * 0",
|
||||
"repeat": {
|
||||
"times": null,
|
||||
"completed": 0
|
||||
},
|
||||
"enabled": true,
|
||||
"created_at": "2026-04-07T15:00:00+00:00",
|
||||
"next_run_at": null,
|
||||
"last_run_at": null,
|
||||
"last_status": null,
|
||||
"last_error": null,
|
||||
"deliver": "local",
|
||||
"origin": null,
|
||||
"state": "scheduled",
|
||||
"paused_at": null,
|
||||
"paused_reason": null,
|
||||
"skills": [],
|
||||
"skill": null
|
||||
}
|
||||
],
|
||||
"updated_at": "2026-03-24T16:23:03.869797-04:00"
|
||||
"updated_at": "2026-04-07T15:00:00+00:00"
|
||||
}
|
||||
2
cron/muda-audit.crontab
Normal file
2
cron/muda-audit.crontab
Normal file
@@ -0,0 +1,2 @@
|
||||
# Muda Audit — run every Sunday at 21:00
|
||||
0 21 * * 0 cd /root/wizards/ezra/workspace/timmy-config && bash fleet/muda-audit.sh >> /tmp/muda-audit.log 2>&1
|
||||
19
fleet/muda-audit.sh
Executable file
19
fleet/muda-audit.sh
Executable file
@@ -0,0 +1,19 @@
|
||||
#!/usr/bin/env bash
|
||||
# muda-audit.sh — Fleet waste elimination audit
|
||||
# Part of Epic #345, Issue #350
|
||||
#
|
||||
# Measures the 7 wastes (Muda) across the Timmy Foundation fleet:
|
||||
# 1. Overproduction 2. Waiting 3. Transport
|
||||
# 4. Overprocessing 5. Inventory 6. Motion 7. Defects
|
||||
#
|
||||
# Posts report to Telegram and persists week-over-week metrics.
|
||||
# Should be invoked weekly (Sunday night) via cron.
|
||||
|
||||
set -euo pipefail
|
||||
|
||||
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
||||
|
||||
# Ensure Python can find gitea_client.py in the repo root
|
||||
export PYTHONPATH="${SCRIPT_DIR}/..:${PYTHONPATH:-}"
|
||||
|
||||
exec python3 "${SCRIPT_DIR}/muda_audit.py" "$@"
|
||||
661
fleet/muda_audit.py
Executable file
661
fleet/muda_audit.py
Executable file
@@ -0,0 +1,661 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Muda Audit — Fleet Waste Elimination
|
||||
Measures the 7 wastes across Timmy_Foundation repos and posts a weekly report.
|
||||
|
||||
Part of Epic: #345
|
||||
Issue: #350
|
||||
|
||||
Wastes:
|
||||
1. Overproduction — agent issues created vs closed
|
||||
2. Waiting — rate-limited API attempts from loop logs
|
||||
3. Transport — issues closed-and-redirected to other repos
|
||||
4. Overprocessing— PR diff size outliers (>500 lines for non-epics)
|
||||
5. Inventory — issues open >30 days with no activity
|
||||
6. Motion — git clone/rebase operations per issue from logs
|
||||
7. Defects — PRs closed without merge vs merged
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import urllib.request
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
# Add repo root to path so we can import gitea_client
|
||||
_REPO_ROOT = Path(__file__).resolve().parent.parent
|
||||
sys.path.insert(0, str(_REPO_ROOT))
|
||||
|
||||
from gitea_client import GiteaClient, GiteaError # noqa: E402
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Config
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
ORG = "Timmy_Foundation"
|
||||
AGENT_LOGINS = {
|
||||
"allegro",
|
||||
"antigravity",
|
||||
"bezalel",
|
||||
"claude",
|
||||
"codex-agent",
|
||||
"ezra",
|
||||
"gemini",
|
||||
"google",
|
||||
"grok",
|
||||
"groq",
|
||||
"hermes",
|
||||
"kimi",
|
||||
"manus",
|
||||
"perplexity",
|
||||
}
|
||||
AGENT_LOGINS_HUMAN = {
|
||||
"claude": "Claude",
|
||||
"codex-agent": "Codex",
|
||||
"ezra": "Ezra",
|
||||
"gemini": "Gemini",
|
||||
"google": "Google",
|
||||
"grok": "Grok",
|
||||
"groq": "Groq",
|
||||
"hermes": "Hermes",
|
||||
"kimi": "Kimi",
|
||||
"manus": "Manus",
|
||||
"perplexity": "Perplexity",
|
||||
"allegro": "Allegro",
|
||||
"antigravity": "Antigravity",
|
||||
"bezalel": "Bezalel",
|
||||
}
|
||||
|
||||
TELEGRAM_CHAT = "-1003664764329"
|
||||
TELEGRAM_TOKEN_FILE = Path.home() / ".hermes" / "telegram_token"
|
||||
|
||||
METRICS_DIR = Path(os.path.expanduser("~/.local/timmy/muda-audit"))
|
||||
METRICS_FILE = METRICS_DIR / "metrics.json"
|
||||
|
||||
LOG_PATHS = [
|
||||
Path.home() / ".hermes" / "logs" / "claude-loop.log",
|
||||
Path.home() / ".hermes" / "logs" / "gemini-loop.log",
|
||||
Path.home() / ".hermes" / "logs" / "agent.log",
|
||||
Path.home() / ".hermes" / "logs" / "errors.log",
|
||||
Path.home() / ".hermes" / "logs" / "gateway.log",
|
||||
]
|
||||
|
||||
# Patterns that indicate an issue was redirected / transported
|
||||
TRANSPORT_PATTERNS = [
|
||||
re.compile(r"redirect", re.IGNORECASE),
|
||||
re.compile(r"moved to", re.IGNORECASE),
|
||||
re.compile(r"wrong repo", re.IGNORECASE),
|
||||
re.compile(r"belongs in", re.IGNORECASE),
|
||||
re.compile(r"should be in", re.IGNORECASE),
|
||||
re.compile(r"transported", re.IGNORECASE),
|
||||
re.compile(r"relocated", re.IGNORECASE),
|
||||
]
|
||||
|
||||
RATE_LIMIT_PATTERNS = [
|
||||
re.compile(r"rate.limit", re.IGNORECASE),
|
||||
re.compile(r"ratelimit", re.IGNORECASE),
|
||||
re.compile(r"429"),
|
||||
re.compile(r"too many requests", re.IGNORECASE),
|
||||
re.compile(r"rate limit exceeded", re.IGNORECASE),
|
||||
]
|
||||
|
||||
MOTION_PATTERNS = [
|
||||
re.compile(r"git clone", re.IGNORECASE),
|
||||
re.compile(r"git rebase", re.IGNORECASE),
|
||||
re.compile(r"rebasing", re.IGNORECASE),
|
||||
re.compile(r"cloning into", re.IGNORECASE),
|
||||
]
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def iso_now() -> str:
|
||||
return datetime.now(timezone.utc).isoformat()
|
||||
|
||||
|
||||
def parse_iso(dt_str: str) -> datetime:
|
||||
dt_str = dt_str.replace("Z", "+00:00")
|
||||
return datetime.fromisoformat(dt_str)
|
||||
|
||||
|
||||
def since_days_ago(days: int) -> datetime:
|
||||
return datetime.now(timezone.utc) - timedelta(days=days)
|
||||
|
||||
|
||||
def fmt_num(n: float) -> str:
|
||||
return f"{n:.1f}" if isinstance(n, float) else str(n)
|
||||
|
||||
|
||||
def send_telegram(message: str) -> bool:
|
||||
if not TELEGRAM_TOKEN_FILE.exists():
|
||||
print("[WARN] Telegram token not found; skipping notification.")
|
||||
return False
|
||||
token = TELEGRAM_TOKEN_FILE.read_text().strip()
|
||||
url = f"https://api.telegram.org/bot{token}/sendMessage"
|
||||
body = json.dumps(
|
||||
{
|
||||
"chat_id": TELEGRAM_CHAT,
|
||||
"text": message,
|
||||
"parse_mode": "Markdown",
|
||||
"disable_web_page_preview": True,
|
||||
}
|
||||
).encode()
|
||||
req = urllib.request.Request(
|
||||
url, data=body, headers={"Content-Type": "application/json"}, method="POST"
|
||||
)
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=15) as resp:
|
||||
resp.read()
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"[WARN] Telegram send failed: {e}")
|
||||
return False
|
||||
|
||||
|
||||
def load_previous_metrics() -> dict | None:
|
||||
if not METRICS_FILE.exists():
|
||||
return None
|
||||
try:
|
||||
history = json.loads(METRICS_FILE.read_text())
|
||||
if history and isinstance(history, list):
|
||||
return history[-1]
|
||||
except (json.JSONDecodeError, OSError):
|
||||
pass
|
||||
return None
|
||||
|
||||
|
||||
def save_metrics(record: dict) -> None:
|
||||
METRICS_DIR.mkdir(parents=True, exist_ok=True)
|
||||
history: list[dict] = []
|
||||
if METRICS_FILE.exists():
|
||||
try:
|
||||
history = json.loads(METRICS_FILE.read_text())
|
||||
if not isinstance(history, list):
|
||||
history = []
|
||||
except (json.JSONDecodeError, OSError):
|
||||
history = []
|
||||
history.append(record)
|
||||
history = history[-52:]
|
||||
METRICS_FILE.write_text(json.dumps(history, indent=2))
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Gitea helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def paginate_all(func, *args, **kwargs) -> list[Any]:
|
||||
page = 1
|
||||
limit = kwargs.pop("limit", 50)
|
||||
results: list[Any] = []
|
||||
while True:
|
||||
batch = func(*args, limit=limit, page=page, **kwargs)
|
||||
if not batch:
|
||||
break
|
||||
results.extend(batch)
|
||||
if len(batch) < limit:
|
||||
break
|
||||
page += 1
|
||||
return results
|
||||
|
||||
|
||||
def list_org_repos(client: GiteaClient, org: str) -> list[str]:
|
||||
repos = paginate_all(client.list_org_repos, org, limit=50)
|
||||
return [r["name"] for r in repos if not r.get("archived", False)]
|
||||
|
||||
|
||||
def count_issues_created_by_agents(client: GiteaClient, repo: str, since: datetime) -> int:
|
||||
issues = paginate_all(client.list_issues, repo, state="all", sort="created", direction="desc", limit=50)
|
||||
count = 0
|
||||
for issue in issues:
|
||||
created = parse_iso(issue.created_at)
|
||||
if created < since:
|
||||
break
|
||||
if issue.user.login in AGENT_LOGINS:
|
||||
count += 1
|
||||
return count
|
||||
|
||||
|
||||
def count_issues_closed(client: GiteaClient, repo: str, since: datetime) -> int:
|
||||
issues = paginate_all(client.list_issues, repo, state="closed", sort="updated", direction="desc", limit=50)
|
||||
count = 0
|
||||
for issue in issues:
|
||||
updated = parse_iso(issue.updated_at)
|
||||
if updated < since:
|
||||
break
|
||||
count += 1
|
||||
return count
|
||||
|
||||
|
||||
def count_inventory_issues(client: GiteaClient, repo: str, stale_days: int = 30) -> int:
|
||||
cutoff = since_days_ago(stale_days)
|
||||
issues = paginate_all(client.list_issues, repo, state="open", sort="updated", direction="asc", limit=50)
|
||||
count = 0
|
||||
for issue in issues:
|
||||
updated = parse_iso(issue.updated_at)
|
||||
if updated < cutoff:
|
||||
count += 1
|
||||
else:
|
||||
break
|
||||
return count
|
||||
|
||||
|
||||
def count_transport_issues(client: GiteaClient, repo: str, since: datetime) -> int:
|
||||
issues = client.list_issues(repo, state="closed", sort="updated", direction="desc", limit=20)
|
||||
transport = 0
|
||||
for issue in issues:
|
||||
if parse_iso(issue.updated_at) < since:
|
||||
break
|
||||
try:
|
||||
comments = client.list_comments(repo, issue.number)
|
||||
except GiteaError:
|
||||
continue
|
||||
for comment in comments:
|
||||
body = comment.body or ""
|
||||
if any(p.search(body) for p in TRANSPORT_PATTERNS):
|
||||
transport += 1
|
||||
break
|
||||
return transport
|
||||
|
||||
|
||||
def get_pr_diff_size(client: GiteaClient, repo: str, pr_number: int) -> int:
|
||||
try:
|
||||
files = client.get_pull_files(repo, pr_number)
|
||||
return sum(f.additions + f.deletions for f in files)
|
||||
except GiteaError:
|
||||
return 0
|
||||
|
||||
|
||||
def measure_overprocessing(client: GiteaClient, repo: str, since: datetime) -> dict:
|
||||
pulls = paginate_all(client.list_pulls, repo, state="all", sort="newest", limit=30)
|
||||
sizes: list[int] = []
|
||||
outliers: list[tuple[int, str, int]] = []
|
||||
for pr in pulls:
|
||||
created = parse_iso(pr.created_at) if pr.created_at else since - timedelta(days=8)
|
||||
if created < since:
|
||||
break
|
||||
diff_size = get_pr_diff_size(client, repo, pr.number)
|
||||
sizes.append(diff_size)
|
||||
if diff_size > 500 and not any(w in pr.title.lower() for w in ("epic", "[epic]")):
|
||||
outliers.append((pr.number, pr.title, diff_size))
|
||||
avg = round(sum(sizes) / len(sizes), 1) if sizes else 0.0
|
||||
return {"avg_lines": avg, "outliers": outliers, "count": len(sizes)}
|
||||
|
||||
|
||||
def measure_defects(client: GiteaClient, repo: str, since: datetime) -> dict:
|
||||
pulls = paginate_all(client.list_pulls, repo, state="closed", sort="newest", limit=50)
|
||||
merged = 0
|
||||
closed_unmerged = 0
|
||||
for pr in pulls:
|
||||
created = parse_iso(pr.created_at) if pr.created_at else since - timedelta(days=8)
|
||||
if created < since:
|
||||
break
|
||||
if pr.merged:
|
||||
merged += 1
|
||||
else:
|
||||
closed_unmerged += 1
|
||||
return {"merged": merged, "closed_unmerged": closed_unmerged}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Log parsing
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def parse_logs_for_patterns(since: datetime, patterns: list[re.Pattern]) -> list[str]:
|
||||
matches: list[str] = []
|
||||
for log_path in LOG_PATHS:
|
||||
if not log_path.exists():
|
||||
continue
|
||||
try:
|
||||
with open(log_path, "r", errors="ignore") as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
ts = None
|
||||
m = re.match(r"^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})", line)
|
||||
if m:
|
||||
try:
|
||||
ts = datetime.strptime(m.group(1), "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
|
||||
except ValueError:
|
||||
pass
|
||||
if ts and ts < since:
|
||||
continue
|
||||
if any(p.search(line) for p in patterns):
|
||||
matches.append(line)
|
||||
except OSError:
|
||||
continue
|
||||
return matches
|
||||
|
||||
|
||||
def measure_waiting(since: datetime) -> dict:
|
||||
lines = parse_logs_for_patterns(since, RATE_LIMIT_PATTERNS)
|
||||
by_agent: dict[str, int] = {}
|
||||
total = len(lines)
|
||||
for line in lines:
|
||||
agent = "unknown"
|
||||
for name in AGENT_LOGINS_HUMAN.values():
|
||||
if name.lower() in line.lower():
|
||||
agent = name.lower()
|
||||
break
|
||||
if agent == "unknown":
|
||||
if "claude" in line.lower():
|
||||
agent = "claude"
|
||||
elif "gemini" in line.lower():
|
||||
agent = "gemini"
|
||||
elif "groq" in line.lower():
|
||||
agent = "groq"
|
||||
elif "kimi" in line.lower():
|
||||
agent = "kimi"
|
||||
by_agent[agent] = by_agent.get(agent, 0) + 1
|
||||
return {"total": total, "by_agent": by_agent}
|
||||
|
||||
|
||||
def measure_motion(since: datetime) -> dict:
|
||||
lines = parse_logs_for_patterns(since, MOTION_PATTERNS)
|
||||
by_issue: dict[str, int] = {}
|
||||
total = len(lines)
|
||||
issue_pattern = re.compile(r"issue[_\s-]?(\d+)", re.IGNORECASE)
|
||||
branch_pattern = re.compile(r"\b([a-z]+)/issue[_\s-]?(\d+)\b", re.IGNORECASE)
|
||||
for line in lines:
|
||||
issue_key = None
|
||||
m = branch_pattern.search(line)
|
||||
if m:
|
||||
issue_key = f"{m.group(1).lower()}/issue-{m.group(2)}"
|
||||
else:
|
||||
m = issue_pattern.search(line)
|
||||
if m:
|
||||
issue_key = f"issue-{m.group(1)}"
|
||||
if issue_key:
|
||||
by_issue[issue_key] = by_issue.get(issue_key, 0) + 1
|
||||
else:
|
||||
by_issue["unknown"] = by_issue.get("unknown", 0) + 1
|
||||
flagged = {k: v for k, v in by_issue.items() if v > 3 and k != "unknown"}
|
||||
return {"total": total, "by_issue": by_issue, "flagged": flagged}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Report builder
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def build_report(metrics: dict, prev: dict | None) -> str:
|
||||
lines: list[str] = []
|
||||
lines.append("*🗑️ MUDA AUDIT — Weekly Waste Report*")
|
||||
lines.append(f"Week ending {metrics['week_ending'][:10]}\n")
|
||||
|
||||
def trend_arrow(current: float, previous: float) -> str:
|
||||
if previous == 0:
|
||||
return ""
|
||||
if current < previous:
|
||||
return " ↓"
|
||||
if current > previous:
|
||||
return " ↑"
|
||||
return " →"
|
||||
|
||||
prev_w = prev or {}
|
||||
|
||||
op = metrics["overproduction"]
|
||||
op_prev = prev_w.get("overproduction", {})
|
||||
ratio = op["ratio"]
|
||||
ratio_prev = op_prev.get("ratio", 0.0)
|
||||
lines.append(
|
||||
f"*1. Overproduction:* {op['agent_created']} agent issues created / {op['closed']} closed"
|
||||
f" (ratio {fmt_num(ratio)}{trend_arrow(ratio, ratio_prev)})"
|
||||
)
|
||||
|
||||
w = metrics["waiting"]
|
||||
w_prev = prev_w.get("waiting", {})
|
||||
w_total_prev = w_prev.get("total", 0)
|
||||
lines.append(
|
||||
f"*2. Waiting:* {w['total']} rate-limit hits this week{trend_arrow(w['total'], w_total_prev)}"
|
||||
)
|
||||
if w["by_agent"]:
|
||||
top = sorted(w["by_agent"].items(), key=lambda x: x[1], reverse=True)[:3]
|
||||
lines.append(" Top offenders: " + ", ".join(f"{k}({v})" for k, v in top))
|
||||
|
||||
t = metrics["transport"]
|
||||
t_prev = prev_w.get("transport", {})
|
||||
t_total_prev = t_prev.get("total", 0)
|
||||
lines.append(
|
||||
f"*3. Transport:* {t['total']} issues closed-and-redirected{trend_arrow(t['total'], t_total_prev)}"
|
||||
)
|
||||
|
||||
ov = metrics["overprocessing"]
|
||||
ov_prev = prev_w.get("overprocessing", {})
|
||||
avg_prev = ov_prev.get("avg_lines", 0.0)
|
||||
lines.append(
|
||||
f"*4. Overprocessing:* Avg PR diff {fmt_num(ov['avg_lines'])} lines"
|
||||
f"{trend_arrow(ov['avg_lines'], avg_prev)}, {len(ov['outliers'])} outliers >500 lines"
|
||||
)
|
||||
|
||||
inv = metrics["inventory"]
|
||||
inv_prev = prev_w.get("inventory", {})
|
||||
inv_total_prev = inv_prev.get("total", 0)
|
||||
lines.append(
|
||||
f"*5. Inventory:* {inv['total']} stale issues open >30 days{trend_arrow(inv['total'], inv_total_prev)}"
|
||||
)
|
||||
|
||||
m = metrics["motion"]
|
||||
m_prev = prev_w.get("motion", {})
|
||||
m_total_prev = m_prev.get("total", 0)
|
||||
lines.append(
|
||||
f"*6. Motion:* {m['total']} git clone/rebase ops this week{trend_arrow(m['total'], m_total_prev)}"
|
||||
)
|
||||
if m["flagged"]:
|
||||
lines.append(f" Flagged: {len(m['flagged'])} issues with >3 ops")
|
||||
|
||||
d = metrics["defects"]
|
||||
d_prev = prev_w.get("defects", {})
|
||||
defect_rate = d["defect_rate"]
|
||||
defect_rate_prev = d_prev.get("defect_rate", 0.0)
|
||||
lines.append(
|
||||
f"*7. Defects:* {d['merged']} merged, {d['closed_unmerged']} abandoned"
|
||||
f" (defect rate {fmt_num(defect_rate)}%{trend_arrow(defect_rate, defect_rate_prev)})"
|
||||
)
|
||||
|
||||
lines.append("\n*🔥 Top 3 Elimination Suggestions:*")
|
||||
for i, suggestion in enumerate(metrics["eliminations"], 1):
|
||||
lines.append(f"{i}. {suggestion}")
|
||||
|
||||
lines.append("\n_Week over week: waste metrics should decrease. If an arrow points up, investigate._")
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def compute_eliminations(metrics: dict) -> list[str]:
|
||||
suggestions: list[tuple[str, float]] = []
|
||||
|
||||
op = metrics["overproduction"]
|
||||
if op["ratio"] > 1.0:
|
||||
suggestions.append(
|
||||
(
|
||||
"Overproduction: Stop agent loops from creating issues faster than they close them."
|
||||
f" Cap new issue creation when open backlog >{op['closed'] * 2}.",
|
||||
op["ratio"],
|
||||
)
|
||||
)
|
||||
|
||||
w = metrics["waiting"]
|
||||
if w["total"] > 10:
|
||||
top = max(w["by_agent"].items(), key=lambda x: x[1])
|
||||
suggestions.append(
|
||||
(
|
||||
f"Waiting: {top[0]} is burning cycles on rate limits ({top[1]} hits)."
|
||||
" Add exponential backoff or reduce worker count.",
|
||||
w["total"],
|
||||
)
|
||||
)
|
||||
|
||||
t = metrics["transport"]
|
||||
if t["total"] > 0:
|
||||
suggestions.append(
|
||||
(
|
||||
"Transport: Issues are being filed in the wrong repos."
|
||||
" Add a repo-scoping gate before any agent creates an issue.",
|
||||
t["total"] * 2,
|
||||
)
|
||||
)
|
||||
|
||||
ov = metrics["overprocessing"]
|
||||
if ov["outliers"]:
|
||||
suggestions.append(
|
||||
(
|
||||
f"Overprocessing: {len(ov['outliers'])} PRs exceeded 500 lines for non-epics."
|
||||
" Enforce a 200-line soft limit unless the issue is tagged 'epic'.",
|
||||
len(ov["outliers"]) * 1.5,
|
||||
)
|
||||
)
|
||||
|
||||
inv = metrics["inventory"]
|
||||
if inv["total"] > 20:
|
||||
suggestions.append(
|
||||
(
|
||||
f"Inventory: {inv['total']} issues are dead stock (>30 days)."
|
||||
" Run a stale-issue sweep and auto-close or consolidate.",
|
||||
inv["total"],
|
||||
)
|
||||
)
|
||||
|
||||
m = metrics["motion"]
|
||||
if m["flagged"]:
|
||||
suggestions.append(
|
||||
(
|
||||
f"Motion: {len(m['flagged'])} issues required excessive clone/rebase ops."
|
||||
" Cache worktrees and reuse branches across retries.",
|
||||
len(m["flagged"]) * 1.5,
|
||||
)
|
||||
)
|
||||
|
||||
d = metrics["defects"]
|
||||
total_prs = d["merged"] + d["closed_unmerged"]
|
||||
if total_prs > 0 and d["defect_rate"] > 20:
|
||||
suggestions.append(
|
||||
(
|
||||
f"Defects: {d['defect_rate']:.0f}% of PRs were abandoned."
|
||||
" Require a pre-PR scoping check to prevent unmergeable work.",
|
||||
d["defect_rate"],
|
||||
)
|
||||
)
|
||||
|
||||
suggestions.sort(key=lambda x: x[1], reverse=True)
|
||||
return [s[0] for s in suggestions[:3]] if suggestions else [
|
||||
"No major waste detected this week. Maintain current guardrails.",
|
||||
"Continue monitoring agent loop logs for emerging rate-limit patterns.",
|
||||
"Keep PR diff sizes under review during weekly standup.",
|
||||
]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def run_audit() -> dict:
|
||||
client = GiteaClient()
|
||||
since = since_days_ago(7)
|
||||
week_ending = datetime.now(timezone.utc).date().isoformat()
|
||||
|
||||
print("[muda] Fetching repo list...")
|
||||
repo_names = list_org_repos(client, ORG)
|
||||
print(f"[muda] Scanning {len(repo_names)} repos")
|
||||
|
||||
agent_created = 0
|
||||
issues_closed = 0
|
||||
transport_total = 0
|
||||
inventory_total = 0
|
||||
all_overprocessing: list[dict] = []
|
||||
all_defects_merged = 0
|
||||
all_defects_closed = 0
|
||||
|
||||
for name in repo_names:
|
||||
repo = f"{ORG}/{name}"
|
||||
print(f"[muda] {repo}")
|
||||
try:
|
||||
agent_created += count_issues_created_by_agents(client, repo, since)
|
||||
issues_closed += count_issues_closed(client, repo, since)
|
||||
transport_total += count_transport_issues(client, repo, since)
|
||||
inventory_total += count_inventory_issues(client, repo, 30)
|
||||
|
||||
op_proc = measure_overprocessing(client, repo, since)
|
||||
all_overprocessing.append(op_proc)
|
||||
|
||||
defects = measure_defects(client, repo, since)
|
||||
all_defects_merged += defects["merged"]
|
||||
all_defects_closed += defects["closed_unmerged"]
|
||||
except GiteaError as e:
|
||||
print(f" [WARN] {repo}: {e}")
|
||||
continue
|
||||
|
||||
waiting = measure_waiting(since)
|
||||
motion = measure_motion(since)
|
||||
|
||||
total_prs = all_defects_merged + all_defects_closed
|
||||
defect_rate = round((all_defects_closed / total_prs) * 100, 1) if total_prs else 0.0
|
||||
|
||||
avg_lines = 0.0
|
||||
total_op_count = sum(op["count"] for op in all_overprocessing)
|
||||
if total_op_count:
|
||||
avg_lines = round(
|
||||
sum(op["avg_lines"] * op["count"] for op in all_overprocessing) / total_op_count, 1
|
||||
)
|
||||
all_outliers = [o for op in all_overprocessing for o in op["outliers"]]
|
||||
|
||||
ratio = round(agent_created / issues_closed, 2) if issues_closed else float(agent_created)
|
||||
|
||||
metrics = {
|
||||
"week_ending": week_ending,
|
||||
"timestamp": iso_now(),
|
||||
"overproduction": {
|
||||
"agent_created": agent_created,
|
||||
"closed": issues_closed,
|
||||
"ratio": ratio,
|
||||
},
|
||||
"waiting": waiting,
|
||||
"transport": {"total": transport_total},
|
||||
"overprocessing": {
|
||||
"avg_lines": avg_lines,
|
||||
"outliers": all_outliers,
|
||||
"count": total_op_count,
|
||||
},
|
||||
"inventory": {"total": inventory_total},
|
||||
"motion": motion,
|
||||
"defects": {
|
||||
"merged": all_defects_merged,
|
||||
"closed_unmerged": all_defects_closed,
|
||||
"defect_rate": defect_rate,
|
||||
},
|
||||
}
|
||||
|
||||
metrics["eliminations"] = compute_eliminations(metrics)
|
||||
return metrics
|
||||
|
||||
|
||||
def main() -> int:
|
||||
print("[muda] Starting Muda Audit...")
|
||||
metrics = run_audit()
|
||||
prev = load_previous_metrics()
|
||||
report = build_report(metrics, prev)
|
||||
|
||||
print("\n" + "=" * 50)
|
||||
print(report)
|
||||
print("=" * 50)
|
||||
|
||||
save_metrics(metrics)
|
||||
sent = send_telegram(report)
|
||||
if sent:
|
||||
print("\n[OK] Report posted to Telegram.")
|
||||
else:
|
||||
print("\n[WARN] Telegram notification not sent.")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
@@ -143,6 +143,10 @@ class PullRequest:
|
||||
mergeable: bool = False
|
||||
merged: bool = False
|
||||
changed_files: int = 0
|
||||
additions: int = 0
|
||||
deletions: int = 0
|
||||
created_at: str = ""
|
||||
closed_at: str = ""
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, d: dict) -> "PullRequest":
|
||||
@@ -159,6 +163,10 @@ class PullRequest:
|
||||
mergeable=d.get("mergeable", False),
|
||||
merged=d.get("merged", False) or False,
|
||||
changed_files=d.get("changed_files", 0),
|
||||
additions=d.get("additions", 0),
|
||||
deletions=d.get("deletions", 0),
|
||||
created_at=d.get("created_at", ""),
|
||||
closed_at=d.get("closed_at", ""),
|
||||
)
|
||||
|
||||
|
||||
@@ -290,9 +298,9 @@ class GiteaClient:
|
||||
|
||||
# -- Repos ---------------------------------------------------------------
|
||||
|
||||
def list_org_repos(self, org: str, limit: int = 50) -> list[dict]:
|
||||
def list_org_repos(self, org: str, limit: int = 50, page: int = 1) -> list[dict]:
|
||||
"""List repos in an organization."""
|
||||
return self._get(f"/orgs/{org}/repos", limit=limit)
|
||||
return self._get(f"/orgs/{org}/repos", limit=limit, page=page)
|
||||
|
||||
# -- Issues --------------------------------------------------------------
|
||||
|
||||
|
||||
610
scripts/muda_audit.py
Executable file
610
scripts/muda_audit.py
Executable file
@@ -0,0 +1,610 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
muda_audit.py — Weekly waste audit for the Timmy Foundation fleet.
|
||||
|
||||
Measures 7 wastes (Muda) across Gitea repos and agent logs:
|
||||
1. Overproduction — issues created vs closed (ratio > 1.0 = waste)
|
||||
2. Waiting — rate-limit hits from agent logs
|
||||
3. Transport — issues closed with redirect keywords
|
||||
4. Overprocessing — PR diff size outliers (>500 lines)
|
||||
5. Inventory — open issues stale >30 days
|
||||
6. Motion — git clone/rebase churn from logs
|
||||
7. Defects — PRs closed without merge vs merged
|
||||
|
||||
Outputs JSON report, persists week-over-week metrics, and optionally posts to Telegram.
|
||||
Part of Epic #345, Issue #350.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import glob
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import urllib.request
|
||||
from collections import defaultdict
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from pathlib import Path
|
||||
|
||||
# Add repo root to path so we can import gitea_client
|
||||
REPO_ROOT = Path(__file__).resolve().parent.parent
|
||||
sys.path.insert(0, str(REPO_ROOT))
|
||||
|
||||
from gitea_client import GiteaClient, GiteaError
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Configuration
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
ORG = "Timmy_Foundation"
|
||||
REPOS = [
|
||||
"the-nexus",
|
||||
".profile",
|
||||
"timmy-config",
|
||||
"timmy-home",
|
||||
"the-door",
|
||||
"turboquant",
|
||||
"hermes-agent",
|
||||
"timmy-academy",
|
||||
"wolf",
|
||||
"the-testament",
|
||||
"the-beacon",
|
||||
]
|
||||
|
||||
AGENT_LOG_PATHS = [
|
||||
"/root/wizards/*/home/logs/*.log",
|
||||
"/root/wizards/*/logs/*.log",
|
||||
"/root/wizards/*/.hermes/logs/*.log",
|
||||
]
|
||||
|
||||
REDIRECT_KEYWORDS = [
|
||||
"moved to", "belongs in", "redirected to", "closing in favor of",
|
||||
"wrong repo", "should be in", "transfer to", "repost to",
|
||||
]
|
||||
|
||||
TELEGRAM_CHAT = "-1003664764329"
|
||||
TELEGRAM_TOKEN_PATHS = [
|
||||
Path.home() / ".config" / "telegram" / "special_bot",
|
||||
Path.home() / ".hermes" / "telegram_bot_token",
|
||||
]
|
||||
|
||||
METRICS_DIR = Path.home() / ".local" / "timmy" / "muda-audit"
|
||||
METRICS_FILE = METRICS_DIR / "metrics.json"
|
||||
|
||||
DAYS_BACK = 7
|
||||
STALE_DAYS = 30
|
||||
OVERPROCESSING_THRESHOLD = 500
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def now_utc() -> datetime:
    """Current timezone-aware timestamp in UTC."""
    return datetime.now(tz=timezone.utc)
|
||||
|
||||
|
||||
def parse_iso(ts: str) -> datetime:
    """Parse an ISO-8601 timestamp, accepting a trailing 'Z' for UTC."""
    normalized = ts[:-1] + "+00:00" if ts.endswith("Z") else ts
    return datetime.fromisoformat(normalized)
|
||||
|
||||
|
||||
def within_days(ts: str, days: int) -> bool:
    """True when `ts` is at most `days` old; False for unparseable input."""
    try:
        age = now_utc() - parse_iso(ts)
        return age <= timedelta(days=days)
    except Exception:
        return False
|
||||
|
||||
|
||||
def older_than_days(ts: str, days: int) -> bool:
    """True when `ts` is at least `days` old; False for unparseable input."""
    try:
        age = now_utc() - parse_iso(ts)
        return age >= timedelta(days=days)
    except Exception:
        return False
|
||||
|
||||
|
||||
def paginate_issues(client: GiteaClient, repo: str, state: str, limit_per_page: int = 50, max_pages: int = 4):
    """Yield all issues across pages.

    Fetches ORG/<repo> issues newest-first and stops on an empty page,
    a short (final) page, or after `max_pages` pages.
    """
    target = f"{ORG}/{repo}"
    page = 1
    while page <= max_pages:
        batch = client.list_issues(target, state=state, limit=limit_per_page, page=page, sort="created", direction="desc")
        if not batch:
            return
        yield from batch
        if len(batch) < limit_per_page:
            return
        page += 1
|
||||
|
||||
|
||||
def paginate_prs(client: GiteaClient, repo: str, state: str, limit_per_page: int = 50, max_pages: int = 3):
    """Yield all PRs across pages.

    Fetches ORG/<repo> pull requests newest-first and stops on an empty
    page, a short (final) page, or after `max_pages` pages.
    """
    target = f"{ORG}/{repo}"
    page = 1
    while page <= max_pages:
        batch = client.list_pulls(target, state=state, limit=limit_per_page, page=page, sort="newest")
        if not batch:
            return
        yield from batch
        if len(batch) < limit_per_page:
            return
        page += 1
|
||||
|
||||
|
||||
def read_telegram_token() -> str | None:
    """Return the Telegram bot token, or None when no source is configured.

    Checks TELEGRAM_TOKEN_PATHS in order (first existing file wins), then
    falls back to the TELEGRAM_BOT_TOKEN environment variable.
    """
    token_file = next((p for p in TELEGRAM_TOKEN_PATHS if p.exists()), None)
    if token_file is not None:
        return token_file.read_text().strip()
    return os.environ.get("TELEGRAM_BOT_TOKEN") or None
|
||||
|
||||
|
||||
def send_telegram(message: str) -> bool:
    """Post `message` to the configured Telegram chat via the Bot API.

    Returns True only when the API answers HTTP 200; any failure (missing
    token, network error, non-200) is reported as False after a warning.
    """
    token = read_telegram_token()
    if not token:
        print("[WARN] No Telegram token found; skipping notification.")
        return False

    body = {
        "chat_id": TELEGRAM_CHAT,
        "text": message,
        "parse_mode": "Markdown",
        "disable_web_page_preview": True,
    }
    request = urllib.request.Request(
        f"https://api.telegram.org/bot{token}/sendMessage",
        data=json.dumps(body).encode(),
        method="POST",
        headers={"Content-Type": "application/json"},
    )
    try:
        with urllib.request.urlopen(request, timeout=15) as resp:
            return resp.status == 200
    except Exception as e:
        print(f"[WARN] Telegram send failed: {e}")
        return False
|
||||
|
||||
|
||||
def find_log_files() -> list[Path]:
    """Collect all non-empty agent log files matching AGENT_LOG_PATHS globs."""
    found: list[Path] = []
    for pattern in AGENT_LOG_PATHS:
        for match in glob.glob(pattern):
            candidate = Path(match)
            try:
                # Skip files that vanish or are unreadable between glob and stat.
                if candidate.stat().st_size > 0:
                    found.append(candidate)
            except OSError:
                continue
    return found
|
||||
|
||||
|
||||
def grep_logs(pattern: str, files: list[Path]) -> dict[str, int]:
    """Return count of matches per agent (derived from path).

    The agent name is the path segment following "wizards"; files outside
    a wizards tree are attributed to "unknown". Matching is a substring
    test against each lowercased line, so `pattern` should be lowercase.
    """
    counts: dict[str, int] = defaultdict(int)
    for logfile in files:
        segments = logfile.parts
        if "wizards" in segments:
            pos = segments.index("wizards")
            agent = segments[pos + 1] if pos + 1 < len(segments) else "unknown"
        else:
            agent = "unknown"
        try:
            with open(logfile, "r", errors="ignore") as handle:
                for line in handle:
                    if pattern in line.lower():
                        counts[agent] += 1
        except Exception:
            # Unreadable files contribute nothing; best-effort scan.
            pass
    return dict(counts)
|
||||
|
||||
|
||||
def summarize_counts(counts: dict[str, int]) -> str:
    """Render the top five agents by count as 'name: n' pairs, or 'none detected'."""
    if not counts:
        return "none detected"
    ranked = sorted(counts.items(), key=lambda item: item[1], reverse=True)
    return ", ".join(f"{agent}: {n}" for agent, n in ranked[:5])
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Week-over-week persistence
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def load_previous_metrics() -> dict | None:
    """Return the most recent saved weekly record, or None if unavailable.

    Tolerates a missing, unreadable, or malformed metrics file.
    """
    if not METRICS_FILE.exists():
        return None
    try:
        history = json.loads(METRICS_FILE.read_text())
    except (json.JSONDecodeError, OSError):
        return None
    if isinstance(history, list) and history:
        return history[-1]
    return None
|
||||
|
||||
|
||||
def save_metrics(record: dict) -> None:
    """Append `record` to the on-disk metrics history (capped at 52 entries).

    A corrupt or non-list history file is silently replaced with a fresh list.
    """
    METRICS_DIR.mkdir(parents=True, exist_ok=True)
    history: list[dict] = []
    if METRICS_FILE.exists():
        try:
            loaded = json.loads(METRICS_FILE.read_text())
            if isinstance(loaded, list):
                history = loaded
        except (json.JSONDecodeError, OSError):
            history = []
    history.append(record)
    # keep one year of weekly reports
    METRICS_FILE.write_text(json.dumps(history[-52:], indent=2))
|
||||
|
||||
|
||||
def trend_arrow(current: float, previous: float) -> str:
    """Arrow comparing this week's value to last week's ('' when no baseline)."""
    if previous == 0:
        # No prior data (or a zero baseline) — no trend to show.
        return ""
    if current == previous:
        return " →"
    return " ↓" if current < previous else " ↑"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Waste metrics
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def measure_overproduction(client: GiteaClient) -> dict:
    """Compare issues created vs. closed fleet-wide over the audit window.

    healthy when the created/closed ratio stays at or below 1.0.
    """
    created = 0
    closed = 0
    created_by_repo: dict[str, int] = defaultdict(int)
    closed_by_repo: dict[str, int] = defaultdict(int)

    for repo in REPOS:
        try:
            for issue in paginate_issues(client, repo, state="all", max_pages=3):
                if within_days(issue.created_at, DAYS_BACK):
                    created += 1
                    created_by_repo[repo] += 1
                if issue.state == "closed" and within_days(issue.updated_at, DAYS_BACK):
                    closed += 1
                    closed_by_repo[repo] += 1
        except GiteaError as e:
            print(f"[WARN] Overproduction fetch failed for {repo}: {e}")

    if closed > 0:
        ratio = round(created / closed, 2)
    else:
        # Nothing closed this week: report the raw created count as the
        # ratio stand-in (0.0 when nothing was created either).
        ratio = created if created > 0 else 0.0

    busiest = max(created_by_repo, key=created_by_repo.get) if created_by_repo else None
    return {
        "waste": "Overproduction",
        "created": created,
        "closed": closed,
        "ratio": ratio,
        "top_repo": busiest,
        "healthy": ratio <= 1.0,
    }
|
||||
|
||||
|
||||
def measure_waiting(_client: GiteaClient) -> dict:
    """Tally rate-limit indicators found in agent logs (healthy when zero)."""
    files = find_log_files()
    total_by_agent: dict[str, int] = defaultdict(int)
    for indicator in ["rate limit", "ratelimit", "429", "too many requests"]:
        for agent, hits in grep_logs(indicator, files).items():
            total_by_agent[agent] += hits

    total_hits = sum(total_by_agent.values())
    return {
        "waste": "Waiting",
        "rate_limit_hits": dict(total_by_agent),
        "total_hits": total_hits,
        "log_files_scanned": len(files),
        "healthy": total_hits == 0,
    }
|
||||
|
||||
|
||||
def measure_transport(client: GiteaClient) -> dict:
    """Count recently closed issues that were redirected to another repo.

    Examines up to 20 recently-updated closed issues per repo, looking for
    REDIRECT_KEYWORDS first in the issue body, then in its comment thread.
    healthy when no redirected issues are found.
    """
    redirected = 0
    examples: list[str] = []
    for repo in REPOS:
        checked = 0
        try:
            for issue in paginate_issues(client, repo, state="closed", max_pages=2):
                # Only issues touched inside the audit window count.
                if not within_days(issue.updated_at, DAYS_BACK):
                    continue
                checked += 1
                # Cap the per-repo scan at 20 qualifying issues.
                if checked > 20:
                    break
                text = (issue.body or "").lower()
                if any(kw in text for kw in REDIRECT_KEYWORDS):
                    redirected += 1
                    examples.append(f"{repo}#{issue.number}")
                    continue
                # Body had no marker — fall back to scanning comments.
                try:
                    comments = client.list_comments(f"{ORG}/{repo}", issue.number)
                    for c in comments:
                        if any(kw in (c.body or "").lower() for kw in REDIRECT_KEYWORDS):
                            redirected += 1
                            examples.append(f"{repo}#{issue.number}")
                            break
                except GiteaError:
                    # Comment fetch is best-effort; ignore API errors.
                    pass
        except GiteaError as e:
            print(f"[WARN] Transport fetch failed for {repo}: {e}")

    return {
        "waste": "Transport",
        "redirected_issues": redirected,
        "examples": examples[:5],
        "healthy": redirected == 0,
    }
|
||||
|
||||
|
||||
def measure_overprocessing(client: GiteaClient) -> dict:
    """Measure PR diff sizes over the audit window and flag oversized PRs.

    Scans up to 10 recent PRs per repo; a PR is an outlier when its total
    changed lines exceed OVERPROCESSING_THRESHOLD, unless its title
    contains "epic" (large epics are expected). healthy when no outliers.
    """
    pr_details: list[dict] = []
    flagged: list[str] = []
    total_lines = 0

    for repo in REPOS:
        try:
            scanned = 0
            for pr in paginate_prs(client, repo, state="all", max_pages=2):
                if not within_days(pr.created_at or "", DAYS_BACK):
                    continue
                scanned += 1
                # Cap the per-repo scan at 10 qualifying PRs.
                if scanned > 10:
                    break
                full_repo = f"{ORG}/{repo}"
                try:
                    files = client.get_pull_files(full_repo, pr.number)
                except GiteaError:
                    # Diff fetch is best-effort; treat as an empty change set.
                    files = []
                # Total churn = additions + deletions across every file.
                lines = sum(f.additions + f.deletions for f in files)
                total_lines += lines
                pr_details.append({
                    "repo": repo,
                    "pr": pr.number,
                    "title": pr.title,
                    "lines": lines,
                })
                is_epic = "epic" in (pr.title or "").lower()
                if lines > OVERPROCESSING_THRESHOLD and not is_epic:
                    flagged.append(f"{repo}#{pr.number} ({lines} lines)")
        except GiteaError as e:
            print(f"[WARN] Overprocessing fetch failed for {repo}: {e}")

    avg_lines = round(total_lines / len(pr_details), 1) if pr_details else 0.0
    return {
        "waste": "Overprocessing",
        "prs_scanned": len(pr_details),
        "avg_lines_changed": avg_lines,
        "flagged_outliers": flagged,
        "healthy": len(flagged) == 0,
    }
|
||||
|
||||
|
||||
def measure_inventory(client: GiteaClient) -> dict:
    """Count open issues untouched for at least STALE_DAYS days per repo."""
    stale = 0
    by_repo: dict[str, int] = defaultdict(int)
    for repo in REPOS:
        try:
            for issue in paginate_issues(client, repo, state="open", max_pages=4):
                if not older_than_days(issue.updated_at, STALE_DAYS):
                    continue
                stale += 1
                by_repo[repo] += 1
        except GiteaError as e:
            print(f"[WARN] Inventory fetch failed for {repo}: {e}")

    worst = max(by_repo, key=by_repo.get) if by_repo else None
    return {
        "waste": "Inventory",
        "stale_issues": stale,
        "by_repo": dict(by_repo),
        "top_repo": worst,
        "healthy": stale == 0,
    }
|
||||
|
||||
|
||||
def measure_motion(_client: GiteaClient) -> dict:
    """Count repetitive git operations (clone/rebase/fetch) in agent logs.

    healthy while total events stay under 50 for the scanned logs.
    """
    files = find_log_files()
    ops = {verb: grep_logs(f"git {verb}", files) for verb in ("clone", "rebase", "fetch")}
    total_motion = sum(sum(per_agent.values()) for per_agent in ops.values())

    return {
        "waste": "Motion",
        "git_clones": ops["clone"],
        "git_rebases": ops["rebase"],
        "git_fetches": ops["fetch"],
        "total_motion_events": total_motion,
        "log_files_scanned": len(files),
        "healthy": total_motion < 50,
    }
|
||||
|
||||
|
||||
def measure_defects(client: GiteaClient) -> dict:
    """Track the share of recent PRs closed without being merged.

    healthy while fewer than 25% of finished PRs were abandoned unmerged.
    """
    merged = 0
    closed_without_merge = 0
    for repo in REPOS:
        try:
            for pr in paginate_prs(client, repo, state="closed", max_pages=2):
                if not within_days(pr.created_at or "", DAYS_BACK):
                    continue
                if pr.merged:
                    merged += 1
                    continue
                closed_without_merge += 1
        except GiteaError as e:
            print(f"[WARN] Defects fetch failed for {repo}: {e}")

    finished = merged + closed_without_merge
    close_rate = round(closed_without_merge / finished, 2) if finished > 0 else 0.0
    return {
        "waste": "Defects",
        "merged": merged,
        "closed_without_merge": closed_without_merge,
        "close_rate": close_rate,
        "healthy": close_rate < 0.25,
    }
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Report generation
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Canonical remediation advice keyed by waste-category name; looked up by
# compute_top_eliminations() for each unhealthy category.
SUGGESTIONS = {
    "Overproduction": "Pause issue-generation loops until backlog shrinks. Review auto-issue bots.",
    "Waiting": "Add exponential backoff to API clients. Reduce loop frequency for rate-limited agents.",
    "Transport": "Enforce repo-boundary check before issue creation. Close with redirect template.",
    "Overprocessing": "Scope tickets tighter. Flag >500-line PRs for pre-review split.",
    "Inventory": "Bulk-close or consolidate stale issues. Set 30-day auto-close for untouched items.",
    "Motion": "Cache workspace directories across issues. Limit clones to 1 per issue branch.",
    "Defects": "Require smoke tests before PR. Rebase before merge to reduce conflict closures.",
}
|
||||
|
||||
|
||||
def compute_top_eliminations(metrics: list[dict]) -> list[str]:
    """Pick the top 3 unhealthiest wastes and return concrete suggestions."""

    def severity(metric: dict) -> float:
        # Normalize each waste's headline number onto a rough common scale
        # so different categories can be ranked against each other.
        waste = metric["waste"]
        if waste == "Overproduction":
            return metric.get("ratio", 0)
        if waste == "Waiting":
            return metric.get("total_hits", 0) / 10
        if waste == "Transport":
            return metric.get("redirected_issues", 0)
        if waste == "Overprocessing":
            return len(metric.get("flagged_outliers", []))
        if waste == "Inventory":
            return metric.get("stale_issues", 0) / 10
        if waste == "Motion":
            return metric.get("total_motion_events", 0) / 20
        if waste == "Defects":
            return metric.get("close_rate", 0) * 10
        return 0.0

    unhealthy = sorted(
        (m for m in metrics if not m.get("healthy", True)),
        key=severity,
        reverse=True,
    )
    if not unhealthy:
        # Nothing to eliminate — return standing maintenance advice instead.
        return [
            "No major waste detected this week. Maintain current guardrails.",
            "Continue monitoring agent loop logs for emerging rate-limit patterns.",
            "Keep PR diff sizes under review during weekly standup.",
        ]
    return [SUGGESTIONS.get(m["waste"], "Review and reduce.") for m in unhealthy[:3]]
|
||||
|
||||
|
||||
def build_report(metrics: list[dict]) -> dict:
    """Assemble the JSON-serializable weekly audit report from measured metrics."""
    unhealthy = [m for m in metrics if not m.get("healthy", True)]
    return {
        "report_type": "MUDA Weekly Waste Audit",
        "generated_at": now_utc().isoformat(),
        "period_days": DAYS_BACK,
        "metrics": metrics,
        "waste_count": len(unhealthy),
        "top_wastes": unhealthy,
    }
|
||||
|
||||
|
||||
def format_telegram(report: dict, prev: dict | None = None) -> str:
    """Render the audit report as a Markdown message for Telegram.

    When `prev` (last week's saved record) is given, each metric line gets
    a trend arrow comparing this week's headline number to last week's.
    """
    lines = [
        f"*🗑 MUDA Audit — {report['generated_at'][:10]}*",
        f"Period: last {report['period_days']} days",
        "",
    ]

    # Index last week's metrics by waste name for direct lookup.
    prev_metrics = {m["waste"]: m for m in (prev.get("metrics", []) if prev else [])}

    for m in report["metrics"]:
        emoji = "✅" if m.get("healthy") else "⚠️"
        name = m["waste"]
        # Empty dict when this waste has no prior record — arrows become "".
        pm = prev_metrics.get(name, {})

        if name == "Overproduction":
            ratio_prev = pm.get("ratio", 0.0)
            arrow = trend_arrow(m["ratio"], ratio_prev)
            lines.append(f"{emoji} *Overproduction*: {m['created']} created / {m['closed']} closed = ratio {m['ratio']}{arrow}")
        elif name == "Waiting":
            hits_prev = pm.get("total_hits", 0)
            arrow = trend_arrow(m["total_hits"], hits_prev)
            lines.append(f"{emoji} *Waiting*: {m['total_hits']} rate-limit hits ({summarize_counts(m['rate_limit_hits'])}){arrow}")
        elif name == "Transport":
            trans_prev = pm.get("redirected_issues", 0)
            arrow = trend_arrow(m["redirected_issues"], trans_prev)
            lines.append(f"{emoji} *Transport*: {m['redirected_issues']} redirected issues{arrow}")
        elif name == "Overprocessing":
            avg_prev = pm.get("avg_lines_changed", 0.0)
            arrow = trend_arrow(m["avg_lines_changed"], avg_prev)
            lines.append(f"{emoji} *Overprocessing*: avg {m['avg_lines_changed']} lines/PR, {len(m['flagged_outliers'])} outliers{arrow}")
        elif name == "Inventory":
            inv_prev = pm.get("stale_issues", 0)
            arrow = trend_arrow(m["stale_issues"], inv_prev)
            lines.append(f"{emoji} *Inventory*: {m['stale_issues']} stale issues (>30d){arrow}")
        elif name == "Motion":
            motion_prev = pm.get("total_motion_events", 0)
            arrow = trend_arrow(m["total_motion_events"], motion_prev)
            lines.append(f"{emoji} *Motion*: {m['total_motion_events']} git ops ({summarize_counts(m['git_clones'])} clones){arrow}")
        elif name == "Defects":
            close_prev = pm.get("close_rate", 0.0)
            arrow = trend_arrow(m["close_rate"], close_prev)
            # NOTE(review): name is misleading — this is ALL finished PRs
            # (merged + unmerged), i.e. the denominator of close_rate.
            total_abandoned = m["closed_without_merge"] + m["merged"]
            lines.append(f"{emoji} *Defects*: {m['close_rate']*100:.0f}% closed without merge ({m['closed_without_merge']}/{total_abandoned}){arrow}")

    lines.append("")
    eliminations = compute_top_eliminations(report["metrics"])
    lines.append("*Top 3 eliminations:*")
    for i, suggestion in enumerate(eliminations, 1):
        lines.append(f"{i}. {suggestion}")

    lines.append("")
    lines.append("_Week over week: waste metrics should decrease. If an arrow points up, investigate._")

    return "\n".join(lines)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def main():
    """Run the full audit: measure every waste, write the report, notify, persist."""
    client = GiteaClient()
    if not client.ping():
        print("[ERROR] Gitea is unreachable. Aborting audit.")
        sys.exit(1)

    print("[INFO] Starting MUDA waste audit...")
    # Run every waste measurement in a fixed, report-stable order.
    measurements = (
        measure_overproduction,
        measure_waiting,
        measure_transport,
        measure_overprocessing,
        measure_inventory,
        measure_motion,
        measure_defects,
    )
    metrics = [measure(client) for measure in measurements]

    report = build_report(metrics)
    prev = load_previous_metrics()

    # Write the JSON report artifact (picked up by CI upload step).
    reports_dir = REPO_ROOT / "reports"
    reports_dir.mkdir(exist_ok=True)
    json_path = reports_dir / f"muda-audit-{now_utc().strftime('%Y%m%d')}.json"
    json_path.write_text(json.dumps(report, indent=2))
    print(f"[INFO] Report written to {json_path}")

    # Notify via Telegram (best-effort; failure only warns).
    telegram_msg = format_telegram(report, prev)
    if send_telegram(telegram_msg):
        print("[INFO] Telegram notification sent.")
    else:
        print("[WARN] Telegram notification failed or skipped.")

    # Persist this week's metrics for week-over-week trend arrows.
    save_metrics({
        "week_ending": now_utc().date().isoformat(),
        "generated_at": report["generated_at"],
        "metrics": metrics,
    })

    # Echo the human-readable summary to stdout as well.
    print("\n" + "=" * 60)
    print(telegram_msg)
    print("=" * 60)


if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user