diff --git a/scripts/token-tracker.py b/scripts/token-tracker.py new file mode 100644 index 00000000..50d31797 --- /dev/null +++ b/scripts/token-tracker.py @@ -0,0 +1,194 @@ +#!/usr/bin/env python3 +"""Token Budget Tracker -- real-time spend dashboard for pipelines.""" + +import argparse, json, os, sqlite3, sys, time +from datetime import datetime +from pathlib import Path + +DB_PATH = Path.home() / ".hermes" / "pipelines" / "token_usage.db" +ALERT_THRESHOLDS = [0.5, 0.8, 1.0] +DEFAULT_BUDGETS = { + "knowledge-mine": 200_000_000, + "training-factory": 215_000_000, + "playground": 16_000_000, + "adversary": 17_000_000, +} + +def get_db(): + DB_PATH.parent.mkdir(parents=True, exist_ok=True) + conn = sqlite3.connect(str(DB_PATH)) + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA journal_mode=WAL") + conn.executescript(""" + CREATE TABLE IF NOT EXISTS token_usage ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + pipeline TEXT NOT NULL, + worker TEXT NOT NULL, + tokens INTEGER NOT NULL, + recorded_at REAL NOT NULL, + hour_bucket TEXT NOT NULL + ); + CREATE TABLE IF NOT EXISTS pipeline_budgets ( + pipeline TEXT PRIMARY KEY, + target_tokens INTEGER NOT NULL, + updated_at REAL NOT NULL + ); + CREATE TABLE IF NOT EXISTS alerts_sent ( + pipeline TEXT NOT NULL, + threshold REAL NOT NULL, + sent_at REAL NOT NULL, + PRIMARY KEY (pipeline, threshold) + ); + CREATE INDEX IF NOT EXISTS idx_usage_pipeline_hour + ON token_usage(pipeline, hour_bucket); + """) + for name, target in DEFAULT_BUDGETS.items(): + conn.execute( + "INSERT OR IGNORE INTO pipeline_budgets (pipeline, target_tokens, updated_at) VALUES (?, ?, ?)", + (name, target, time.time()) + ) + conn.commit() + return conn + +def log_usage(conn, pipeline, worker, tokens): + now = time.time() + hour = datetime.now().strftime("%Y-%m-%d %H:00") + conn.execute( + "INSERT INTO token_usage (pipeline, worker, tokens, recorded_at, hour_bucket) VALUES (?, ?, ?, ?, ?)", + (pipeline, worker, tokens, now, hour) + ) + conn.commit() + check_alerts(conn, pipeline) + +def get_pipeline_stats(conn): + rows = conn.execute(""" + SELECT u.pipeline, COALESCE(b.target_tokens, 0) as target, + SUM(u.tokens) as used, MIN(u.recorded_at) as started_at, + COUNT(DISTINCT u.worker) as workers + FROM token_usage u + LEFT JOIN pipeline_budgets b ON u.pipeline = b.pipeline + GROUP BY u.pipeline ORDER BY used DESC + """).fetchall() + return [dict(r) for r in rows] + +def fmt(n): + if n >= 1_000_000_000: return f"{n/1_000_000_000:.1f}B" + if n >= 1_000_000: return f"{n/1_000_000:.1f}M" + if n >= 1_000: return f"{n/1_000:.1f}K" + return str(n) + +def bar(ratio, w=8): + filled = int(ratio * w) + return "█" * filled + "░" * (w - filled) + +def eta(used, target, started): + if used <= 0 or started <= 0: return "--" + elapsed = (time.time() - started) / 3600 + if elapsed <= 0: return "--" + rate = used / elapsed + remaining = target - used + if remaining <= 0: return "DONE" + h = remaining / rate + return f"{h/24:.1f}d" if h >= 24 else f"{h:.1f}h" + +def render_dashboard(conn): + stats = get_pipeline_stats(conn) + if not stats: + print("No pipeline data recorded yet.") + return + print() + print(f"{'Pipeline':<20} {'Tokens Used':>12} {'Target':>10} {'Progress':>10} {'ETA':>8} {'Workers':>8}") + print("-" * 72) + total_used = total_target = 0 + for s in stats: + used = s["used"] or 0 + target = s["target"] or 1 + ratio = min(used / target, 1.0) if target > 0 else 0 + print(f"{s['pipeline']:<20} {fmt(used):>12} {fmt(target):>10} {bar(ratio):>10} {eta(used, target, s['started_at'] or 0):>8} {s['workers'] or 0:>8}") + total_used += used + total_target += target + print("-" * 72) + r = min(total_used / total_target, 1.0) if total_target > 0 else 0 + print(f"{'TOTAL':<20} {fmt(total_used):>12} {fmt(total_target):>10} {bar(r):>10}") + print() + +def render_watch(conn, interval=5): + try: + while True: + os.system("clear" if os.name != "nt" else "cls") + print(f"Token Budget Tracker -- {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + print("Press Ctrl+C to exit") + render_dashboard(conn) + time.sleep(interval) + except KeyboardInterrupt: + print("\nExiting.") + +def render_daily_summary(conn): + since = time.time() - 86400 + rows = conn.execute(""" + SELECT pipeline, SUM(tokens) as total, COUNT(DISTINCT worker) as workers, COUNT(*) as entries + FROM token_usage WHERE recorded_at >= ? GROUP BY pipeline ORDER BY total DESC + """, (since,)).fetchall() + if not rows: + print("No usage in last 24 hours.") + return + print(f"\nDaily Summary -- last 24 hours") + print(f"{'Pipeline':<20} {'Total Tokens':>14} {'Workers':>8} {'Entries':>8}") + print("-" * 54) + gt = 0 + for r in rows: + print(f"{r['pipeline']:<20} {fmt(r['total']):>14} {r['workers']:>8} {r['entries']:>8}") + gt += r["total"] + print("-" * 54) + print(f"{'TOTAL':<20} {fmt(gt):>14}\n") + +def check_alerts(conn, pipeline): + row = conn.execute( + "SELECT SUM(u.tokens) as used, COALESCE(b.target_tokens, 0) as target " + "FROM token_usage u LEFT JOIN pipeline_budgets b ON u.pipeline = b.pipeline " + "WHERE u.pipeline = ?", (pipeline,) + ).fetchone() + if not row or row["target"] <= 0: return + ratio = row["used"] / row["target"] + for t in ALERT_THRESHOLDS: + if ratio >= t: + existing = conn.execute("SELECT 1 FROM alerts_sent WHERE pipeline = ? AND threshold = ?", (pipeline, t)).fetchone() + if not existing: + print(f"⚠️ BUDGET ALERT: {pipeline} at {int(t*100)}% ({fmt(row['used'])}/{fmt(row['target'])})", file=sys.stderr) + conn.execute("INSERT INTO alerts_sent (pipeline, threshold, sent_at) VALUES (?, ?, ?)", (pipeline, t, time.time())) + conn.commit() + +def set_budget(conn, pipeline, target): + conn.execute("INSERT OR REPLACE INTO pipeline_budgets (pipeline, target_tokens, updated_at) VALUES (?, ?, ?)", + (pipeline, int(target), time.time())) + conn.execute("DELETE FROM alerts_sent WHERE pipeline = ?", (pipeline,)) + conn.commit() + print(f"Budget set: {pipeline} = {fmt(int(target))} tokens") + +def main(): + parser = argparse.ArgumentParser(description="Token Budget Tracker") + parser.add_argument("--watch", action="store_true") + parser.add_argument("--watch-interval", type=int, default=5) + parser.add_argument("--summary", action="store_true") + parser.add_argument("--log", nargs=3, metavar=("PIPELINE", "WORKER", "TOKENS")) + parser.add_argument("--budget", nargs=2, metavar=("PIPELINE", "TARGET")) + parser.add_argument("--db", type=str, default=str(DB_PATH)) + args = parser.parse_args() + global DB_PATH + DB_PATH = Path(args.db) + conn = get_db() + if args.log: + log_usage(conn, args.log[0], args.log[1], int(args.log[2])) + print(f"Logged: {args.log[0]}/{args.log[1]} = {fmt(int(args.log[2]))} tokens") + elif args.budget: + set_budget(conn, args.budget[0], args.budget[1]) + elif args.summary: + render_daily_summary(conn) + elif args.watch: + render_watch(conn, interval=args.watch_interval) + else: + render_dashboard(conn) + conn.close() + +if __name__ == "__main__": + main()