[MUDA] Issue #350 — Weekly fleet waste audit #351
31
.gitea/workflows/muda-audit.yml
Normal file
31
.gitea/workflows/muda-audit.yml
Normal file
@@ -0,0 +1,31 @@
|
||||
# Weekly MUDA (waste) audit: runs scripts/muda_audit.py via bin/muda-audit.sh
# and uploads the generated JSON report as a build artifact.
name: MUDA Weekly Waste Audit

on:
  schedule:
    - cron: "0 21 * * 0" # Sunday at 21:00 UTC
  workflow_dispatch:      # allow manual runs from the Actions UI

jobs:
  muda-audit:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout repo
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"

      - name: Run MUDA audit
        env:
          GITEA_URL: "https://forge.alexanderwhitestone.com"
        run: |
          chmod +x bin/muda-audit.sh
          ./bin/muda-audit.sh

      - name: Upload audit report
        uses: actions/upload-artifact@v4
        with:
          name: muda-audit-report
          path: reports/muda-audit-*.json
|
||||
3
.gitignore
vendored
3
.gitignore
vendored
@@ -8,3 +8,6 @@
|
||||
*.db-wal
|
||||
*.db-shm
|
||||
__pycache__/
|
||||
|
||||
# Generated audit reports
|
||||
reports/
|
||||
|
||||
20
bin/muda-audit.sh
Executable file
20
bin/muda-audit.sh
Executable file
@@ -0,0 +1,20 @@
|
||||
#!/usr/bin/env bash
# muda-audit.sh — Weekly waste audit wrapper
# Runs scripts/muda_audit.py from the repo root.
# Designed for cron or Gitea Actions.

set -euo pipefail

# Resolve the repo root relative to this script so it works from any CWD.
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
REPO_ROOT="$(cd "${SCRIPT_DIR}/.." && pwd)"

cd "$REPO_ROOT"

# Ensure python3 is available — fail fast with a clear message otherwise.
if ! command -v python3 >/dev/null 2>&1; then
    echo "ERROR: python3 not found" >&2
    exit 1
fi

# Run the audit, forwarding any CLI arguments.
python3 "${REPO_ROOT}/scripts/muda_audit.py" "$@"
|
||||
@@ -143,6 +143,10 @@ class PullRequest:
|
||||
mergeable: bool = False
|
||||
merged: bool = False
|
||||
changed_files: int = 0
|
||||
additions: int = 0
|
||||
deletions: int = 0
|
||||
created_at: str = ""
|
||||
closed_at: str = ""
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, d: dict) -> "PullRequest":
|
||||
@@ -159,6 +163,10 @@ class PullRequest:
|
||||
mergeable=d.get("mergeable", False),
|
||||
merged=d.get("merged", False) or False,
|
||||
changed_files=d.get("changed_files", 0),
|
||||
additions=d.get("additions", 0),
|
||||
deletions=d.get("deletions", 0),
|
||||
created_at=d.get("created_at", ""),
|
||||
closed_at=d.get("closed_at", ""),
|
||||
)
|
||||
|
||||
|
||||
@@ -290,9 +298,9 @@ class GiteaClient:
|
||||
|
||||
# -- Repos ---------------------------------------------------------------
|
||||
|
||||
def list_org_repos(self, org: str, limit: int = 50) -> list[dict]:
|
||||
def list_org_repos(self, org: str, limit: int = 50, page: int = 1) -> list[dict]:
|
||||
"""List repos in an organization."""
|
||||
return self._get(f"/orgs/{org}/repos", limit=limit)
|
||||
return self._get(f"/orgs/{org}/repos", limit=limit, page=page)
|
||||
|
||||
# -- Issues --------------------------------------------------------------
|
||||
|
||||
|
||||
610
scripts/muda_audit.py
Executable file
610
scripts/muda_audit.py
Executable file
@@ -0,0 +1,610 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
muda_audit.py — Weekly waste audit for the Timmy Foundation fleet.
|
||||
|
||||
Measures 7 wastes (Muda) across Gitea repos and agent logs:
|
||||
1. Overproduction — issues created vs closed (ratio > 1.0 = waste)
|
||||
2. Waiting — rate-limit hits from agent logs
|
||||
3. Transport — issues closed with redirect keywords
|
||||
4. Overprocessing — PR diff size outliers (>500 lines)
|
||||
5. Inventory — open issues stale >30 days
|
||||
6. Motion — git clone/rebase churn from logs
|
||||
7. Defects — PRs closed without merge vs merged
|
||||
|
||||
Outputs JSON report, persists week-over-week metrics, and optionally posts to Telegram.
|
||||
Part of Epic #345, Issue #350.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import glob
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import urllib.request
|
||||
from collections import defaultdict
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from pathlib import Path
|
||||
|
||||
# Add repo root to path so we can import gitea_client
|
||||
REPO_ROOT = Path(__file__).resolve().parent.parent
|
||||
sys.path.insert(0, str(REPO_ROOT))
|
||||
|
||||
from gitea_client import GiteaClient, GiteaError
|
||||
|
||||
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

