Compare commits
3 Commits
burn/timmy
...
fix/622
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
91d94e29e8 | ||
| ae8c1d46ae | |||
|
|
508441acb4 |
@@ -4,111 +4,365 @@
|
||||
Part of the Gemini Sovereign Infrastructure Suite.
|
||||
|
||||
Tracks the fleet's progress through the Paperclips-inspired evolution arc.
|
||||
|
||||
Usage:
|
||||
python3 scripts/phase_tracker.py status # Show current state
|
||||
python3 scripts/phase_tracker.py evaluate # Auto-evaluate checkable milestones
|
||||
python3 scripts/phase_tracker.py complete M4 # Mark milestone complete
|
||||
python3 scripts/phase_tracker.py telegram # Post daily update to Telegram
|
||||
python3 scripts/phase_tracker.py daily # evaluate + telegram
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import re
|
||||
import argparse
|
||||
import urllib.request
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from datetime import datetime, timezone, timedelta
|
||||
|
||||
MILESTONES_FILE = "fleet/milestones.md"
|
||||
COMPLETED_FILE = "fleet/completed_milestones.json"
|
||||
LOG_DIR = Path(os.path.expanduser("~/.local/timmy/fleet-health"))
|
||||
UPTIME_FILE = LOG_DIR / "uptime.json"
|
||||
|
||||
TELEGRAM_TOKEN_PATHS = [
|
||||
Path.home() / ".config" / "timmy" / "telegram_bot_token",
|
||||
Path.home() / ".hermes" / "telegram_bot_token",
|
||||
Path.home() / ".hermes" / "telegram_token",
|
||||
]
|
||||
TELEGRAM_CHAT = os.environ.get("TELEGRAM_HOME_CHANNEL", "-1003664764329")
|
||||
|
||||
HOSTS = {
|
||||
"ezra": {"ip": "143.198.27.163"},
|
||||
"allegro": {"ip": "167.99.126.228"},
|
||||
"bezalel": {"ip": "159.203.146.185"},
|
||||
}
|
||||
|
||||
|
||||
def _find_repo_root() -> Path:
|
||||
script_dir = Path(__file__).resolve().parent
|
||||
return script_dir.parent
|
||||
|
||||
|
||||
def _read_token() -> str | None:
|
||||
for p in TELEGRAM_TOKEN_PATHS:
|
||||
if p.exists():
|
||||
return p.read_text().strip()
|
||||
return os.environ.get("TELEGRAM_BOT_TOKEN") or None
|
||||
|
||||
|
||||
def telegram_send(text: str) -> bool:
|
||||
token = _read_token()
|
||||
if not token:
|
||||
print("[WARN] No Telegram token found.", file=sys.stderr)
|
||||
return False
|
||||
url = f"https://api.telegram.org/bot{token}/sendMessage"
|
||||
body = json.dumps({"chat_id": TELEGRAM_CHAT, "text": text, "parse_mode": "HTML"}).encode()
|
||||
req = urllib.request.Request(url, data=body, headers={"Content-Type": "application/json"})
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=30) as resp:
|
||||
return resp.status == 200
|
||||
except Exception as e:
|
||||
print(f"[WARN] Telegram send failed: {e}", file=sys.stderr)
|
||||
return False
|
||||
|
||||
|
||||
class Milestone:
|
||||
def __init__(self, m_id: str, title: str, trigger: str, message: str):
|
||||
self.id = m_id
|
||||
self.title = title
|
||||
self.trigger = trigger
|
||||
self.message = message
|
||||
|
||||
|
||||
class Phase:
|
||||
def __init__(self, name: str, number: int, unlock_condition: str | None):
|
||||
self.name = name
|
||||
self.number = number
|
||||
self.unlock_condition = unlock_condition
|
||||
self.milestones: list[Milestone] = []
|
||||
|
||||
|
||||
class PhaseTracker:
|
||||
def __init__(self):
|
||||
# Find files relative to repo root
|
||||
script_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
repo_root = os.path.dirname(script_dir)
|
||||
|
||||
self.milestones_path = os.path.join(repo_root, MILESTONES_FILE)
|
||||
self.completed_path = os.path.join(repo_root, COMPLETED_FILE)
|
||||
|
||||
self.milestones = self.parse_milestones()
|
||||
self.completed = self.load_completed()
|
||||
self.repo_root = _find_repo_root()
|
||||
self.milestones_path = self.repo_root / MILESTONES_FILE
|
||||
self.completed_path = self.repo_root / COMPLETED_FILE
|
||||
self.phases: list[Phase] = self._parse_milestones()
|
||||
self.completed: set[str] = self._load_completed()
|
||||
|
||||
def _parse_milestones(self) -> list[Phase]:
|
||||
if not self.milestones_path.exists():
|
||||
return []
|
||||
content = self.milestones_path.read_text()
|
||||
phases: list[Phase] = []
|
||||
current_phase: Phase | None = None
|
||||
|
||||
for line in content.splitlines():
|
||||
phase_match = re.match(r"##\s*Phase\s*(\d+):\s*(.+?)\s*(?:\(([^)]+)\))?\s*$", line)
|
||||
if phase_match:
|
||||
num = int(phase_match.group(1))
|
||||
name = phase_match.group(2).strip()
|
||||
unlock = phase_match.group(3)
|
||||
current_phase = Phase(name, num, unlock)
|
||||
phases.append(current_phase)
|
||||
continue
|
||||
|
||||
m_match = re.match(r"###\s*(M\d+):\s*(.+)$", line)
|
||||
if m_match and current_phase is not None:
|
||||
m_id = m_match.group(1)
|
||||
title = m_match.group(2).strip()
|
||||
current_phase.milestones.append(Milestone(m_id, title, "", ""))
|
||||
continue
|
||||
|
||||
if line.startswith("**Trigger:**") and current_phase and current_phase.milestones:
|
||||
current_phase.milestones[-1].trigger = line.replace("**Trigger:**", "").strip()
|
||||
continue
|
||||
|
||||
if line.startswith("**Message:**") and current_phase and current_phase.milestones:
|
||||
current_phase.milestones[-1].message = line.replace("**Message:**", "").strip().strip('"')
|
||||
continue
|
||||
|
||||
def parse_milestones(self):
|
||||
if not os.path.exists(self.milestones_path):
|
||||
return {}
|
||||
|
||||
with open(self.milestones_path, "r") as f:
|
||||
content = f.read()
|
||||
|
||||
phases = {}
|
||||
current_phase = None
|
||||
|
||||
for line in content.split("\n"):
|
||||
if line.startswith("## Phase"):
|
||||
current_phase = line.replace("## ", "").strip()
|
||||
phases[current_phase] = []
|
||||
elif line.startswith("### M"):
|
||||
m_id = line.split(":")[0].replace("### ", "").strip()
|
||||
title = line.split(":")[1].strip()
|
||||
phases[current_phase].append({"id": m_id, "title": title})
|
||||
|
||||
return phases
|
||||
|
||||
def load_completed(self):
|
||||
if os.path.exists(self.completed_path):
|
||||
with open(self.completed_path, "r") as f:
|
||||
try:
|
||||
return json.load(f)
|
||||
except:
|
||||
return []
|
||||
return []
|
||||
def _load_completed(self) -> set[str]:
|
||||
if self.completed_path.exists():
|
||||
try:
|
||||
data = json.loads(self.completed_path.read_text())
|
||||
if isinstance(data, list):
|
||||
return set(data)
|
||||
except Exception:
|
||||
pass
|
||||
return set()
|
||||
|
||||
def save_completed(self):
|
||||
with open(self.completed_path, "w") as f:
|
||||
json.dump(self.completed, f, indent=2)
|
||||
self.completed_path.write_text(json.dumps(sorted(self.completed), indent=2))
|
||||
|
||||
def show_progress(self):
|
||||
print("--- Fleet Phase Progression Tracker ---")
|
||||
total_milestones = 0
|
||||
total_completed = 0
|
||||
|
||||
if not self.milestones:
|
||||
print("[ERROR] No milestones found in fleet/milestones.md")
|
||||
return
|
||||
|
||||
for phase, ms in self.milestones.items():
|
||||
print(f"\n{phase}")
|
||||
for m in ms:
|
||||
total_milestones += 1
|
||||
done = m["id"] in self.completed
|
||||
if done:
|
||||
total_completed += 1
|
||||
status = "✅" if done else "⭕"
|
||||
print(f" {status} {m['id']}: {m['title']}")
|
||||
|
||||
percent = (total_completed / total_milestones) * 100 if total_milestones > 0 else 0
|
||||
print(f"\nOverall Progress: {total_completed}/{total_milestones} ({percent:.1f}%)")
|
||||
|
||||
def mark_complete(self, m_id: str):
|
||||
def mark_complete(self, m_id: str) -> bool:
|
||||
m_id = m_id.upper()
|
||||
exists = any(m.id == m_id for p in self.phases for m in p.milestones)
|
||||
if not exists:
|
||||
print(f"[ERROR] Unknown milestone: {m_id}")
|
||||
return False
|
||||
if m_id not in self.completed:
|
||||
self.completed.append(m_id)
|
||||
self.completed.add(m_id)
|
||||
self.save_completed()
|
||||
print(f"[SUCCESS] Marked {m_id} as complete.")
|
||||
return True
|
||||
print(f"[INFO] {m_id} is already complete.")
|
||||
return True
|
||||
|
||||
def _get_phase_state(self) -> tuple[int, float, list[str], list[str]]:
|
||||
"""Returns (current_phase_number, decimal_progress, blockers, next_milestones)."""
|
||||
blockers = []
|
||||
next_milestones = []
|
||||
|
||||
for phase in self.phases:
|
||||
phase_completed = sum(1 for m in phase.milestones if m.id in self.completed)
|
||||
phase_total = len(phase.milestones)
|
||||
if phase_total == 0:
|
||||
continue
|
||||
|
||||
if phase_completed < phase_total:
|
||||
progress = phase_completed / phase_total
|
||||
decimal = phase.number + progress
|
||||
# Find next incomplete milestone
|
||||
for m in phase.milestones:
|
||||
if m.id not in self.completed:
|
||||
next_milestones.append(f"{m.id}: {m.title}")
|
||||
if m.trigger:
|
||||
blockers.append(f"{m.id}: {m.trigger}")
|
||||
break
|
||||
# Phase unlock condition as blocker if near end
|
||||
if phase_completed == phase_total - 1 and phase.unlock_condition:
|
||||
blockers.append(f"Unlock Phase {phase.number + 1}: {phase.unlock_condition}")
|
||||
return phase.number, decimal, blockers, next_milestones
|
||||
|
||||
# All done
|
||||
last = self.phases[-1] if self.phases else None
|
||||
if last:
|
||||
return last.number, float(last.number) + 1.0, ["All phases complete."], []
|
||||
return 0, 0.0, ["No milestones defined."], []
|
||||
|
||||
def show_progress(self):
|
||||
phase_num, decimal, blockers, next_ms = self._get_phase_state()
|
||||
total_ms = sum(len(p.milestones) for p in self.phases)
|
||||
total_completed = len(self.completed)
|
||||
overall_pct = (total_completed / total_ms * 100) if total_ms else 0
|
||||
|
||||
print("=" * 50)
|
||||
print(" Fleet Phase Progression Tracker")
|
||||
print("=" * 50)
|
||||
print(f"\nCurrent Phase: Phase {phase_num} — {self.phases[phase_num - 1].name if phase_num <= len(self.phases) else 'Complete'}")
|
||||
print(f"Decimal Progress: Phase {decimal:.1f}")
|
||||
print(f"Overall: {total_completed}/{total_ms} milestones ({overall_pct:.1f}%)")
|
||||
|
||||
print("\n--- Milestones ---")
|
||||
for phase in self.phases:
|
||||
done = sum(1 for m in phase.milestones if m.id in self.completed)
|
||||
total = len(phase.milestones)
|
||||
status = "✅" if done == total else "⏳"
|
||||
print(f"\n{status} Phase {phase.number}: {phase.name} ({done}/{total})")
|
||||
for m in phase.milestones:
|
||||
mark = "✅" if m.id in self.completed else "⭕"
|
||||
print(f" {mark} {m.id}: {m.title}")
|
||||
|
||||
print("\n--- Next Up ---")
|
||||
for nm in next_ms[:3]:
|
||||
print(f" → {nm}")
|
||||
|
||||
print("\n--- Blockers ---")
|
||||
for b in blockers[:5]:
|
||||
print(f" ⚠️ {b}")
|
||||
if not blockers:
|
||||
print(" 🚀 Nothing blocking.")
|
||||
print()
|
||||
|
||||
def summary_text(self) -> str:
|
||||
phase_num, decimal, blockers, next_ms = self._get_phase_state()
|
||||
total_ms = sum(len(p.milestones) for p in self.phases)
|
||||
total_completed = len(self.completed)
|
||||
overall_pct = (total_completed / total_ms * 100) if total_ms else 0
|
||||
|
||||
phase_name = self.phases[phase_num - 1].name if phase_num <= len(self.phases) else "Complete"
|
||||
next_phase = phase_num + 1 if phase_num < len(self.phases) else phase_num
|
||||
progress_to_next = (decimal - phase_num) * 100
|
||||
|
||||
lines = [
|
||||
f"Fleet: Phase {decimal:.1f} ({progress_to_next:.0f}% to Phase {next_phase})",
|
||||
f"Phase: {phase_num} — {phase_name}",
|
||||
f"Overall: {total_completed}/{total_ms} milestones ({overall_pct:.1f}%)",
|
||||
]
|
||||
if next_ms:
|
||||
lines.append(f"Next: {next_ms[0]}")
|
||||
if blockers and blockers[0] != "All phases complete.":
|
||||
lines.append(f"Blocker: {blockers[0]}")
|
||||
return "\n".join(lines)
|
||||
|
||||
# === Auto-evaluation heuristics ===
|
||||
|
||||
def _eval_file_exists(self, path: str) -> bool:
|
||||
return (self.repo_root / path).exists()
|
||||
|
||||
def _eval_command(self, cmd: str) -> bool:
|
||||
try:
|
||||
result = subprocess.run(cmd, shell=True, capture_output=True, timeout=10)
|
||||
return result.returncode == 0
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def _eval_uptime(self, target: float) -> bool:
|
||||
if not UPTIME_FILE.exists():
|
||||
return False
|
||||
try:
|
||||
data = json.loads(UPTIME_FILE.read_text())
|
||||
uptime = data.get("uptime_30d_percent", 0.0)
|
||||
return uptime >= target
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def _eval_local_model_multi(self) -> bool:
|
||||
count = 0
|
||||
for host in HOSTS:
|
||||
if self._eval_command(f"ssh -o ConnectTimeout=5 {host} 'pgrep -f ollama >/dev/null 2>&1'"):
|
||||
count += 1
|
||||
return count >= 2
|
||||
|
||||
def _eval_zero_manual_restarts(self, days: int = 7) -> bool:
|
||||
log = LOG_DIR / "auto_restart.log"
|
||||
if not log.exists():
|
||||
return False
|
||||
cutoff = datetime.now(timezone.utc) - timedelta(days=days)
|
||||
try:
|
||||
with open(log) as f:
|
||||
for line in f:
|
||||
if "manual restart" in line.lower():
|
||||
# crude timestamp parse
|
||||
try:
|
||||
ts = datetime.fromisoformat(line[:19])
|
||||
if ts > cutoff:
|
||||
return False
|
||||
except Exception:
|
||||
continue
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def evaluate(self):
|
||||
"""Auto-check milestones where we have heuristics."""
|
||||
print("[EVAL] Running automatic milestone checks...\n")
|
||||
checks = [
|
||||
("M1", self._eval_command, "python3 fleet/health_check.py --dry-run 2>/dev/null || python3 fleet/health_check.py 2>&1 | head -1 >/dev/null"),
|
||||
("M2", self._eval_command, "test -f ~/.local/timmy/fleet-health/auto_restart.log && grep -q 'restarted' ~/.local/timmy/fleet-health/auto_restart.log"),
|
||||
("M3", self._eval_command, "test -d ~/.local/timmy/backups && ls ~/.local/timmy/backups | grep -q ."),
|
||||
("M4", self._eval_uptime, 95.0),
|
||||
("M5", self._eval_uptime, 97.0),
|
||||
("M6", self._eval_zero_manual_restarts, 7),
|
||||
("M9", self._eval_uptime, 98.0),
|
||||
("M11", self._eval_local_model_multi, None),
|
||||
]
|
||||
newly_found = []
|
||||
for m_id, check_fn, arg in checks:
|
||||
if m_id in self.completed:
|
||||
continue
|
||||
result = check_fn(arg) if arg is not None else check_fn()
|
||||
if result:
|
||||
print(f" ✅ {m_id} appears satisfied — marking complete.")
|
||||
self.completed.add(m_id)
|
||||
newly_found.append(m_id)
|
||||
else:
|
||||
print(f" ⭕ {m_id} not yet satisfied.")
|
||||
|
||||
if newly_found:
|
||||
self.save_completed()
|
||||
print(f"\n[SUCCESS] Auto-completed {len(newly_found)} milestone(s): {', '.join(newly_found)}")
|
||||
else:
|
||||
print(f"[INFO] {m_id} is already complete.")
|
||||
print("\n[INFO] No new milestones auto-detected.")
|
||||
|
||||
def daily(self):
|
||||
self.evaluate()
|
||||
text = self.summary_text()
|
||||
print(text)
|
||||
ok = telegram_send(text)
|
||||
if ok:
|
||||
print("\n[TELEGRAM] Daily update sent.")
|
||||
else:
|
||||
print("\n[TELEGRAM] Failed to send update.")
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Gemini Phase Tracker")
|
||||
parser = argparse.ArgumentParser(description="Fleet Phase Progression Tracker")
|
||||
subparsers = parser.add_subparsers(dest="command")
|
||||
|
||||
|
||||
subparsers.add_parser("status", help="Show current progress")
|
||||
|
||||
subparsers.add_parser("evaluate", help="Auto-evaluate checkable milestones")
|
||||
subparsers.add_parser("telegram", help="Post summary to Telegram")
|
||||
subparsers.add_parser("daily", help="Evaluate then post to Telegram")
|
||||
|
||||
complete_parser = subparsers.add_parser("complete", help="Mark a milestone as complete")
|
||||
complete_parser.add_argument("id", help="Milestone ID (e.g. M1)")
|
||||
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
tracker = PhaseTracker()
|
||||
|
||||
|
||||
if args.command == "status":
|
||||
tracker.show_progress()
|
||||
elif args.command == "evaluate":
|
||||
tracker.evaluate()
|
||||
elif args.command == "telegram":
|
||||
ok = telegram_send(tracker.summary_text())
|
||||
sys.exit(0 if ok else 1)
|
||||
elif args.command == "daily":
|
||||
tracker.daily()
|
||||
elif args.command == "complete":
|
||||
tracker.mark_complete(args.id)
|
||||
ok = tracker.mark_complete(args.id)
|
||||
sys.exit(0 if ok else 1)
|
||||
else:
|
||||
parser.print_help()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
@@ -1,194 +1,20 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Token Budget Tracker -- real-time spend dashboard for pipelines."""
|
||||
"""Compatibility wrapper for the token budget tracker CLI.
|
||||
|
||||
Issue #622 asked for a `token-tracker.py` entrypoint. The maintained
|
||||
implementation lives in `scripts/token_tracker.py`. Keep this thin shim so
|
||||
operator docs and older calls continue to work.
|
||||
"""
|
||||
|
||||
import argparse, json, os, sqlite3, sys, time
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
import sys
|
||||
|
||||
DB_PATH = Path.home() / ".hermes" / "pipelines" / "token_usage.db"
|
||||
ALERT_THRESHOLDS = [0.5, 0.8, 1.0]
|
||||
DEFAULT_BUDGETS = {
|
||||
"knowledge-mine": 200_000_000,
|
||||
"training-factory": 215_000_000,
|
||||
"playground": 16_000_000,
|
||||
"adversary": 17_000_000,
|
||||
}
|
||||
SCRIPT_DIR = Path(__file__).resolve().parent
|
||||
if str(SCRIPT_DIR) not in sys.path:
|
||||
sys.path.insert(0, str(SCRIPT_DIR))
|
||||
|
||||
def get_db():
|
||||
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
|
||||
conn = sqlite3.connect(str(DB_PATH))
|
||||
conn.row_factory = sqlite3.Row
|
||||
conn.execute("PRAGMA journal_mode=WAL")
|
||||
conn.executescript("""
|
||||
CREATE TABLE IF NOT EXISTS token_usage (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
pipeline TEXT NOT NULL,
|
||||
worker TEXT NOT NULL,
|
||||
tokens INTEGER NOT NULL,
|
||||
recorded_at REAL NOT NULL,
|
||||
hour_bucket TEXT NOT NULL
|
||||
);
|
||||
CREATE TABLE IF NOT EXISTS pipeline_budgets (
|
||||
pipeline TEXT PRIMARY KEY,
|
||||
target_tokens INTEGER NOT NULL,
|
||||
updated_at REAL NOT NULL
|
||||
);
|
||||
CREATE TABLE IF NOT EXISTS alerts_sent (
|
||||
pipeline TEXT NOT NULL,
|
||||
threshold REAL NOT NULL,
|
||||
sent_at REAL NOT NULL,
|
||||
PRIMARY KEY (pipeline, threshold)
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_usage_pipeline_hour
|
||||
ON token_usage(pipeline, hour_bucket);
|
||||
""")
|
||||
for name, target in DEFAULT_BUDGETS.items():
|
||||
conn.execute(
|
||||
"INSERT OR IGNORE INTO pipeline_budgets (pipeline, target_tokens, updated_at) VALUES (?, ?, ?)",
|
||||
(name, target, time.time())
|
||||
)
|
||||
conn.commit()
|
||||
return conn
|
||||
from token_tracker import main
|
||||
|
||||
def log_usage(conn, pipeline, worker, tokens):
|
||||
now = time.time()
|
||||
hour = datetime.now().strftime("%Y-%m-%d %H:00")
|
||||
conn.execute(
|
||||
"INSERT INTO token_usage (pipeline, worker, tokens, recorded_at, hour_bucket) VALUES (?, ?, ?, ?, ?)",
|
||||
(pipeline, worker, tokens, now, hour)
|
||||
)
|
||||
conn.commit()
|
||||
check_alerts(conn, pipeline)
|
||||
|
||||
def get_pipeline_stats(conn):
|
||||
rows = conn.execute("""
|
||||
SELECT u.pipeline, COALESCE(b.target_tokens, 0) as target,
|
||||
SUM(u.tokens) as used, MIN(u.recorded_at) as started_at,
|
||||
COUNT(DISTINCT u.worker) as workers
|
||||
FROM token_usage u
|
||||
LEFT JOIN pipeline_budgets b ON u.pipeline = b.pipeline
|
||||
GROUP BY u.pipeline ORDER BY used DESC
|
||||
""").fetchall()
|
||||
return [dict(r) for r in rows]
|
||||
|
||||
def fmt(n):
|
||||
if n >= 1_000_000_000: return f"{n/1_000_000_000:.1f}B"
|
||||
if n >= 1_000_000: return f"{n/1_000_000:.1f}M"
|
||||
if n >= 1_000: return f"{n/1_000:.1f}K"
|
||||
return str(n)
|
||||
|
||||
def bar(ratio, w=8):
|
||||
filled = int(ratio * w)
|
||||
return "█" * filled + "░" * (w - filled)
|
||||
|
||||
def eta(used, target, started):
|
||||
if used <= 0 or started <= 0: return "--"
|
||||
elapsed = (time.time() - started) / 3600
|
||||
if elapsed <= 0: return "--"
|
||||
rate = used / elapsed
|
||||
remaining = target - used
|
||||
if remaining <= 0: return "DONE"
|
||||
h = remaining / rate
|
||||
return f"{h/24:.1f}d" if h >= 24 else f"{h:.1f}h"
|
||||
|
||||
def render_dashboard(conn):
|
||||
stats = get_pipeline_stats(conn)
|
||||
if not stats:
|
||||
print("No pipeline data recorded yet.")
|
||||
return
|
||||
print()
|
||||
print(f"{'Pipeline':<20} {'Tokens Used':>12} {'Target':>10} {'Progress':>10} {'ETA':>8} {'Workers':>8}")
|
||||
print("-" * 72)
|
||||
total_used = total_target = 0
|
||||
for s in stats:
|
||||
used = s["used"] or 0
|
||||
target = s["target"] or 1
|
||||
ratio = min(used / target, 1.0) if target > 0 else 0
|
||||
print(f"{s['pipeline']:<20} {fmt(used):>12} {fmt(target):>10} {bar(ratio):>10} {eta(used, target, s['started_at'] or 0):>8} {s['workers'] or 0:>8}")
|
||||
total_used += used
|
||||
total_target += target
|
||||
print("-" * 72)
|
||||
r = min(total_used / total_target, 1.0) if total_target > 0 else 0
|
||||
print(f"{'TOTAL':<20} {fmt(total_used):>12} {fmt(total_target):>10} {bar(r):>10}")
|
||||
print()
|
||||
|
||||
def render_watch(conn, interval=5):
|
||||
try:
|
||||
while True:
|
||||
os.system("clear" if os.name != "nt" else "cls")
|
||||
print(f"Token Budget Tracker -- {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
|
||||
print("Press Ctrl+C to exit")
|
||||
render_dashboard(conn)
|
||||
time.sleep(interval)
|
||||
except KeyboardInterrupt:
|
||||
print("\nExiting.")
|
||||
|
||||
def render_daily_summary(conn):
|
||||
since = time.time() - 86400
|
||||
rows = conn.execute("""
|
||||
SELECT pipeline, SUM(tokens) as total, COUNT(DISTINCT worker) as workers, COUNT(*) as entries
|
||||
FROM token_usage WHERE recorded_at >= ? GROUP BY pipeline ORDER BY total DESC
|
||||
""", (since,)).fetchall()
|
||||
if not rows:
|
||||
print("No usage in last 24 hours.")
|
||||
return
|
||||
print(f"\nDaily Summary -- last 24 hours")
|
||||
print(f"{'Pipeline':<20} {'Total Tokens':>14} {'Workers':>8} {'Entries':>8}")
|
||||
print("-" * 54)
|
||||
gt = 0
|
||||
for r in rows:
|
||||
print(f"{r['pipeline']:<20} {fmt(r['total']):>14} {r['workers']:>8} {r['entries']:>8}")
|
||||
gt += r["total"]
|
||||
print("-" * 54)
|
||||
print(f"{'TOTAL':<20} {fmt(gt):>14}\n")
|
||||
|
||||
def check_alerts(conn, pipeline):
|
||||
row = conn.execute(
|
||||
"SELECT SUM(u.tokens) as used, COALESCE(b.target_tokens, 0) as target "
|
||||
"FROM token_usage u LEFT JOIN pipeline_budgets b ON u.pipeline = b.pipeline "
|
||||
"WHERE u.pipeline = ?", (pipeline,)
|
||||
).fetchone()
|
||||
if not row or row["target"] <= 0: return
|
||||
ratio = row["used"] / row["target"]
|
||||
for t in ALERT_THRESHOLDS:
|
||||
if ratio >= t:
|
||||
existing = conn.execute("SELECT 1 FROM alerts_sent WHERE pipeline = ? AND threshold = ?", (pipeline, t)).fetchone()
|
||||
if not existing:
|
||||
print(f"⚠️ BUDGET ALERT: {pipeline} at {int(t*100)}% ({fmt(row['used'])}/{fmt(row['target'])})", file=sys.stderr)
|
||||
conn.execute("INSERT INTO alerts_sent (pipeline, threshold, sent_at) VALUES (?, ?, ?)", (pipeline, t, time.time()))
|
||||
conn.commit()
|
||||
|
||||
def set_budget(conn, pipeline, target):
|
||||
conn.execute("INSERT OR REPLACE INTO pipeline_budgets (pipeline, target_tokens, updated_at) VALUES (?, ?, ?)",
|
||||
(pipeline, int(target), time.time()))
|
||||
conn.execute("DELETE FROM alerts_sent WHERE pipeline = ?", (pipeline,))
|
||||
conn.commit()
|
||||
print(f"Budget set: {pipeline} = {fmt(int(target))} tokens")
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Token Budget Tracker")
|
||||
parser.add_argument("--watch", action="store_true")
|
||||
parser.add_argument("--watch-interval", type=int, default=5)
|
||||
parser.add_argument("--summary", action="store_true")
|
||||
parser.add_argument("--log", nargs=3, metavar=("PIPELINE", "WORKER", "TOKENS"))
|
||||
parser.add_argument("--budget", nargs=2, metavar=("PIPELINE", "TARGET"))
|
||||
parser.add_argument("--db", type=str, default=str(DB_PATH))
|
||||
args = parser.parse_args()
|
||||
global DB_PATH
|
||||
DB_PATH = Path(args.db)
|
||||
conn = get_db()
|
||||
if args.log:
|
||||
log_usage(conn, args.log[0], args.log[1], int(args.log[2]))
|
||||
print(f"Logged: {args.log[0]}/{args.log[1]} = {fmt(int(args.log[2]))} tokens")
|
||||
elif args.budget:
|
||||
set_budget(conn, args.budget[0], args.budget[1])
|
||||
elif args.summary:
|
||||
render_daily_summary(conn)
|
||||
elif args.watch:
|
||||
render_watch(conn, interval=args.watch_interval)
|
||||
else:
|
||||
render_dashboard(conn)
|
||||
conn.close()
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
@@ -76,15 +76,20 @@ def record_usage(conn: sqlite3.Connection, pipeline: str, worker: str, tokens: i
|
||||
conn.commit()
|
||||
|
||||
|
||||
def normalize_since(since: str) -> str:
|
||||
"""Normalize ISO timestamps for SQLite datetime comparisons."""
|
||||
return since.replace("T", " ")
|
||||
|
||||
|
||||
def get_usage_since(conn: sqlite3.Connection, since: str) -> Dict[str, int]:
|
||||
"""Get total tokens per pipeline since a datetime."""
|
||||
cursor = conn.execute("""
|
||||
SELECT pipeline, SUM(tokens) as total
|
||||
FROM token_usage
|
||||
WHERE recorded_at >= ?
|
||||
WHERE datetime(recorded_at) >= datetime(?)
|
||||
GROUP BY pipeline
|
||||
ORDER BY total DESC
|
||||
""", (since,))
|
||||
""", (normalize_since(since),))
|
||||
return {row[0]: row[1] for row in cursor.fetchall()}
|
||||
|
||||
|
||||
@@ -94,10 +99,10 @@ def get_hourly_usage(conn: sqlite3.Connection, pipeline: str, hours: int = 24) -
|
||||
cursor = conn.execute("""
|
||||
SELECT hour_key, SUM(tokens) as total
|
||||
FROM token_usage
|
||||
WHERE pipeline = ? AND recorded_at >= ?
|
||||
WHERE pipeline = ? AND datetime(recorded_at) >= datetime(?)
|
||||
GROUP BY hour_key
|
||||
ORDER BY hour_key
|
||||
""", (pipeline, since))
|
||||
""", (pipeline, normalize_since(since)))
|
||||
return cursor.fetchall()
|
||||
|
||||
|
||||
@@ -106,10 +111,10 @@ def get_worker_usage(conn: sqlite3.Connection, pipeline: str, since: str) -> Dic
|
||||
cursor = conn.execute("""
|
||||
SELECT worker, SUM(tokens) as total
|
||||
FROM token_usage
|
||||
WHERE pipeline = ? AND recorded_at >= ?
|
||||
WHERE pipeline = ? AND datetime(recorded_at) >= datetime(?)
|
||||
GROUP BY worker
|
||||
ORDER BY total DESC
|
||||
""", (pipeline, since))
|
||||
""", (pipeline, normalize_since(since)))
|
||||
return {row[0]: row[1] for row in cursor.fetchall()}
|
||||
|
||||
|
||||
|
||||
@@ -5,11 +5,12 @@ Tests for scripts/token_tracker.py — Token Budget Tracker.
|
||||
import json
|
||||
import os
|
||||
import sqlite3
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
import sys
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
|
||||
from token_tracker import (
|
||||
get_db,
|
||||
@@ -155,5 +156,37 @@ class TestBudgets(unittest.TestCase):
|
||||
self.assertIn("p1", loaded)
|
||||
|
||||
|
||||
class TestWrapperScript(unittest.TestCase):
|
||||
def test_hyphen_wrapper_summary_works_with_custom_db(self):
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
db_path = Path(tmpdir) / "wrapper.db"
|
||||
budgets_path = Path(tmpdir) / "budgets.json"
|
||||
budgets_path.write_text(json.dumps({"knowledge-mine": 200_000_000}))
|
||||
conn = get_db(db_path)
|
||||
try:
|
||||
record_usage(conn, "knowledge-mine", "worker-1", 123456)
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
repo_root = Path(__file__).parent.parent
|
||||
wrapper = repo_root / "scripts" / "token-tracker.py"
|
||||
result = subprocess.run(
|
||||
[
|
||||
sys.executable,
|
||||
str(wrapper),
|
||||
"--summary",
|
||||
"--db", str(db_path),
|
||||
"--budgets-file", str(budgets_path),
|
||||
],
|
||||
cwd=str(repo_root),
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
self.assertEqual(result.returncode, 0, result.stderr)
|
||||
self.assertIn("DAILY SUMMARY", result.stdout)
|
||||
self.assertIn("knowledge-mine", result.stdout)
|
||||
self.assertIn("123.5K", result.stdout)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user