Compare commits
1 Commits
step35/443
...
step35/513
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5abcc308dd |
200
fleet/cron_supervisor.py
Executable file
200
fleet/cron_supervisor.py
Executable file
@@ -0,0 +1,200 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Cron Supervisor — Autonomous tmux health patrol (timmy-config #513)
|
||||
|
||||
Runs every 5-10 minutes via cron. Checks both `dev` and `timmy` tmux sessions,
|
||||
classifies each pane (BUSY/READY/OVERFLOW/CRASHED/DONE/UNKNOWN), and sends Telegram alerts only on
|
||||
actionable events: panes idle for 2 or more consecutive cycles, context-overflow warnings, or crashes.
|
||||
|
||||
Silent when all agents are actively working.
|
||||
|
||||
Design: minimal, standard library only (no third-party dependencies). Caller must have tmux
|
||||
access and ~/.config/gitea/token for Telegram alerts.
|
||||
|
||||
Usage:
|
||||
python3 fleet/cron_supervisor.py
|
||||
"""
|
||||
|
||||
import subprocess
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import urllib.request
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Tuple
|
||||
|
||||
# ── Config ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
# Names of the tmux sessions the supervisor looks for and patrols.
SESSIONS = ["dev", "timmy"]

# JSON file holding the per-pane status table and per-tag alert timestamps.
STATE_FILE = Path.home().joinpath(".timmy", "cron-supervisor-state.json")

# File whose stripped contents are used as the bot token in the Telegram API URL.
# NOTE(review): this path lives under the gitea config directory — confirm it
# really holds a Telegram bot token.
TELEGRAM_TOKEN_FILE = Path.home().joinpath(".config", "gitea", "token")

# Value sent as ``chat_id`` in every sendMessage request.
TELEGRAM_CHAT_ID = "-1003664764329"

# Minimum number of seconds between two alerts carrying the same tag.
ALERT_COOLDOWN = 30 * 60

# Consecutive READY/IDLE checks after which a pane is reported as idle.
IDLE_CYCLE_LIMIT = 2

# A percentage at or above this value near an agent prompt counts as OVERFLOW.
MAX_CONTEXT_PCT = 80
|
||||
|
||||
# ── Helpers ────────────────────────────────────────────────────────────────────
|
||||
|
||||
def run(cmd: str, timeout: int = 10) -> str:
    """Run *cmd* through the shell and return its stripped standard output.

    This is a best-effort helper: when the command cannot be started or
    exceeds *timeout* seconds, an empty string is returned instead of raising.
    The exit status is not inspected, so a failing command simply yields
    whatever it wrote to stdout (usually nothing).

    Args:
        cmd: Shell command line. It is executed with ``shell=True`` because
            callers rely on shell syntax such as ``2>/dev/null``; only pass
            trusted, internally built strings.
        timeout: Maximum run time in seconds.

    Returns:
        The command's stdout with surrounding whitespace removed, or ``""``.
    """
    try:
        completed = subprocess.run(
            cmd,
            shell=True,
            capture_output=True,
            # Decode explicitly as UTF-8 and replace undecodable bytes. With
            # the locale default in strict mode a single stray byte in the
            # pane output raised and the whole capture was discarded.
            encoding="utf-8",
            errors="replace",
            timeout=timeout,
        )
    except (subprocess.SubprocessError, OSError, ValueError):
        # TimeoutExpired, a missing shell, or a malformed command string.
        return ""
    return completed.stdout.strip()
|
||||
|
||||
def load_state() -> Dict:
    """Load the persisted supervisor state from ``STATE_FILE``.

    Returns:
        A dict that always contains dict-valued ``"panes"`` and
        ``"last_alert"`` entries. A missing, unreadable, or malformed state
        file yields a fresh empty state; any other keys found in a valid file
        are kept as they are.
    """
    try:
        loaded = json.loads(STATE_FILE.read_text())
    except (OSError, ValueError):
        # Missing/unreadable file, or contents that are not valid JSON.
        loaded = None

    state = loaded if isinstance(loaded, dict) else {}
    # Callers index these two keys directly, so make sure both exist and are
    # dicts even when the file was hand-edited or written by an older version.
    for key in ("panes", "last_alert"):
        if not isinstance(state.get(key), dict):
            state[key] = {}
    return state
|
||||
|
||||
def save_state(state: Dict) -> None:
    """Persist *state* as indented JSON to ``STATE_FILE``.

    The parent directory is created when needed. The data is first written to
    a sibling ``.tmp`` file and then renamed over the target, so an
    interrupted run cannot leave a truncated state file behind (which the
    loader would treat as "no state" and thereby reset idle counters and
    alert cooldowns).

    Args:
        state: JSON-serialisable state dictionary.

    Raises:
        OSError: If the directory or file cannot be written.
        TypeError: If *state* contains values ``json.dumps`` cannot encode.
    """
    STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = STATE_FILE.with_name(STATE_FILE.name + ".tmp")
    tmp_path.write_text(json.dumps(state, indent=2))
    tmp_path.replace(STATE_FILE)
|
||||
|
||||
def send_telegram(message: str, tag: str) -> bool:
    """Send a plain-text Telegram alert, rate-limited per *tag*.

    The alert is skipped when another alert with the same *tag* was delivered
    less than ``ALERT_COOLDOWN`` seconds ago, or when no bot token is
    available. After a successful delivery the timestamp is stored under
    ``state["last_alert"][tag]`` and the state is saved.

    Args:
        message: Body of the alert.
        tag: Short identifier used both in the message header and as the
            cooldown key.

    Returns:
        True when Telegram acknowledged the message, False otherwise
        (cooldown active, missing token, network/API failure).
    """
    state = load_state()
    now = time.time()
    last_alerts = state.setdefault("last_alert", {})
    if now - last_alerts.get(tag, 0) < ALERT_COOLDOWN:
        return False

    # NOTE(review): the token is read from TELEGRAM_TOKEN_FILE, which points
    # into the gitea config directory — confirm it holds a Telegram bot token.
    try:
        token = TELEGRAM_TOKEN_FILE.read_text().strip()
    except OSError:
        # Missing or unreadable token file: alerts are simply disabled.
        return False
    if not token:
        return False

    url = f"https://api.telegram.org/bot{token}/sendMessage"
    # No "parse_mode" key: the previous value "Text" is not a parse mode the
    # Bot API knows, so the request was rejected. Omitting the field sends the
    # message as plain text.
    payload = {"chat_id": TELEGRAM_CHAT_ID, "text": f"🚨 CronSupervisor #{tag}\n{message}"}
    request = urllib.request.Request(
        url,
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
    )
    try:
        with urllib.request.urlopen(request, timeout=10) as resp:
            result = json.loads(resp.read().decode())
    except Exception:
        # Deliberate best-effort boundary: a network, HTTP, or decoding
        # failure while alerting must never take the patrol itself down.
        return False

    if not (isinstance(result, dict) and result.get("ok")):
        return False

    last_alerts[tag] = now
    try:
        save_state(state)
    except OSError:
        # The message was delivered; failing to record the cooldown only
        # risks a repeated alert on the next run.
        pass
    return True
|
||||
|
||||
def get_tmux_sessions() -> List[Dict]:
    """List the tmux sessions currently known to the tmux server.

    Returns:
        One dict per session with keys ``"name"`` (str), ``"windows"`` (int)
        and ``"created"`` (int, as reported by ``#{session_created}``).
        An empty list when tmux prints nothing (for example, no server).
    """
    raw = run("tmux list-sessions -F '#{session_name}|#{session_windows}|#{session_created}'")
    sessions = []
    for line in raw.splitlines():
        # Split from the right: the two numeric fields come last, so a "|"
        # inside the session name stays part of the name.
        fields = line.rsplit("|", 2)
        if len(fields) != 3:
            continue
        name, windows, created = fields
        try:
            sessions.append({"name": name, "windows": int(windows), "created": int(created)})
        except ValueError:
            # Skip a malformed row instead of aborting the whole patrol.
            continue
    return sessions
|
||||
|
||||
def get_tmux_panes(session_name: str) -> List[Dict]:
    """List every pane in all windows of the tmux session *session_name*.

    Args:
        session_name: Target session, passed to ``tmux list-panes -t``.

    Returns:
        One dict per pane with keys ``"session"``, ``"window"`` (int),
        ``"pane"`` (int), ``"pane_id"``, ``"active"`` (bool) and
        ``"command"``. An empty list when tmux prints nothing.
    """
    fmt = "#{session_name}|#{window_index}|#{pane_index}|#{pane_id}|#{pane_active}|#{pane_current_command}"
    # Use -s (all panes of the target session). The former -a flag makes tmux
    # ignore -t and list every pane on the server, so each session was handed
    # the panes of all sessions.
    raw = run(f"tmux list-panes -s -t {session_name} -F '{fmt}'")
    panes = []
    for line in raw.splitlines():
        # maxsplit keeps a "|" inside the trailing command field intact.
        fields = line.split("|", 5)
        if len(fields) != 6:
            continue
        session, window, pane, pane_id, active, command = fields
        try:
            panes.append({
                "session": session,
                "window": int(window),
                "pane": int(pane),
                "pane_id": pane_id,
                "active": active == "1",
                "command": command,
            })
        except ValueError:
            # Non-numeric window/pane index: skip the malformed row.
            continue
    return panes
|
||||
|
||||
def capture_pane_last_lines(session: str, window: int, pane: int, lines: int = 40) -> str:
    """Return the recent text of one tmux pane.

    Runs ``tmux capture-pane -p`` against ``session:window.pane`` with the
    start line set to ``-lines`` and tmux's stderr discarded.

    Args:
        session: tmux session name.
        window: Window index within the session.
        pane: Pane index within the window.
        lines: Value passed (negated) to ``-S`` as the capture start line.

    Returns:
        The captured text as returned by ``run`` (empty string on failure).
    """
    target = f"{session}:{window}.{pane}"
    command = f"tmux capture-pane -t {target} -p -S -{lines} 2>/dev/null"
    return run(command)
|
||||
|
||||
def classify_pane(pane: Dict, output: str) -> Tuple[str, str]:
    """Classify a pane from its captured output.

    The decision is driven mostly by the last line of *output*:

    1. A spinner/timer glyph or the words "processing"/"running" -> ``BUSY``.
    2. No trailing prompt character (``>``, ``$``, ``#``) -> ``CRASHED`` when
       the line mentions an error, otherwise ``BUSY``.
    3. A trailing ``>`` prompt -> ``OVERFLOW`` when one of the last ten lines
       shows a percentage >= ``MAX_CONTEXT_PCT``, otherwise ``READY``.
    4. A trailing ``$``/``#`` prompt -> ``CRASHED`` when the whole output
       mentions an error, ``DONE`` when it holds a completion marker,
       otherwise ``UNKNOWN``.

    Args:
        pane: Pane description; currently unused, kept for interface
            compatibility.
        output: Captured pane text.

    Returns:
        A ``(status, reason)`` tuple.
    """
    import re  # kept function-local, as in the original block

    lines = output.strip().split("\n")
    last_line = lines[-1]  # str.split always yields at least one element
    prompt_tail = last_line.strip()

    if any(mark in last_line for mark in ("⏱", "⟳", "⏳", "processing", "running")):
        return ("BUSY", "timer/spinner active")

    # ">>>" needs no separate entry: it already ends with ">".
    if not prompt_tail.endswith((">", "$", "#")):
        lowered_last = last_line.lower()
        if any(err in lowered_last for err in ("error:", "traceback", "failed", "exception")):
            return ("CRASHED", last_line[-100:])
        return ("BUSY", "no prompt visible")

    if prompt_tail.endswith(">"):
        # The first "<digits>%" on each of the last ten lines is treated as a
        # context-usage figure.
        for line in lines[-10:]:
            match = re.search(r"(\d+)%", line)
            if match and int(match.group(1)) >= MAX_CONTEXT_PCT:
                return ("OVERFLOW", f"context >= {MAX_CONTEXT_PCT}%")
        return ("READY", "prompt visible, context OK")

    lowered = output.lower()
    if any(keyword in lowered for keyword in ("error:", "traceback", "segfault", "connection refused")):
        return ("CRASHED", "error trace detected")

    # "DONE" is matched case-sensitively against the raw output. It used to be
    # compared with the lowercased text and therefore could never match.
    done_markers = ("shipped", "pr opened", "issue filed", "complete", "finished", "✅")
    if "DONE" in output or any(marker in lowered for marker in done_markers):
        return ("DONE", "completion marker found")

    return ("UNKNOWN", "no clear state")
|
||||
|
||||
def _alert_tag(kind: str, pane_key: str) -> str:
    """Return the cooldown tag for *kind* and *pane_key*, e.g. ``idle_dev_0_1``."""
    return f"{kind}_{pane_key.replace(':', '_').replace('.', '_')}"


def _last_output_line(output: str) -> str:
    """Return the final captured line cut to 120 characters, or ``(empty)``."""
    captured = output.splitlines()
    return captured[-1][:120] if captured else "(empty)"


def main():
    """Patrol the configured tmux sessions once and alert on actionable panes.

    For every pane of each running session listed in ``SESSIONS`` the pane
    output is captured and classified. The per-pane status and consecutive
    idle count are stored in the state file, and a Telegram alert is
    attempted for panes that are idle for ``IDLE_CYCLE_LIMIT`` or more
    checks, report OVERFLOW, or are CRASHED. A one-line summary is printed.

    Returns:
        0 after a completed patrol.

    Raises:
        SystemExit: With code 1 when none of the target sessions is running.
    """
    state = load_state()
    pane_state = state.get("panes", {})
    active_sessions = [s["name"] for s in get_tmux_sessions() if s["name"] in SESSIONS]
    if not active_sessions:
        print("❌ No target sessions found (dev/timmy).")
        send_telegram("No dev/timmy sessions exist on this machine", "no_sessions")
        sys.exit(1)

    alerts_sent = 0
    panes_checked = 0
    actionable_events = []
    idle_statuses = ("READY", "IDLE")

    for session in SESSIONS:
        # Only query sessions that are actually running.
        if session not in active_sessions:
            continue
        for pane in get_tmux_panes(session):
            # Ignore rows that belong to another session so a pane is never
            # checked (and alerted on) under the wrong session name.
            if pane["session"] != session:
                continue

            pane_key = f"{session}:{pane['window']}.{pane['pane']}"
            panes_checked += 1
            output = capture_pane_last_lines(session, pane["window"], pane["pane"])
            status, reason = classify_pane(pane, output)

            previous = pane_state.get(pane_key, {})
            is_idle = status in idle_statuses
            idle_cycles = previous.get("idle_cycles", 0) + 1 if is_idle else 0
            pane_state[pane_key] = {"status": status, "idle_cycles": idle_cycles, "last_check": time.time()}

            # A pane has exactly one status, so at most one alert applies.
            if is_idle and idle_cycles >= IDLE_CYCLE_LIMIT:
                label = "IDLE"
                tag = _alert_tag("idle", pane_key)
                msg = (
                    f"Pane {pane_key} idle for {idle_cycles} cycles — {reason}\n"
                    f"Last output: {_last_output_line(output)}"
                )
            elif status == "OVERFLOW":
                label = "OVERFLOW"
                tag = _alert_tag("overflow", pane_key)
                msg = (
                    f"Pane {pane_key} context overflow detected — {reason}\n"
                    f"Last output: {_last_output_line(output)}\nAction: Summarize and wrap up."
                )
            elif status == "CRASHED":
                label = "CRASHED"
                tag = _alert_tag("crashed", pane_key)
                captured = output.splitlines()
                tail_lines = captured[-3:] if len(captured) >= 3 else [output[-200:]]
                msg = f"Pane {pane_key} CRASHED — {reason}\nLast output: {'|'.join(tail_lines)}"
            else:
                continue

            if send_telegram(msg, tag):
                actionable_events.append(f"{label} → {pane_key}")
                alerts_sent += 1

    # send_telegram() records cooldown timestamps through its own load/save
    # cycle. Re-read the file so that writing the pane table does not
    # overwrite those timestamps with the snapshot taken at start-up, which
    # would disable the cooldown.
    state = load_state()
    state["panes"] = pane_state
    save_state(state)

    print(f"[{datetime.now().isoformat()}] Supervisor: {panes_checked} panes checked across {active_sessions}")
    if actionable_events:
        print(f"  Actionable ({alerts_sent} alerts): {', '.join(actionable_events)}")
    else:
        print("  All clear — no actionable events.")
    return 0
|
||||
|
||||
# Script entry point: run one patrol and turn any unexpected failure into a
# printed message, a best-effort Telegram alert, and exit status 1.
if __name__ == "__main__":
    try:
        exit_code = main()
    except Exception as e:
        # SystemExit raised inside main() is not an Exception subclass and
        # therefore passes straight through, exactly as before.
        print(f"❌ Supervisor crashed: {e}")
        send_telegram(f"CronSupervisor internal error: {e}", "internal_error")
        exit_code = 1
    sys.exit(exit_code)
|
||||
Reference in New Issue
Block a user