# Gitea organization and the fleet repos covered by the audit.
ORG = "Timmy_Foundation"
REPOS = [
    "the-nexus",
    ".profile",
    "timmy-config",
    "timmy-home",
    "the-door",
    "turboquant",
    "hermes-agent",
    "timmy-academy",
    "wolf",
    "the-testament",
    "the-beacon",
]

# Glob patterns for agent log files scanned by the Waiting and Motion metrics.
AGENT_LOG_PATHS = [
    "/root/wizards/*/home/logs/*.log",
    "/root/wizards/*/logs/*.log",
    "/root/wizards/*/.hermes/logs/*.log",
]

# Phrases that mark an issue as "closed because it was filed in the wrong repo"
# (the Transport waste). Compared lowercase against issue bodies and comments.
REDIRECT_KEYWORDS = [
    "moved to", "belongs in", "redirected to", "closing in favor of",
    "wrong repo", "should be in", "transfer to", "repost to",
]

# Telegram notification target chat and token lookup locations (first hit wins).
TELEGRAM_CHAT = "-1003664764329"
TELEGRAM_TOKEN_PATHS = [
    Path.home() / ".config" / "telegram" / "special_bot",
    Path.home() / ".hermes" / "telegram_bot_token",
]

# On-disk week-over-week metric history.
METRICS_DIR = Path.home() / ".local" / "timmy" / "muda-audit"
METRICS_FILE = METRICS_DIR / "metrics.json"

DAYS_BACK = 7                    # audit window in days
STALE_DAYS = 30                  # open-issue age that counts as stale inventory
OVERPROCESSING_THRESHOLD = 500   # diff lines above which a PR is an outlier
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def now_utc() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)
|
||||
|
||||
|
||||
def parse_iso(ts: str) -> datetime:
    """Parse an ISO-8601 timestamp string into a datetime.

    A trailing "Z" (as emitted by Gitea) is normalized to "+00:00" first,
    since ``fromisoformat`` does not accept the "Z" suffix on all
    supported Python versions.
    """
    normalized = ts[:-1] + "+00:00" if ts.endswith("Z") else ts
    return datetime.fromisoformat(normalized)
|
||||
|
||||
|
||||
def within_days(ts: str, days: int) -> bool:
    """Return True if *ts* falls within the last *days* days.

    Unparseable or naive/aware-mismatched timestamps yield False rather
    than raising, so a single bad record cannot abort the audit.
    """
    try:
        age = now_utc() - parse_iso(ts)
        return age <= timedelta(days=days)
    except Exception:
        return False
|
||||
|
||||
|
||||
def older_than_days(ts: str, days: int) -> bool:
    """Return True if *ts* is at least *days* days old.

    Mirrors within_days(): any parse/arithmetic failure yields False.
    """
    try:
        age = now_utc() - parse_iso(ts)
        return age >= timedelta(days=days)
    except Exception:
        return False
|
||||
|
||||
|
||||
def paginate_issues(client: GiteaClient, repo: str, state: str, limit_per_page: int = 50, max_pages: int = 4):
    """Yield issues for ORG/<repo>, newest first, across at most *max_pages* pages.

    Stops early when a page comes back empty or shorter than a full page.
    """
    target = f"{ORG}/{repo}"
    page = 1
    while page <= max_pages:
        batch = client.list_issues(target, state=state, limit=limit_per_page, page=page, sort="created", direction="desc")
        if not batch:
            return
        yield from batch
        if len(batch) < limit_per_page:
            return
        page += 1
