From 508441acb48b434f432ee0423c330c589d98e404 Mon Sep 17 00:00:00 2001 From: Alexander Whitestone Date: Wed, 22 Apr 2026 03:34:36 -0400 Subject: [PATCH] feat(#407): Phase progression tracker with auto-eval, Telegram daily post, and blockers --- scripts/phase_tracker.py | 394 ++++++++++++++++++++++++++++++++------- 1 file changed, 324 insertions(+), 70 deletions(-) diff --git a/scripts/phase_tracker.py b/scripts/phase_tracker.py index be6229a8..89f9da04 100644 --- a/scripts/phase_tracker.py +++ b/scripts/phase_tracker.py @@ -4,111 +4,365 @@ Part of the Gemini Sovereign Infrastructure Suite. Tracks the fleet's progress through the Paperclips-inspired evolution arc. + +Usage: + python3 scripts/phase_tracker.py status # Show current state + python3 scripts/phase_tracker.py evaluate # Auto-evaluate checkable milestones + python3 scripts/phase_tracker.py complete M4 # Mark milestone complete + python3 scripts/phase_tracker.py telegram # Post daily update to Telegram + python3 scripts/phase_tracker.py daily # evaluate + telegram """ import os import sys import json +import re import argparse +import urllib.request +import subprocess +from pathlib import Path +from datetime import datetime, timezone, timedelta MILESTONES_FILE = "fleet/milestones.md" COMPLETED_FILE = "fleet/completed_milestones.json" +LOG_DIR = Path(os.path.expanduser("~/.local/timmy/fleet-health")) +UPTIME_FILE = LOG_DIR / "uptime.json" + +TELEGRAM_TOKEN_PATHS = [ + Path.home() / ".config" / "timmy" / "telegram_bot_token", + Path.home() / ".hermes" / "telegram_bot_token", + Path.home() / ".hermes" / "telegram_token", +] +TELEGRAM_CHAT = os.environ.get("TELEGRAM_HOME_CHANNEL", "-1003664764329") + +HOSTS = { + "ezra": {"ip": "143.198.27.163"}, + "allegro": {"ip": "167.99.126.228"}, + "bezalel": {"ip": "159.203.146.185"}, +} + + +def _find_repo_root() -> Path: + script_dir = Path(__file__).resolve().parent + return script_dir.parent + + +def _read_token() -> str | None: + for p in TELEGRAM_TOKEN_PATHS: + if p.exists(): + return p.read_text().strip() + return os.environ.get("TELEGRAM_BOT_TOKEN") or None + + +def telegram_send(text: str) -> bool: + token = _read_token() + if not token: + print("[WARN] No Telegram token found.", file=sys.stderr) + return False + url = f"https://api.telegram.org/bot{token}/sendMessage" + body = json.dumps({"chat_id": TELEGRAM_CHAT, "text": text, "parse_mode": "HTML"}).encode() + req = urllib.request.Request(url, data=body, headers={"Content-Type": "application/json"}) + try: + with urllib.request.urlopen(req, timeout=30) as resp: + return resp.status == 200 + except Exception as e: + print(f"[WARN] Telegram send failed: {e}", file=sys.stderr) + return False + + +class Milestone: + def __init__(self, m_id: str, title: str, trigger: str, message: str): + self.id = m_id + self.title = title + self.trigger = trigger + self.message = message + + +class Phase: + def __init__(self, name: str, number: int, unlock_condition: str | None): + self.name = name + self.number = number + self.unlock_condition = unlock_condition + self.milestones: list[Milestone] = [] + class PhaseTracker: def __init__(self): - # Find files relative to repo root - script_dir = os.path.dirname(os.path.abspath(__file__)) - repo_root = os.path.dirname(script_dir) - - self.milestones_path = os.path.join(repo_root, MILESTONES_FILE) - self.completed_path = os.path.join(repo_root, COMPLETED_FILE) - - self.milestones = self.parse_milestones() - self.completed = self.load_completed() + self.repo_root = _find_repo_root() + self.milestones_path = self.repo_root / MILESTONES_FILE + self.completed_path = self.repo_root / COMPLETED_FILE + self.phases: list[Phase] = self._parse_milestones() + self.completed: set[str] = self._load_completed() + + def _parse_milestones(self) -> list[Phase]: + if not self.milestones_path.exists(): + return [] + content = self.milestones_path.read_text() + phases: list[Phase] = [] + current_phase: Phase | None = None + + for line in content.splitlines(): + phase_match = re.match(r"##\s*Phase\s*(\d+):\s*(.+?)\s*(?:\(([^)]+)\))?\s*$", line) + if phase_match: + num = int(phase_match.group(1)) + name = phase_match.group(2).strip() + unlock = phase_match.group(3) + current_phase = Phase(name, num, unlock) + phases.append(current_phase) + continue + + m_match = re.match(r"###\s*(M\d+):\s*(.+)$", line) + if m_match and current_phase is not None: + m_id = m_match.group(1) + title = m_match.group(2).strip() + current_phase.milestones.append(Milestone(m_id, title, "", "")) + continue + + if line.startswith("**Trigger:**") and current_phase and current_phase.milestones: + current_phase.milestones[-1].trigger = line.replace("**Trigger:**", "").strip() + continue + + if line.startswith("**Message:**") and current_phase and current_phase.milestones: + current_phase.milestones[-1].message = line.replace("**Message:**", "").strip().strip('"') + continue - def parse_milestones(self): - if not os.path.exists(self.milestones_path): - return {} - - with open(self.milestones_path, "r") as f: - content = f.read() - - phases = {} - current_phase = None - - for line in content.split("\n"): - if line.startswith("## Phase"): - current_phase = line.replace("## ", "").strip() - phases[current_phase] = [] - elif line.startswith("### M"): - m_id = line.split(":")[0].replace("### ", "").strip() - title = line.split(":")[1].strip() - phases[current_phase].append({"id": m_id, "title": title}) - return phases - def load_completed(self): - if os.path.exists(self.completed_path): - with open(self.completed_path, "r") as f: - try: - return json.load(f) - except: - return [] - return [] + def _load_completed(self) -> set[str]: + if self.completed_path.exists(): + try: + data = json.loads(self.completed_path.read_text()) + if isinstance(data, list): + return set(data) + except Exception: + pass + return set() def save_completed(self): - with open(self.completed_path, "w") as f: - json.dump(self.completed, f, indent=2) + self.completed_path.write_text(json.dumps(sorted(self.completed), indent=2)) - def show_progress(self): - print("--- Fleet Phase Progression Tracker ---") - total_milestones = 0 - total_completed = 0 - - if not self.milestones: - print("[ERROR] No milestones found in fleet/milestones.md") - return - - for phase, ms in self.milestones.items(): - print(f"\n{phase}") - for m in ms: - total_milestones += 1 - done = m["id"] in self.completed - if done: - total_completed += 1 - status = "✅" if done else "⭕" - print(f" {status} {m['id']}: {m['title']}") - - percent = (total_completed / total_milestones) * 100 if total_milestones > 0 else 0 - print(f"\nOverall Progress: {total_completed}/{total_milestones} ({percent:.1f}%)") - - def mark_complete(self, m_id: str): + def mark_complete(self, m_id: str) -> bool: + m_id = m_id.upper() + exists = any(m.id == m_id for p in self.phases for m in p.milestones) + if not exists: + print(f"[ERROR] Unknown milestone: {m_id}") + return False if m_id not in self.completed: - self.completed.append(m_id) + self.completed.add(m_id) self.save_completed() print(f"[SUCCESS] Marked {m_id} as complete.") + return True + print(f"[INFO] {m_id} is already complete.") + return True + + def _get_phase_state(self) -> tuple[int, float, list[str], list[str]]: + """Returns (current_phase_number, decimal_progress, blockers, next_milestones).""" + blockers = [] + next_milestones = [] + + for phase in self.phases: + phase_completed = sum(1 for m in phase.milestones if m.id in self.completed) + phase_total = len(phase.milestones) + if phase_total == 0: + continue + + if phase_completed < phase_total: + progress = phase_completed / phase_total + decimal = phase.number + progress + # Find next incomplete milestone + for m in phase.milestones: + if m.id not in self.completed: + next_milestones.append(f"{m.id}: {m.title}") + if m.trigger: + blockers.append(f"{m.id}: {m.trigger}") + break + # Phase unlock condition as blocker if near end + if phase_completed == phase_total - 1 and phase.unlock_condition: + blockers.append(f"Unlock Phase {phase.number + 1}: {phase.unlock_condition}") + return phase.number, decimal, blockers, next_milestones + + # All done + last = self.phases[-1] if self.phases else None + if last: + return last.number, float(last.number) + 1.0, ["All phases complete."], [] + return 0, 0.0, ["No milestones defined."], [] + + def show_progress(self): + phase_num, decimal, blockers, next_ms = self._get_phase_state() + total_ms = sum(len(p.milestones) for p in self.phases) + total_completed = len(self.completed) + overall_pct = (total_completed / total_ms * 100) if total_ms else 0 + + print("=" * 50) + print(" Fleet Phase Progression Tracker") + print("=" * 50) + print(f"\nCurrent Phase: Phase {phase_num} — {self.phases[phase_num - 1].name if phase_num <= len(self.phases) else 'Complete'}") + print(f"Decimal Progress: Phase {decimal:.1f}") + print(f"Overall: {total_completed}/{total_ms} milestones ({overall_pct:.1f}%)") + + print("\n--- Milestones ---") + for phase in self.phases: + done = sum(1 for m in phase.milestones if m.id in self.completed) + total = len(phase.milestones) + status = "✅" if done == total else "⏳" + print(f"\n{status} Phase {phase.number}: {phase.name} ({done}/{total})") + for m in phase.milestones: + mark = "✅" if m.id in self.completed else "⭕" + print(f" {mark} {m.id}: {m.title}") + + print("\n--- Next Up ---") + for nm in next_ms[:3]: + print(f" → {nm}") + + print("\n--- Blockers ---") + for b in blockers[:5]: + print(f" ⚠️ {b}") + if not blockers: + print(" 🚀 Nothing blocking.") + print() + + def summary_text(self) -> str: + phase_num, decimal, blockers, next_ms = self._get_phase_state() + total_ms = sum(len(p.milestones) for p in self.phases) + total_completed = len(self.completed) + overall_pct = (total_completed / total_ms * 100) if total_ms else 0 + + phase_name = self.phases[phase_num - 1].name if phase_num <= len(self.phases) else "Complete" + next_phase = phase_num + 1 if phase_num < len(self.phases) else phase_num + progress_to_next = (decimal - phase_num) * 100 + + lines = [ + f"Fleet: Phase {decimal:.1f} ({progress_to_next:.0f}% to Phase {next_phase})", + f"Phase: {phase_num} — {phase_name}", + f"Overall: {total_completed}/{total_ms} milestones ({overall_pct:.1f}%)", + ] + if next_ms: + lines.append(f"Next: {next_ms[0]}") + if blockers and blockers[0] != "All phases complete.": + lines.append(f"Blocker: {blockers[0]}") + return "\n".join(lines) + + # === Auto-evaluation heuristics === + + def _eval_file_exists(self, path: str) -> bool: + return (self.repo_root / path).exists() + + def _eval_command(self, cmd: str) -> bool: + try: + result = subprocess.run(cmd, shell=True, capture_output=True, timeout=10) + return result.returncode == 0 + except Exception: + return False + + def _eval_uptime(self, target: float) -> bool: + if not UPTIME_FILE.exists(): + return False + try: + data = json.loads(UPTIME_FILE.read_text()) + uptime = data.get("uptime_30d_percent", 0.0) + return uptime >= target + except Exception: + return False + + def _eval_local_model_multi(self) -> bool: + count = 0 + for host in HOSTS: + if self._eval_command(f"ssh -o ConnectTimeout=5 {host} 'pgrep -f ollama >/dev/null 2>&1'"): + count += 1 + return count >= 2 + + def _eval_zero_manual_restarts(self, days: int = 7) -> bool: + log = LOG_DIR / "auto_restart.log" + if not log.exists(): + return False + cutoff = datetime.now(timezone.utc) - timedelta(days=days) + try: + with open(log) as f: + for line in f: + if "manual restart" in line.lower(): + # crude timestamp parse + try: + ts = datetime.fromisoformat(line[:19]) + if ts > cutoff: + return False + except Exception: + continue + return True + except Exception: + return False + + def evaluate(self): + """Auto-check milestones where we have heuristics.""" + print("[EVAL] Running automatic milestone checks...\n") + checks = [ + ("M1", self._eval_command, "python3 fleet/health_check.py --dry-run 2>/dev/null || python3 fleet/health_check.py 2>&1 | head -1 >/dev/null"), + ("M2", self._eval_command, "test -f ~/.local/timmy/fleet-health/auto_restart.log && grep -q 'restarted' ~/.local/timmy/fleet-health/auto_restart.log"), + ("M3", self._eval_command, "test -d ~/.local/timmy/backups && ls ~/.local/timmy/backups | grep -q ."), + ("M4", self._eval_uptime, 95.0), + ("M5", self._eval_uptime, 97.0), + ("M6", self._eval_zero_manual_restarts, 7), + ("M9", self._eval_uptime, 98.0), + ("M11", self._eval_local_model_multi, None), + ] + newly_found = [] + for m_id, check_fn, arg in checks: + if m_id in self.completed: + continue + result = check_fn(arg) if arg is not None else check_fn() + if result: + print(f" ✅ {m_id} appears satisfied — marking complete.") + self.completed.add(m_id) + newly_found.append(m_id) + else: + print(f" ⭕ {m_id} not yet satisfied.") + + if newly_found: + self.save_completed() + print(f"\n[SUCCESS] Auto-completed {len(newly_found)} milestone(s): {', '.join(newly_found)}") else: - print(f"[INFO] {m_id} is already complete.") + print("\n[INFO] No new milestones auto-detected.") + + def daily(self): + self.evaluate() + text = self.summary_text() + print(text) + ok = telegram_send(text) + if ok: + print("\n[TELEGRAM] Daily update sent.") + else: + print("\n[TELEGRAM] Failed to send update.") + def main(): - parser = argparse.ArgumentParser(description="Gemini Phase Tracker") + parser = argparse.ArgumentParser(description="Fleet Phase Progression Tracker") subparsers = parser.add_subparsers(dest="command") - + subparsers.add_parser("status", help="Show current progress") - + subparsers.add_parser("evaluate", help="Auto-evaluate checkable milestones") + subparsers.add_parser("telegram", help="Post summary to Telegram") + subparsers.add_parser("daily", help="Evaluate then post to Telegram") + complete_parser = subparsers.add_parser("complete", help="Mark a milestone as complete") complete_parser.add_argument("id", help="Milestone ID (e.g. M1)") - + args = parser.parse_args() - tracker = PhaseTracker() - + if args.command == "status": tracker.show_progress() + elif args.command == "evaluate": + tracker.evaluate() + elif args.command == "telegram": + ok = telegram_send(tracker.summary_text()) + sys.exit(0 if ok else 1) + elif args.command == "daily": + tracker.daily() elif args.command == "complete": - tracker.mark_complete(args.id) + ok = tracker.mark_complete(args.id) + sys.exit(0 if ok else 1) else: parser.print_help() + if __name__ == "__main__": main()