Compare commits

...

1 Commits

Author SHA1 Message Date
STEP35
5abcc308dd ci: add cron supervisor (timmy-config #513)
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 20s
Smoke Test / smoke (pull_request) Failing after 15s
Validate Config / YAML Lint (pull_request) Failing after 10s
Validate Config / JSON Validate (pull_request) Successful in 12s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 37s
Validate Config / Python Test Suite (pull_request) Has been skipped
Validate Config / Shell Script Lint (pull_request) Failing after 42s
Validate Config / Cron Syntax Check (pull_request) Successful in 11s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 11s
Validate Config / Playbook Schema Validation (pull_request) Successful in 25s
Architecture Lint / Lint Repository (pull_request) Failing after 24s
PR Checklist / pr-checklist (pull_request) Successful in 3m55s
Autonomous health patrol for dev/timmy tmux sessions.
Monitors BUSY/READY/CRASHED/OVERFLOW states, alerts via Telegram
only on actionable events. Silent when all agents are working.

Closes #513
2026-04-25 22:33:08 -04:00

200
fleet/cron_supervisor.py Executable file
View File

@@ -0,0 +1,200 @@
#!/usr/bin/env python3
"""
Cron Supervisor — Autonomous tmux health patrol (timmy-config #513)
Runs every 5-10 minutes via cron. Checks both `dev` and `timmy` tmux sessions,
classifies each pane (BUSY/READY/OVERFLOW/CRASHED/DONE/UNKNOWN), and
Telegram-alerts only on actionable events: panes idle for 2 or more consecutive
cycles, context-overflow warnings, or crashes.
Silent when all agents are actively working.
Design: Minimal, no dependencies (subprocess + stdlib). Caller must have tmux
access and ~/.config/gitea/token for Telegram alerts.
Usage:
python3 fleet/cron_supervisor.py
"""
import subprocess
import json
import os
import sys
import time
import urllib.request
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Tuple
# ── Config ─────────────────────────────────────────────────────────────────────
# tmux session names the supervisor patrols; sessions with other names are ignored.
SESSIONS = ["dev", "timmy"]
# JSON file persisting per-pane status/idle counters and per-tag alert timestamps between runs.
STATE_FILE = Path.home() / ".timmy" / "cron-supervisor-state.json"
# File whose stripped contents are used as the Telegram bot token.
# NOTE(review): the path points at a Gitea token — confirm it really holds a Telegram bot token.
TELEGRAM_TOKEN_FILE = Path.home() / ".config" / "gitea" / "token"
# Telegram chat that receives the alerts.
TELEGRAM_CHAT_ID = "-1003664764329"
# Minimum number of seconds between two alerts that carry the same tag.
ALERT_COOLDOWN = 1800
# Consecutive READY/IDLE checks after which a pane is reported as idle.
IDLE_CYCLE_LIMIT = 2
# Context-usage percentage at or above which a pane showing a prompt is classified OVERFLOW.
MAX_CONTEXT_PCT = 80
# ── Helpers ────────────────────────────────────────────────────────────────────
def run(cmd: str, timeout: int = 10) -> str:
    """Execute *cmd* through the shell and return its stdout, stripped.

    Best-effort by design: a timeout, a spawn failure or any other error
    yields an empty string instead of raising. The exit status and stderr
    are ignored.
    """
    try:
        completed = subprocess.run(
            cmd,
            shell=True,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except Exception:
        return ""
    else:
        return completed.stdout.strip()
def load_state() -> Dict:
    """Load the persisted supervisor state, always in a usable shape.

    Returns:
        A dict that is guaranteed to contain the keys ``"panes"`` and
        ``"last_alert"``, each mapping to a dict. A missing, unreadable or
        malformed state file yields the empty default; a well-formed file
        that merely lacks one of the keys is completed rather than returned
        as-is, so callers can index both keys without a KeyError.
    """
    try:
        loaded = json.loads(STATE_FILE.read_text())
    except (OSError, ValueError):
        # Missing/unreadable file (OSError) or invalid JSON / bad encoding
        # (ValueError): start from a clean slate.
        loaded = None

    state: Dict = loaded if isinstance(loaded, dict) else {}
    for key in ("panes", "last_alert"):
        if not isinstance(state.get(key), dict):
            state[key] = {}
    return state
def save_state(state: Dict) -> None:
    """Persist *state* to STATE_FILE as indented JSON.

    The JSON is written to a sibling temporary file first and then renamed
    over STATE_FILE, so an interrupted run cannot leave a truncated state
    file behind (a truncated file would be read back as "no state" and
    silently drop every alert cooldown).

    Raises:
        OSError: if the directory or file cannot be written.
        TypeError: if *state* is not JSON-serialisable.
    """
    STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(state, indent=2)
    # Include the PID so overlapping cron runs never share a temp file.
    tmp_file = STATE_FILE.with_name(f"{STATE_FILE.name}.{os.getpid()}.tmp")
    try:
        tmp_file.write_text(serialized)
        tmp_file.replace(STATE_FILE)
    finally:
        # Only still present when the write or rename failed.
        if tmp_file.exists():
            tmp_file.unlink()
def send_telegram(message: str, tag: str) -> bool:
    """Send *message* to the alert chat unless *tag* is still cooling down.

    Args:
        message: Plain-text body of the alert.
        tag: Identifier used both in the message header and as the key of
            the per-tag cooldown stored in the state file.

    Returns:
        True when Telegram acknowledged the message; False when the tag was
        alerted less than ALERT_COOLDOWN seconds ago, no token is available,
        or the request failed.
    """
    state = load_state()
    now = time.time()
    last_alerts = state.get("last_alert")
    if not isinstance(last_alerts, dict):
        last_alerts = {}
    if now - last_alerts.get(tag, 0) < ALERT_COOLDOWN:
        return False

    try:
        token = TELEGRAM_TOKEN_FILE.read_text().strip()
    except OSError:
        # Missing or unreadable token file: alerts are simply disabled.
        return False
    if not token:
        return False

    url = f"https://api.telegram.org/bot{token}/sendMessage"
    # No "parse_mode": the message is plain text, and Telegram only accepts
    # Markdown/MarkdownV2/HTML there. The header uses a plain '#'; the former
    # "\#" was an invalid escape that put a literal backslash in the text.
    payload = {
        "chat_id": TELEGRAM_CHAT_ID,
        "text": f"🚨 CronSupervisor #{tag}\n{message}",
    }
    request = urllib.request.Request(
        url,
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
    )
    try:
        with urllib.request.urlopen(request, timeout=10) as resp:
            result = json.loads(resp.read().decode())
    except Exception as exc:
        # Best-effort delivery: report and carry on. Only the exception type
        # is printed because some error messages embed the URL (and token).
        print(f"send_telegram: delivery failed for #{tag} ({type(exc).__name__})", file=sys.stderr)
        return False

    if not (isinstance(result, dict) and result.get("ok")):
        return False

    last_alerts[tag] = now
    state["last_alert"] = last_alerts
    try:
        save_state(state)
    except OSError as exc:
        # The message was delivered; failing to record the cooldown must not
        # turn that into a reported failure.
        print(f"send_telegram: could not record cooldown for #{tag}: {exc}", file=sys.stderr)
    return True
def get_tmux_sessions() -> List[Dict]:
    """List the tmux sessions on this machine.

    Returns:
        One dict per session with the keys ``name`` (str), ``windows`` (int)
        and ``created`` (int, as reported by tmux). Empty when tmux prints
        nothing, e.g. because no server is running.
    """
    listing = run("tmux list-sessions -F '#{session_name}|#{session_windows}|#{session_created}'")
    rows = (row.split("|") for row in listing.splitlines() if row.strip())
    return [
        {"name": fields[0], "windows": int(fields[1]), "created": int(fields[2])}
        for fields in rows
        if len(fields) >= 3
    ]
def get_tmux_panes(session_name: str) -> List[Dict]:
    """List every pane of the tmux session *session_name*.

    Args:
        session_name: Name of the tmux session to inspect.

    Returns:
        One dict per pane with the keys ``session``, ``window`` (int),
        ``pane`` (int), ``pane_id``, ``active`` (bool) and ``command``.
        Empty when tmux prints nothing (e.g. the session does not exist).
    """
    fmt = "#{session_name}|#{window_index}|#{pane_index}|#{pane_id}|#{pane_active}|#{pane_current_command}"
    # `-s` lists all panes of the target session. The former `-a` made tmux
    # ignore `-t` and return every pane on the server for each call.
    raw = run(f"tmux list-panes -s -t {session_name} -F '{fmt}'")
    panes = []
    for line in raw.splitlines():
        if not line.strip():
            continue
        # Bounded split: the command is the last field and keeps any '|'.
        parts = line.split("|", 5)
        if len(parts) < 6:
            continue
        panes.append({
            "session": parts[0],
            "window": int(parts[1]),
            "pane": int(parts[2]),
            "pane_id": parts[3],
            "active": parts[4] == "1",
            "command": parts[5],
        })
    return panes
def capture_pane_last_lines(session: str, window: int, pane: int, lines: int = 40) -> str:
    """Return the text tmux captures from the given pane, starting *lines* lines back.

    tmux errors are discarded (stderr goes to /dev/null), in which case the
    result is an empty string.
    """
    target = f"{session}:{window}.{pane}"
    command = f"tmux capture-pane -t {target} -p -S -{lines} 2>/dev/null"
    return run(command)
def classify_pane(pane: Dict, output: str) -> Tuple[str, str]:
    """Classify a pane from the text captured at its tail.

    Args:
        pane: Pane descriptor. Not consulted; kept so the signature stays
            stable for callers.
        output: Text captured from the pane.

    Returns:
        ``(status, reason)``. ``status`` is one of ``BUSY``, ``CRASHED``,
        ``OVERFLOW``, ``READY``, ``DONE`` or ``UNKNOWN``; ``reason`` is a
        short human-readable explanation.
    """
    import re  # function-scope: `re` is not among the file-level imports

    lines = output.strip().split("\n")
    last_line = lines[-1]
    last_stripped = last_line.strip()

    # NOTE(review): this list used to hold three empty-string entries
    # (presumably spinner glyphs lost along the way). An empty string is a
    # substring of every line, so every pane was reported BUSY. Re-add the
    # real glyphs here if they are known.
    busy_indicators = ("processing", "running")
    if any(ind in last_line for ind in busy_indicators):
        return ("BUSY", "timer/spinner active")

    # No prompt on the last line: the pane is either mid-task or died.
    if not last_stripped.endswith((">", ">>>", "$", "#")):
        lowered_last = last_line.lower()
        if any(err in lowered_last for err in ("error:", "traceback", "failed", "exception")):
            return ("CRASHED", last_line[-100:])
        return ("BUSY", "no prompt visible")

    # Prompt ending in '>' (this also covers '>>>'): idle at the prompt
    # unless a recent line reports a high context percentage.
    if last_stripped.endswith(">"):
        for line in lines[-10:]:
            match = re.search(r"(\d+)%", line)
            if match and int(match.group(1)) >= MAX_CONTEXT_PCT:
                return ("OVERFLOW", f"context >= {MAX_CONTEXT_PCT}%")
        return ("READY", "prompt visible, context OK")

    # Shell prompt ('$' or '#'): look at the whole capture.
    lowered = output.lower()
    if any(keyword in lowered for keyword in ("error:", "traceback", "segfault", "connection refused")):
        return ("CRASHED", "error trace detected")

    # Lower-case markers are matched case-insensitively. "DONE" is matched
    # against the raw text: it used to be compared with the lowered output
    # and could never match. The former empty-string marker (matching
    # everything) is dropped.
    done_markers = ("shipped", "pr opened", "issue filed", "complete", "finished")
    if any(marker in lowered for marker in done_markers) or "DONE" in output:
        return ("DONE", "completion marker found")

    return ("UNKNOWN", "no clear state")
def _alert_tag(kind: str, pane_key: str) -> str:
    """Build the cooldown tag for a *kind* alert on *pane_key* (e.g. ``idle_dev_0_1``)."""
    return f"{kind}_{pane_key.replace(':', '_').replace('.', '_')}"


def main():
    """Patrol the target tmux sessions once and alert on actionable panes.

    Every pane of each existing session listed in SESSIONS is captured and
    classified. Per-pane status and the count of consecutive READY/IDLE
    checks are persisted, and a Telegram alert is attempted for panes that
    are idle for IDLE_CYCLE_LIMIT or more checks, OVERFLOW or CRASHED.

    Returns:
        0 after a completed patrol.

    Raises:
        SystemExit: with code 1 when none of the target sessions exists.
    """
    state = load_state()
    pane_state = state.get("panes", {})
    if not isinstance(pane_state, dict):
        pane_state = {}

    active_sessions = [s["name"] for s in get_tmux_sessions() if s["name"] in SESSIONS]
    if not active_sessions:
        print("❌ No target sessions found (dev/timmy).")
        send_telegram("No dev/timmy sessions exist on this machine", "no_sessions")
        sys.exit(1)

    alerts_sent = 0
    panes_checked = 0
    actionable_events = []

    for session in SESSIONS:
        if session not in active_sessions:
            continue
        for pane in get_tmux_panes(session):
            # Defensive: never attribute a pane to a session it does not
            # belong to, whatever the pane listing returned.
            if pane["session"] != session:
                continue

            pane_key = f"{session}:{pane['window']}.{pane['pane']}"
            panes_checked += 1
            output = capture_pane_last_lines(session, pane["window"], pane["pane"])
            status, reason = classify_pane(pane, output)

            is_idle = status in ("READY", "IDLE")
            prev = pane_state.get(pane_key)
            prev_idle = prev.get("idle_cycles", 0) if isinstance(prev, dict) else 0
            idle_cycles = prev_idle + 1 if is_idle else 0
            pane_state[pane_key] = {"status": status, "idle_cycles": idle_cycles, "last_check": time.time()}

            out_lines = output.splitlines()
            last_tail = out_lines[-1][:120] if out_lines else '(empty)'

            # (tag prefix, label for the summary line, message) per alert.
            pending = []
            if is_idle and idle_cycles >= IDLE_CYCLE_LIMIT:
                pending.append((
                    "idle",
                    "IDLE",
                    f"Pane {pane_key} idle for {idle_cycles} cycles — {reason}\nLast output: {last_tail}",
                ))
            if status == "OVERFLOW":
                pending.append((
                    "overflow",
                    "OVERFLOW",
                    f"Pane {pane_key} context overflow detected — {reason}\nLast output: {last_tail}\nAction: Summarize and wrap up.",
                ))
            if status == "CRASHED":
                tail_lines = out_lines[-3:] if len(out_lines) >= 3 else [output[-200:]]
                pending.append((
                    "crashed",
                    "CRASHED",
                    f"Pane {pane_key} CRASHED — {reason}\nLast output: {'|'.join(tail_lines)}",
                ))

            for kind, label, msg in pending:
                if send_telegram(msg, _alert_tag(kind, pane_key)):
                    actionable_events.append(f"{label} → {pane_key}")
                    alerts_sent += 1

    # send_telegram() persists its cooldown timestamps itself. Re-read the
    # file so they are kept; saving the snapshot taken at start-up would
    # overwrite them and defeat ALERT_COOLDOWN.
    state = load_state()
    state["panes"] = pane_state
    save_state(state)

    print(f"[{datetime.now().isoformat()}] Supervisor: {panes_checked} panes checked across {active_sessions}")
    if actionable_events:
        print(f" Actionable ({alerts_sent} alerts): {', '.join(actionable_events)}")
    else:
        print(" All clear — no actionable events.")
    return 0
# Script entry point: run one patrol; an unexpected failure is printed,
# reported through Telegram, and turned into exit status 1.
if __name__ == "__main__":
    try:
        exit_code = main()
    except Exception as exc:
        print(f"❌ Supervisor crashed: {exc}")
        send_telegram(f"CronSupervisor internal error: {exc}", "internal_error")
        exit_code = 1
    sys.exit(exit_code)