|
||||
|
||||
|
||||
def paginate_prs(client: GiteaClient, repo: str, state: str, limit_per_page: int = 50, max_pages: int = 3):
    """Yield pull requests for ORG/<repo>, newest first, across at most *max_pages* pages.

    Stops early when a page comes back empty or shorter than a full page.
    """
    target = f"{ORG}/{repo}"
    page = 1
    while page <= max_pages:
        batch = client.list_pulls(target, state=state, limit=limit_per_page, page=page, sort="newest")
        if not batch:
            return
        yield from batch
        if len(batch) < limit_per_page:
            return
        page += 1
|
||||
|
||||
|
||||
def read_telegram_token() -> str | None:
    """Return the Telegram bot token, or None if none is configured.

    Checks each candidate file in TELEGRAM_TOKEN_PATHS in order, then falls
    back to the TELEGRAM_BOT_TOKEN environment variable.

    Fix: previously an empty token file short-circuited the search and
    returned "" (silently disabling notifications), and an unreadable file
    raised out of the function. Now empty/unreadable candidates are skipped
    so the remaining paths and the environment variable still get a chance.
    """
    for path in TELEGRAM_TOKEN_PATHS:
        try:
            if path.exists():
                token = path.read_text().strip()
                if token:
                    return token
        except OSError:
            # Unreadable candidate (permissions, race) — try the next one.
            continue
    return os.environ.get("TELEGRAM_BOT_TOKEN") or None
|
||||
|
||||
|
||||
def send_telegram(message: str) -> bool:
    """POST *message* to the configured Telegram chat.

    Returns True only on HTTP 200. Missing token or any send failure is
    reported as a warning and returns False — notifications are strictly
    best-effort and must never abort the audit.
    """
    token = read_telegram_token()
    if not token:
        print("[WARN] No Telegram token found; skipping notification.")
        return False

    body = json.dumps({
        "chat_id": TELEGRAM_CHAT,
        "text": message,
        "parse_mode": "Markdown",
        "disable_web_page_preview": True,
    }).encode()
    request = urllib.request.Request(
        f"https://api.telegram.org/bot{token}/sendMessage",
        data=body,
        method="POST",
        headers={"Content-Type": "application/json"},
    )
    try:
        with urllib.request.urlopen(request, timeout=15) as resp:
            return resp.status == 200
    except Exception as e:
        print(f"[WARN] Telegram send failed: {e}")
        return False
|
||||
|
||||
|
||||
def find_log_files() -> list[Path]:
    """Return every non-empty agent log file matching the AGENT_LOG_PATHS globs.

    Files that vanish or error on stat() are silently skipped.
    """
    found: list[Path] = []
    for pattern in AGENT_LOG_PATHS:
        for match in glob.glob(pattern):
            candidate = Path(match)
            try:
                if candidate.stat().st_size > 0:
                    found.append(candidate)
            except OSError:
                pass
    return found
|
||||
|
||||
|
||||
def grep_logs(pattern: str, files: list[Path]) -> dict[str, int]:
    """Count lines containing *pattern*, bucketed per agent.

    The agent name is the path component immediately after "wizards";
    files outside a wizards tree count under "unknown". Matching is
    case-insensitive because each line is lowercased first — callers
    must therefore pass *pattern* already in lowercase.
    """
    per_agent: dict[str, int] = defaultdict(int)
    for log_file in files:
        segments = log_file.parts
        try:
            owner = segments[segments.index("wizards") + 1]
        except (ValueError, IndexError):
            owner = "unknown"
        try:
            with open(log_file, "r", errors="ignore") as handle:
                for row in handle:
                    if pattern in row.lower():
                        per_agent[owner] += 1
        except Exception:
            # Unreadable log: skip rather than fail the whole audit.
            pass
    return dict(per_agent)
|
||||
|
||||
|
||||
def summarize_counts(counts: dict[str, int]) -> str:
    """Render a count map as "name: n, ..." for the top 5 entries by count.

    Returns "none detected" for an empty map.
    """
    if not counts:
        return "none detected"
    ranked = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)
    return ", ".join(f"{name}: {n}" for name, n in ranked[:5])
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Week-over-week persistence
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def load_previous_metrics() -> dict | None:
    """Return last week's saved record, or None if no usable history exists.

    Corrupt or non-list history files are treated as "no history".
    """
    if not METRICS_FILE.exists():
        return None
    try:
        past = json.loads(METRICS_FILE.read_text())
    except (json.JSONDecodeError, OSError):
        return None
    if isinstance(past, list) and past:
        return past[-1]
    return None
