diff --git a/scripts/token_tracker.py b/scripts/token_tracker.py new file mode 100644 index 00000000..2239af8c --- /dev/null +++ b/scripts/token_tracker.py @@ -0,0 +1,329 @@ +#!/usr/bin/env python3 +""" +token_tracker.py — Pipeline Token Budget Tracker + +Real-time token spend tracking across all pipelines with: +- SQLite store for token usage per pipeline/worker/hour +- CLI dashboard with live refresh +- Budget alerts at 50%, 80%, 100% +- Daily summary reports + +Usage: + python3 token_tracker.py --watch # Live dashboard + python3 token_tracker.py --summary # Daily summary + python3 token_tracker.py --record pipeline worker tokens # Record usage + python3 token_tracker.py --budget pipeline 200000000 # Set budget + python3 token_tracker.py --alerts # Check alerts +""" + +import argparse +import json +import os +import sqlite3 +import sys +import time +from datetime import datetime, timedelta +from pathlib import Path +from typing import Dict, List, Optional, Tuple + +DB_PATH = Path.home() / ".hermes" / "pipelines" / "token_usage.db" +BUDGETS_FILE = Path.home() / ".hermes" / "pipelines" / "budgets.json" + +# Default pipeline budgets (tokens) +DEFAULT_BUDGETS = { + "knowledge-mine": 200_000_000, + "training-factory": 215_000_000, + "playground": 16_000_000, + "adversary": 17_000_000, +} + + +def get_db(db_path: Optional[Path] = None) -> sqlite3.Connection: + """Get or create SQLite database.""" + path = db_path or DB_PATH + path.parent.mkdir(parents=True, exist_ok=True) + + conn = sqlite3.connect(str(path)) + conn.execute("PRAGMA journal_mode=WAL") + conn.execute(""" + CREATE TABLE IF NOT EXISTS token_usage ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + pipeline TEXT NOT NULL, + worker TEXT NOT NULL, + tokens INTEGER NOT NULL, + recorded_at TEXT NOT NULL DEFAULT (datetime('now')), + hour_key TEXT GENERATED ALWAYS AS (strftime('%Y-%m-%d %H', recorded_at)) STORED + ) + """) + conn.execute(""" + CREATE INDEX IF NOT EXISTS idx_pipeline_hour + ON token_usage(pipeline, hour_key) + """) + conn.execute(""" + CREATE INDEX IF NOT EXISTS idx_recorded_at + ON token_usage(recorded_at) + """) + conn.commit() + return conn + + +def record_usage(conn: sqlite3.Connection, pipeline: str, worker: str, tokens: int): + """Record token usage.""" + conn.execute( + "INSERT INTO token_usage (pipeline, worker, tokens) VALUES (?, ?, ?)", + (pipeline, worker, tokens) + ) + conn.commit() + + +def get_usage_since(conn: sqlite3.Connection, since: str) -> Dict[str, int]: + """Get total tokens per pipeline since a datetime.""" + cursor = conn.execute(""" + SELECT pipeline, SUM(tokens) as total + FROM token_usage + WHERE recorded_at >= ? + GROUP BY pipeline + ORDER BY total DESC + """, (since,)) + return {row[0]: row[1] for row in cursor.fetchall()} + + +def get_hourly_usage(conn: sqlite3.Connection, pipeline: str, hours: int = 24) -> List[Tuple[str, int]]: + """Get hourly token usage for a pipeline.""" + since = (datetime.utcnow() - timedelta(hours=hours)).isoformat() + cursor = conn.execute(""" + SELECT hour_key, SUM(tokens) as total + FROM token_usage + WHERE pipeline = ? AND recorded_at >= ? + GROUP BY hour_key + ORDER BY hour_key + """, (pipeline, since)) + return cursor.fetchall() + + +def get_worker_usage(conn: sqlite3.Connection, pipeline: str, since: str) -> Dict[str, int]: + """Get per-worker token usage for a pipeline.""" + cursor = conn.execute(""" + SELECT worker, SUM(tokens) as total + FROM token_usage + WHERE pipeline = ? AND recorded_at >= ? + GROUP BY worker + ORDER BY total DESC + """, (pipeline, since)) + return {row[0]: row[1] for row in cursor.fetchall()} + + +def load_budgets() -> Dict[str, int]: + """Load pipeline budgets.""" + if BUDGETS_FILE.exists(): + with open(BUDGETS_FILE) as f: + return json.load(f) + return DEFAULT_BUDGETS.copy() + + +def save_budgets(budgets: Dict[str, int]): + """Save pipeline budgets.""" + BUDGETS_FILE.parent.mkdir(parents=True, exist_ok=True) + with open(BUDGETS_FILE, 'w') as f: + json.dump(budgets, f, indent=2) + + +def format_tokens(tokens: int) -> str: + """Format token count for display.""" + if tokens >= 1_000_000_000: + return f"{tokens / 1_000_000_000:.1f}B" + if tokens >= 1_000_000: + return f"{tokens / 1_000_000:.1f}M" + if tokens >= 1_000: + return f"{tokens / 1_000:.1f}K" + return str(tokens) + + +def progress_bar(used: int, target: int, width: int = 10) -> str: + """Generate a progress bar.""" + if target == 0: + return "░" * width + ratio = min(used / target, 1.0) + filled = int(ratio * width) + return "█" * filled + "░" * (width - filled) + + +def estimate_eta(used: int, target: int, hours_elapsed: float) -> str: + """Estimate time to completion.""" + if hours_elapsed <= 0 or used <= 0: + return "N/A" + rate = used / hours_elapsed + remaining = target - used + if remaining <= 0: + return "DONE" + eta_hours = remaining / rate + if eta_hours >= 1: + return f"{eta_hours:.1f}h" + return f"{eta_hours * 60:.0f}m" + + +def render_dashboard(conn: sqlite3.Connection, budgets: Dict[str, int]): + """Render the live dashboard.""" + today = datetime.utcnow().strftime("%Y-%m-%d") + usage = get_usage_since(conn, f"{today}T00:00:00") + + print("\033[2J\033[H") # Clear screen + print("=" * 70) + print(" TOKEN BUDGET TRACKER") + print(f" {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')}") + print("=" * 70) + print(f" {'Pipeline':<20} {'Used':>10} {'Target':>10} {'Progress':>12} {'ETA':>8}") + print("-" * 70) + + total_used = 0 + total_target = 0 + + for pipeline, budget in sorted(budgets.items()): + used = usage.get(pipeline, 0) + total_used += used + total_target += budget + + bar = progress_bar(used, budget) + pct = (used / budget * 100) if budget > 0 else 0 + + # Estimate ETA based on current hour's rate + hour_key = datetime.utcnow().strftime("%Y-%m-%d %H") + hourly = get_hourly_usage(conn, pipeline, hours=1) + current_hour_rate = hourly[0][1] if hourly else 0 + remaining = budget - used + eta = estimate_eta(used, budget, 1) if current_hour_rate > 0 else "N/A" + + print(f" {pipeline:<20} {format_tokens(used):>10} {format_tokens(budget):>10} {bar} {pct:>5.1f}% {eta:>8}") + + print("-" * 70) + total_bar = progress_bar(total_used, total_target) + total_pct = (total_used / total_target * 100) if total_target > 0 else 0 + print(f" {'TOTAL':<20} {format_tokens(total_used):>10} {format_tokens(total_target):>10} {total_bar} {total_pct:>5.1f}%") + print("=" * 70) + + # Alerts + alerts = check_alerts(usage, budgets) + if alerts: + print("\n ⚠️ ALERTS:") + for alert in alerts: + print(f" {alert}") + print() + + +def check_alerts(usage: Dict[str, int], budgets: Dict[str, int]) -> List[str]: + """Check budget alerts.""" + alerts = [] + thresholds = [50, 80, 100] + + for pipeline, budget in budgets.items(): + used = usage.get(pipeline, 0) + pct = (used / budget * 100) if budget > 0 else 0 + + for threshold in thresholds: + if pct >= threshold: + level = "🔴" if threshold == 100 else "🟡" if threshold == 80 else "🟢" + alerts.append(f"{level} {pipeline}: {pct:.1f}% used ({format_tokens(used)}/{format_tokens(budget)})") + + return alerts + + +def daily_summary(conn: sqlite3.Connection, budgets: Dict[str, int], date: Optional[str] = None): + """Generate daily summary report.""" + if date is None: + date = datetime.utcnow().strftime("%Y-%m-%d") + + start = f"{date}T00:00:00" + end = f"{date}T23:59:59" + + usage = get_usage_since(conn, start) + + print(f"\n{'='*60}") + print(f" DAILY SUMMARY — {date}") + print(f"{'='*60}") + + total = 0 + for pipeline, budget in sorted(budgets.items()): + used = usage.get(pipeline, 0) + total += used + pct = (used / budget * 100) if budget > 0 else 0 + print(f" {pipeline:<20} {format_tokens(used):>10} / {format_tokens(budget):>10} ({pct:.1f}%)") + + # Per-worker breakdown + workers = get_worker_usage(conn, pipeline, start) + for worker, wtokens in list(workers.items())[:5]: + print(f" └─ {worker}: {format_tokens(wtokens)}") + + print(f"{'─'*60}") + print(f" {'TOTAL':<20} {format_tokens(total):>10}") + print(f"{'='*60}\n") + + +def watch_mode(conn: sqlite3.Connection, budgets: Dict[str, int], interval: int = 5): + """Live dashboard with refresh.""" + try: + while True: + render_dashboard(conn, budgets) + time.sleep(interval) + except KeyboardInterrupt: + print("\nStopped.") + + +def main(): + parser = argparse.ArgumentParser(description="Pipeline Token Budget Tracker") + parser.add_argument("--db", help="SQLite database path") + parser.add_argument("--watch", action="store_true", help="Live dashboard") + parser.add_argument("--refresh", type=int, default=5, help="Dashboard refresh interval (seconds)") + parser.add_argument("--summary", action="store_true", help="Daily summary") + parser.add_argument("--date", help="Date for summary (YYYY-MM-DD)") + parser.add_argument("--record", nargs=3, metavar=("PIPELINE", "WORKER", "TOKENS"), + help="Record token usage") + parser.add_argument("--budget", nargs=2, metavar=("PIPELINE", "TOKENS"), + help="Set pipeline budget") + parser.add_argument("--budgets-file", help="Budgets JSON file path") + parser.add_argument("--alerts", action="store_true", help="Check alerts only") + parser.add_argument("--json", action="store_true", help="JSON output") + args = parser.parse_args() + + db_path = Path(args.db) if args.db else None + conn = get_db(db_path) + + if args.budgets_file: + global BUDGETS_FILE + BUDGETS_FILE = Path(args.budgets_file) + + budgets = load_budgets() + + if args.record: + pipeline, worker, tokens = args.record + record_usage(conn, pipeline, worker, int(tokens)) + print(f"Recorded: {pipeline}/{worker} = {int(tokens)} tokens") + + elif args.budget: + pipeline, tokens = args.budget + budgets[pipeline] = int(tokens) + save_budgets(budgets) + print(f"Budget set: {pipeline} = {int(tokens)} tokens") + + elif args.alerts: + today = datetime.utcnow().strftime("%Y-%m-%d") + usage = get_usage_since(conn, f"{today}T00:00:00") + alerts = check_alerts(usage, budgets) + if alerts: + for a in alerts: + print(a) + sys.exit(1) + else: + print("No alerts.") + + elif args.summary: + daily_summary(conn, budgets, args.date) + + elif args.watch: + watch_mode(conn, budgets, args.refresh) + + else: + render_dashboard(conn, budgets) + + +if __name__ == "__main__": + main()