|
||||
|
||||
|
||||
def save_metrics(record: dict) -> None:
    """Append *record* to the on-disk metrics history.

    The history is capped at the last 52 entries (one year of weekly
    reports); a corrupt or non-list existing file is discarded.
    """
    METRICS_DIR.mkdir(parents=True, exist_ok=True)
    history: list[dict] = []
    if METRICS_FILE.exists():
        try:
            loaded = json.loads(METRICS_FILE.read_text())
            if isinstance(loaded, list):
                history = loaded
        except (json.JSONDecodeError, OSError):
            history = []
    history.append(record)
    METRICS_FILE.write_text(json.dumps(history[-52:], indent=2))
|
||||
|
||||
|
||||
def trend_arrow(current: float, previous: float) -> str:
    """Return a week-over-week trend marker: " ↓" (improving), " ↑" (worsening), " →" (flat).

    A previous value of 0 yields "" — treated as "no baseline". NOTE(review):
    this also suppresses the arrow for a genuine increase from zero.
    """
    if previous == 0:
        return ""
    delta = current - previous
    if delta < 0:
        return " ↓"
    return " ↑" if delta > 0 else " →"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Waste metrics
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def measure_overproduction(client: GiteaClient) -> dict:
    """Waste 1 — Overproduction: issues created vs closed in the audit window.

    A created/closed ratio above 1.0 means the backlog grows faster than
    it drains.
    """
    created_total = 0
    closed_total = 0
    created_per_repo: dict[str, int] = defaultdict(int)
    closed_per_repo: dict[str, int] = defaultdict(int)

    for repo in REPOS:
        try:
            for issue in paginate_issues(client, repo, state="all", max_pages=3):
                if within_days(issue.created_at, DAYS_BACK):
                    created_total += 1
                    created_per_repo[repo] += 1
                # "closed" state with a recent update is the proxy for closed-this-week.
                if issue.state == "closed" and within_days(issue.updated_at, DAYS_BACK):
                    closed_total += 1
                    closed_per_repo[repo] += 1
        except GiteaError as e:
            print(f"[WARN] Overproduction fetch failed for {repo}: {e}")

    if closed_total > 0:
        ratio = round(created_total / closed_total, 2)
    else:
        # No closures at all: report the raw created count (0.0 when fully idle).
        ratio = created_total if created_total > 0 else 0.0

    busiest = max(created_per_repo, key=created_per_repo.get) if created_per_repo else None
    return {
        "waste": "Overproduction",
        "created": created_total,
        "closed": closed_total,
        "ratio": ratio,
        "top_repo": busiest,
        "healthy": ratio <= 1.0,
    }
|
||||
|
||||
|
||||
def measure_waiting(_client: GiteaClient) -> dict:
    """Waste 2 — Waiting: rate-limit hits scraped from agent logs."""
    log_files = find_log_files()
    hits_per_agent: dict[str, int] = defaultdict(int)
    # Patterns are lowercase because grep_logs lowercases each line first.
    for needle in ("rate limit", "ratelimit", "429", "too many requests"):
        for agent, n in grep_logs(needle, log_files).items():
            hits_per_agent[agent] += n

    overall = sum(hits_per_agent.values())
    return {
        "waste": "Waiting",
        "rate_limit_hits": dict(hits_per_agent),
        "total_hits": overall,
        "log_files_scanned": len(log_files),
        "healthy": overall == 0,
    }
|
||||
|
||||
|
||||
def measure_transport(client: GiteaClient) -> dict:
    """Waste 3 — Transport: issues closed because they were filed in the wrong place.

    Scans recently-closed issues for REDIRECT_KEYWORDS in the body, falling
    back to the comment thread when the body is clean. At most 20 recent
    issues per repo are inspected to cap API cost.
    """
    redirect_count = 0
    samples: list[str] = []

    for repo in REPOS:
        inspected = 0
        try:
            for issue in paginate_issues(client, repo, state="closed", max_pages=2):
                if not within_days(issue.updated_at, DAYS_BACK):
                    continue
                inspected += 1
                if inspected > 20:  # per-repo cap on comment-fetch cost
                    break
                body_text = (issue.body or "").lower()
                if any(kw in body_text for kw in REDIRECT_KEYWORDS):
                    redirect_count += 1
                    samples.append(f"{repo}#{issue.number}")
                    continue
                # Body was clean — check the comment thread for a redirect note.
                try:
                    for comment in client.list_comments(f"{ORG}/{repo}", issue.number):
                        if any(kw in (comment.body or "").lower() for kw in REDIRECT_KEYWORDS):
                            redirect_count += 1
                            samples.append(f"{repo}#{issue.number}")
                            break
                except GiteaError:
                    pass
        except GiteaError as e:
            print(f"[WARN] Transport fetch failed for {repo}: {e}")

    return {
        "waste": "Transport",
        "redirected_issues": redirect_count,
        "examples": samples[:5],
        "healthy": redirect_count == 0,
    }
|
||||
|
||||
|
||||
def measure_overprocessing(client: GiteaClient) -> dict:
    """Waste 4 — Overprocessing: PR diffs larger than OVERPROCESSING_THRESHOLD lines.

    Inspects up to 10 recent PRs per repo; PRs whose title contains "epic"
    are expected to be large and are never flagged.
    """
    scanned_prs: list[dict] = []
    outliers: list[str] = []
    line_total = 0

    for repo in REPOS:
        try:
            seen = 0
            for pr in paginate_prs(client, repo, state="all", max_pages=2):
                if not within_days(pr.created_at or "", DAYS_BACK):
                    continue
                seen += 1
                if seen > 10:  # per-repo cap on file-listing calls
                    break
                full_repo = f"{ORG}/{repo}"
                try:
                    changed = client.get_pull_files(full_repo, pr.number)
                except GiteaError:
                    changed = []
                diff_lines = sum(f.additions + f.deletions for f in changed)
                line_total += diff_lines
                scanned_prs.append({
                    "repo": repo,
                    "pr": pr.number,
                    "title": pr.title,
                    "lines": diff_lines,
                })
                if diff_lines > OVERPROCESSING_THRESHOLD and "epic" not in (pr.title or "").lower():
                    outliers.append(f"{repo}#{pr.number} ({diff_lines} lines)")
        except GiteaError as e:
            print(f"[WARN] Overprocessing fetch failed for {repo}: {e}")

    mean_lines = round(line_total / len(scanned_prs), 1) if scanned_prs else 0.0
    return {
        "waste": "Overprocessing",
        "prs_scanned": len(scanned_prs),
        "avg_lines_changed": mean_lines,
        "flagged_outliers": outliers,
        "healthy": not outliers,
    }
|
||||
|
||||
|
||||
def measure_inventory(client: GiteaClient) -> dict:
    """Waste 5 — Inventory: open issues untouched for STALE_DAYS or more."""
    stale_total = 0
    stale_per_repo: dict[str, int] = defaultdict(int)

    for repo in REPOS:
        try:
            for issue in paginate_issues(client, repo, state="open", max_pages=4):
                if older_than_days(issue.updated_at, STALE_DAYS):
                    stale_total += 1
                    stale_per_repo[repo] += 1
        except GiteaError as e:
            print(f"[WARN] Inventory fetch failed for {repo}: {e}")

    worst = max(stale_per_repo, key=stale_per_repo.get) if stale_per_repo else None
    return {
        "waste": "Inventory",
        "stale_issues": stale_total,
        "by_repo": dict(stale_per_repo),
        "top_repo": worst,
        "healthy": stale_total == 0,
    }
|
||||
|
||||
|
||||
def measure_motion(_client: GiteaClient) -> dict:
    """Waste 6 — Motion: git workspace churn (clone/rebase/fetch) in agent logs."""
    log_files = find_log_files()
    clones = grep_logs("git clone", log_files)
    rebases = grep_logs("git rebase", log_files)
    fetches = grep_logs("git fetch", log_files)

    events = sum(clones.values()) + sum(rebases.values()) + sum(fetches.values())
    return {
        "waste": "Motion",
        "git_clones": clones,
        "git_rebases": rebases,
        "git_fetches": fetches,
        "total_motion_events": events,
        "log_files_scanned": len(log_files),
        # 50 weekly git ops is the current "acceptable churn" line.
        "healthy": events < 50,
    }
|
||||
|
||||
|
||||
def measure_defects(client: GiteaClient) -> dict:
    """Waste 7 — Defects: PRs closed without merge vs successfully merged."""
    merged_count = 0
    abandoned_count = 0

    for repo in REPOS:
        try:
            for pr in paginate_prs(client, repo, state="closed", max_pages=2):
                if not within_days(pr.created_at or "", DAYS_BACK):
                    continue
                if pr.merged:
                    merged_count += 1
                else:
                    abandoned_count += 1
        except GiteaError as e:
            print(f"[WARN] Defects fetch failed for {repo}: {e}")

    closed_total = merged_count + abandoned_count
    failure_rate = round(abandoned_count / closed_total, 2) if closed_total else 0.0
    return {
        "waste": "Defects",
        "merged": merged_count,
        "closed_without_merge": abandoned_count,
        "close_rate": failure_rate,
        "healthy": failure_rate < 0.25,
    }
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Report generation
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Canned remediation advice keyed by waste name; consumed by
# compute_top_eliminations() for the "Top 3 eliminations" report section.
SUGGESTIONS = {
    "Overproduction": "Pause issue-generation loops until backlog shrinks. Review auto-issue bots.",
    "Waiting": "Add exponential backoff to API clients. Reduce loop frequency for rate-limited agents.",
    "Transport": "Enforce repo-boundary check before issue creation. Close with redirect template.",
    "Overprocessing": "Scope tickets tighter. Flag >500-line PRs for pre-review split.",
    "Inventory": "Bulk-close or consolidate stale issues. Set 30-day auto-close for untouched items.",
    "Motion": "Cache workspace directories across issues. Limit clones to 1 per issue branch.",
    "Defects": "Require smoke tests before PR. Rebase before merge to reduce conflict closures.",
}
|
||||
|
||||
|
||||
def compute_top_eliminations(metrics: list[dict]) -> list[str]:
    """Pick the top 3 unhealthiest wastes and return concrete suggestions.

    Severity scores are rough per-waste normalizations onto a common scale;
    when everything is healthy, a fixed "keep it up" trio is returned.
    """
    # Per-waste severity heuristics (dispatch table replaces the if/elif chain).
    scorers = {
        "Overproduction": lambda m: m.get("ratio", 0),
        "Waiting": lambda m: m.get("total_hits", 0) / 10,
        "Transport": lambda m: m.get("redirected_issues", 0),
        "Overprocessing": lambda m: len(m.get("flagged_outliers", [])),
        "Inventory": lambda m: m.get("stale_issues", 0) / 10,
        "Motion": lambda m: m.get("total_motion_events", 0) / 20,
        "Defects": lambda m: m.get("close_rate", 0) * 10,
    }

    flagged = sorted(
        (m for m in metrics if not m.get("healthy", True)),
        key=lambda m: scorers.get(m["waste"], lambda _m: 0.0)(m),
        reverse=True,
    )
    picks = [SUGGESTIONS.get(m["waste"], "Review and reduce.") for m in flagged[:3]]
    if picks:
        return picks
    return [
        "No major waste detected this week. Maintain current guardrails.",
        "Continue monitoring agent loop logs for emerging rate-limit patterns.",
        "Keep PR diff sizes under review during weekly standup.",
    ]
|
||||
|
||||
|
||||
def build_report(metrics: list[dict]) -> dict:
    """Assemble the full audit report dict from the per-waste metric dicts."""
    unhealthy = [m for m in metrics if not m.get("healthy", True)]
    return {
        "report_type": "MUDA Weekly Waste Audit",
        "generated_at": now_utc().isoformat(),
        "period_days": DAYS_BACK,
        "metrics": metrics,
        "waste_count": len(unhealthy),
        "top_wastes": unhealthy,
    }
|
||||
|
||||
|
||||
def format_telegram(report: dict, prev: dict | None = None) -> str:
    """Render the audit report as a Markdown message for Telegram.

    When *prev* (last week's saved record) is provided, each metric line
    gets a week-over-week trend arrow from trend_arrow().
    """
    out = [
        f"*🗑 MUDA Audit — {report['generated_at'][:10]}*",
        f"Period: last {report['period_days']} days",
        "",
    ]

    last_week = {m["waste"]: m for m in (prev.get("metrics", []) if prev else [])}

    for metric in report["metrics"]:
        status = "✅" if metric.get("healthy") else "⚠️"
        waste = metric["waste"]
        before = last_week.get(waste, {})

        if waste == "Overproduction":
            arrow = trend_arrow(metric["ratio"], before.get("ratio", 0.0))
            out.append(f"{status} *Overproduction*: {metric['created']} created / {metric['closed']} closed = ratio {metric['ratio']}{arrow}")
        elif waste == "Waiting":
            arrow = trend_arrow(metric["total_hits"], before.get("total_hits", 0))
            out.append(f"{status} *Waiting*: {metric['total_hits']} rate-limit hits ({summarize_counts(metric['rate_limit_hits'])}){arrow}")
        elif waste == "Transport":
            arrow = trend_arrow(metric["redirected_issues"], before.get("redirected_issues", 0))
            out.append(f"{status} *Transport*: {metric['redirected_issues']} redirected issues{arrow}")
        elif waste == "Overprocessing":
            arrow = trend_arrow(metric["avg_lines_changed"], before.get("avg_lines_changed", 0.0))
            out.append(f"{status} *Overprocessing*: avg {metric['avg_lines_changed']} lines/PR, {len(metric['flagged_outliers'])} outliers{arrow}")
        elif waste == "Inventory":
            arrow = trend_arrow(metric["stale_issues"], before.get("stale_issues", 0))
            out.append(f"{status} *Inventory*: {metric['stale_issues']} stale issues (>30d){arrow}")
        elif waste == "Motion":
            arrow = trend_arrow(metric["total_motion_events"], before.get("total_motion_events", 0))
            out.append(f"{status} *Motion*: {metric['total_motion_events']} git ops ({summarize_counts(metric['git_clones'])} clones){arrow}")
        elif waste == "Defects":
            arrow = trend_arrow(metric["close_rate"], before.get("close_rate", 0.0))
            # Denominator is all closed PRs this window (merged + unmerged).
            closed_total = metric["closed_without_merge"] + metric["merged"]
            out.append(f"{status} *Defects*: {metric['close_rate']*100:.0f}% closed without merge ({metric['closed_without_merge']}/{closed_total}){arrow}")

    out.append("")
    out.append("*Top 3 eliminations:*")
    for rank, tip in enumerate(compute_top_eliminations(report["metrics"]), 1):
        out.append(f"{rank}. {tip}")

    out.append("")
    out.append("_Week over week: waste metrics should decrease. If an arrow points up, investigate._")

    return "\n".join(out)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def main():
    """Run all seven waste measurements, write the report, notify, and persist."""
    client = GiteaClient()
    if not client.ping():
        print("[ERROR] Gitea is unreachable. Aborting audit.")
        sys.exit(1)

    print("[INFO] Starting MUDA waste audit...")
    measurements = [
        measure_overproduction(client),
        measure_waiting(client),
        measure_transport(client),
        measure_overprocessing(client),
        measure_inventory(client),
        measure_motion(client),
        measure_defects(client),
    ]

    report = build_report(measurements)
    previous = load_previous_metrics()

    # Write the dated JSON report under reports/ (gitignored, uploaded by CI).
    reports_dir = REPO_ROOT / "reports"
    reports_dir.mkdir(exist_ok=True)
    json_path = reports_dir / f"muda-audit-{now_utc().strftime('%Y%m%d')}.json"
    json_path.write_text(json.dumps(report, indent=2))
    print(f"[INFO] Report written to {json_path}")

    # Telegram notification is best-effort; failure is only a warning.
    message = format_telegram(report, previous)
    if send_telegram(message):
        print("[INFO] Telegram notification sent.")
    else:
        print("[WARN] Telegram notification failed or skipped.")

    # Persist this week's metrics so next week's run gets trend arrows.
    save_metrics({
        "week_ending": now_utc().date().isoformat(),
        "generated_at": report["generated_at"],
        "metrics": measurements,
    })

    # Mirror the Telegram text on stdout for cron/CI logs.
    print("\n" + "=" * 60)
    print(message)
    print("=" * 60)


if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user