diff --git a/scripts/token-tracker.py b/scripts/token-tracker.py index 50d31797..f407cc78 100644 --- a/scripts/token-tracker.py +++ b/scripts/token-tracker.py @@ -1,194 +1,20 @@ #!/usr/bin/env python3 -"""Token Budget Tracker -- real-time spend dashboard for pipelines.""" +"""Compatibility wrapper for the token budget tracker CLI. + +Issue #622 asked for a `token-tracker.py` entrypoint. The maintained +implementation lives in `scripts/token_tracker.py`. Keep this thin shim so +operator docs and older calls continue to work. +""" -import argparse, json, os, sqlite3, sys, time -from datetime import datetime from pathlib import Path +import sys -DB_PATH = Path.home() / ".hermes" / "pipelines" / "token_usage.db" -ALERT_THRESHOLDS = [0.5, 0.8, 1.0] -DEFAULT_BUDGETS = { - "knowledge-mine": 200_000_000, - "training-factory": 215_000_000, - "playground": 16_000_000, - "adversary": 17_000_000, -} +SCRIPT_DIR = Path(__file__).resolve().parent +if str(SCRIPT_DIR) not in sys.path: + sys.path.insert(0, str(SCRIPT_DIR)) -def get_db(): - DB_PATH.parent.mkdir(parents=True, exist_ok=True) - conn = sqlite3.connect(str(DB_PATH)) - conn.row_factory = sqlite3.Row - conn.execute("PRAGMA journal_mode=WAL") - conn.executescript(""" - CREATE TABLE IF NOT EXISTS token_usage ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - pipeline TEXT NOT NULL, - worker TEXT NOT NULL, - tokens INTEGER NOT NULL, - recorded_at REAL NOT NULL, - hour_bucket TEXT NOT NULL - ); - CREATE TABLE IF NOT EXISTS pipeline_budgets ( - pipeline TEXT PRIMARY KEY, - target_tokens INTEGER NOT NULL, - updated_at REAL NOT NULL - ); - CREATE TABLE IF NOT EXISTS alerts_sent ( - pipeline TEXT NOT NULL, - threshold REAL NOT NULL, - sent_at REAL NOT NULL, - PRIMARY KEY (pipeline, threshold) - ); - CREATE INDEX IF NOT EXISTS idx_usage_pipeline_hour - ON token_usage(pipeline, hour_bucket); - """) - for name, target in DEFAULT_BUDGETS.items(): - conn.execute( - "INSERT OR IGNORE INTO pipeline_budgets (pipeline, target_tokens, updated_at) VALUES (?, ?, ?)", - (name, target, time.time()) - ) - conn.commit() - return conn +from token_tracker import main -def log_usage(conn, pipeline, worker, tokens): - now = time.time() - hour = datetime.now().strftime("%Y-%m-%d %H:00") - conn.execute( - "INSERT INTO token_usage (pipeline, worker, tokens, recorded_at, hour_bucket) VALUES (?, ?, ?, ?, ?)", - (pipeline, worker, tokens, now, hour) - ) - conn.commit() - check_alerts(conn, pipeline) - -def get_pipeline_stats(conn): - rows = conn.execute(""" - SELECT u.pipeline, COALESCE(b.target_tokens, 0) as target, - SUM(u.tokens) as used, MIN(u.recorded_at) as started_at, - COUNT(DISTINCT u.worker) as workers - FROM token_usage u - LEFT JOIN pipeline_budgets b ON u.pipeline = b.pipeline - GROUP BY u.pipeline ORDER BY used DESC - """).fetchall() - return [dict(r) for r in rows] - -def fmt(n): - if n >= 1_000_000_000: return f"{n/1_000_000_000:.1f}B" - if n >= 1_000_000: return f"{n/1_000_000:.1f}M" - if n >= 1_000: return f"{n/1_000:.1f}K" - return str(n) - -def bar(ratio, w=8): - filled = int(ratio * w) - return "█" * filled + "░" * (w - filled) - -def eta(used, target, started): - if used <= 0 or started <= 0: return "--" - elapsed = (time.time() - started) / 3600 - if elapsed <= 0: return "--" - rate = used / elapsed - remaining = target - used - if remaining <= 0: return "DONE" - h = remaining / rate - return f"{h/24:.1f}d" if h >= 24 else f"{h:.1f}h" - -def render_dashboard(conn): - stats = get_pipeline_stats(conn) - if not stats: - print("No pipeline data recorded yet.") - return - print() - print(f"{'Pipeline':<20} {'Tokens Used':>12} {'Target':>10} {'Progress':>10} {'ETA':>8} {'Workers':>8}") - print("-" * 72) - total_used = total_target = 0 - for s in stats: - used = s["used"] or 0 - target = s["target"] or 1 - ratio = min(used / target, 1.0) if target > 0 else 0 - print(f"{s['pipeline']:<20} {fmt(used):>12} {fmt(target):>10} {bar(ratio):>10} {eta(used, target, s['started_at'] or 0):>8} {s['workers'] or 0:>8}") - total_used += used - total_target += target - print("-" * 72) - r = min(total_used / total_target, 1.0) if total_target > 0 else 0 - print(f"{'TOTAL':<20} {fmt(total_used):>12} {fmt(total_target):>10} {bar(r):>10}") - print() - -def render_watch(conn, interval=5): - try: - while True: - os.system("clear" if os.name != "nt" else "cls") - print(f"Token Budget Tracker -- {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") - print("Press Ctrl+C to exit") - render_dashboard(conn) - time.sleep(interval) - except KeyboardInterrupt: - print("\nExiting.") - -def render_daily_summary(conn): - since = time.time() - 86400 - rows = conn.execute(""" - SELECT pipeline, SUM(tokens) as total, COUNT(DISTINCT worker) as workers, COUNT(*) as entries - FROM token_usage WHERE recorded_at >= ? GROUP BY pipeline ORDER BY total DESC - """, (since,)).fetchall() - if not rows: - print("No usage in last 24 hours.") - return - print(f"\nDaily Summary -- last 24 hours") - print(f"{'Pipeline':<20} {'Total Tokens':>14} {'Workers':>8} {'Entries':>8}") - print("-" * 54) - gt = 0 - for r in rows: - print(f"{r['pipeline']:<20} {fmt(r['total']):>14} {r['workers']:>8} {r['entries']:>8}") - gt += r["total"] - print("-" * 54) - print(f"{'TOTAL':<20} {fmt(gt):>14}\n") - -def check_alerts(conn, pipeline): - row = conn.execute( - "SELECT SUM(u.tokens) as used, COALESCE(b.target_tokens, 0) as target " - "FROM token_usage u LEFT JOIN pipeline_budgets b ON u.pipeline = b.pipeline " - "WHERE u.pipeline = ?", (pipeline,) - ).fetchone() - if not row or row["target"] <= 0: return - ratio = row["used"] / row["target"] - for t in ALERT_THRESHOLDS: - if ratio >= t: - existing = conn.execute("SELECT 1 FROM alerts_sent WHERE pipeline = ? AND threshold = ?", (pipeline, t)).fetchone() - if not existing: - print(f"⚠️ BUDGET ALERT: {pipeline} at {int(t*100)}% ({fmt(row['used'])}/{fmt(row['target'])})", file=sys.stderr) - conn.execute("INSERT INTO alerts_sent (pipeline, threshold, sent_at) VALUES (?, ?, ?)", (pipeline, t, time.time())) - conn.commit() - -def set_budget(conn, pipeline, target): - conn.execute("INSERT OR REPLACE INTO pipeline_budgets (pipeline, target_tokens, updated_at) VALUES (?, ?, ?)", - (pipeline, int(target), time.time())) - conn.execute("DELETE FROM alerts_sent WHERE pipeline = ?", (pipeline,)) - conn.commit() - print(f"Budget set: {pipeline} = {fmt(int(target))} tokens") - -def main(): - parser = argparse.ArgumentParser(description="Token Budget Tracker") - parser.add_argument("--watch", action="store_true") - parser.add_argument("--watch-interval", type=int, default=5) - parser.add_argument("--summary", action="store_true") - parser.add_argument("--log", nargs=3, metavar=("PIPELINE", "WORKER", "TOKENS")) - parser.add_argument("--budget", nargs=2, metavar=("PIPELINE", "TARGET")) - parser.add_argument("--db", type=str, default=str(DB_PATH)) - args = parser.parse_args() - global DB_PATH - DB_PATH = Path(args.db) - conn = get_db() - if args.log: - log_usage(conn, args.log[0], args.log[1], int(args.log[2])) - print(f"Logged: {args.log[0]}/{args.log[1]} = {fmt(int(args.log[2]))} tokens") - elif args.budget: - set_budget(conn, args.budget[0], args.budget[1]) - elif args.summary: - render_daily_summary(conn) - elif args.watch: - render_watch(conn, interval=args.watch_interval) - else: - render_dashboard(conn) - conn.close() if __name__ == "__main__": main() diff --git a/scripts/token_tracker.py b/scripts/token_tracker.py index 2239af8c..76325b34 100644 --- a/scripts/token_tracker.py +++ b/scripts/token_tracker.py @@ -76,15 +76,20 @@ def record_usage(conn: sqlite3.Connection, pipeline: str, worker: str, tokens: i conn.commit() +def normalize_since(since: str) -> str: + """Normalize ISO timestamps for SQLite datetime comparisons.""" + return since.replace("T", " ") + + def get_usage_since(conn: sqlite3.Connection, since: str) -> Dict[str, int]: """Get total tokens per pipeline since a datetime.""" cursor = conn.execute(""" SELECT pipeline, SUM(tokens) as total FROM token_usage - WHERE recorded_at >= ? + WHERE datetime(recorded_at) >= datetime(?) GROUP BY pipeline ORDER BY total DESC - """, (since,)) + """, (normalize_since(since),)) return {row[0]: row[1] for row in cursor.fetchall()} @@ -94,10 +99,10 @@ def get_hourly_usage(conn: sqlite3.Connection, pipeline: str, hours: int = 24) - cursor = conn.execute(""" SELECT hour_key, SUM(tokens) as total FROM token_usage - WHERE pipeline = ? AND recorded_at >= ? + WHERE pipeline = ? AND datetime(recorded_at) >= datetime(?) GROUP BY hour_key ORDER BY hour_key - """, (pipeline, since)) + """, (pipeline, normalize_since(since))) return cursor.fetchall() @@ -106,10 +111,10 @@ def get_worker_usage(conn: sqlite3.Connection, pipeline: str, since: str) -> Dic cursor = conn.execute(""" SELECT worker, SUM(tokens) as total FROM token_usage - WHERE pipeline = ? AND recorded_at >= ? + WHERE pipeline = ? AND datetime(recorded_at) >= datetime(?) GROUP BY worker ORDER BY total DESC - """, (pipeline, since)) + """, (pipeline, normalize_since(since))) return {row[0]: row[1] for row in cursor.fetchall()} diff --git a/tests/test_token_tracker.py b/tests/test_token_tracker.py index dd58340f..38ff3d8e 100644 --- a/tests/test_token_tracker.py +++ b/tests/test_token_tracker.py @@ -5,11 +5,12 @@ Tests for scripts/token_tracker.py — Token Budget Tracker. import json import os import sqlite3 +import subprocess +import sys import tempfile import unittest from pathlib import Path -import sys sys.path.insert(0, str(Path(__file__).parent.parent / "scripts")) from token_tracker import ( get_db, @@ -155,5 +156,37 @@ class TestBudgets(unittest.TestCase): self.assertIn("p1", loaded) +class TestWrapperScript(unittest.TestCase): + def test_hyphen_wrapper_summary_works_with_custom_db(self): + with tempfile.TemporaryDirectory() as tmpdir: + db_path = Path(tmpdir) / "wrapper.db" + budgets_path = Path(tmpdir) / "budgets.json" + budgets_path.write_text(json.dumps({"knowledge-mine": 200_000_000})) + conn = get_db(db_path) + try: + record_usage(conn, "knowledge-mine", "worker-1", 123456) + finally: + conn.close() + + repo_root = Path(__file__).parent.parent + wrapper = repo_root / "scripts" / "token-tracker.py" + result = subprocess.run( + [ + sys.executable, + str(wrapper), + "--summary", + "--db", str(db_path), + "--budgets-file", str(budgets_path), + ], + cwd=str(repo_root), + capture_output=True, + text=True, + ) + self.assertEqual(result.returncode, 0, result.stderr) + self.assertIn("DAILY SUMMARY", result.stdout) + self.assertIn("knowledge-mine", result.stdout) + self.assertIn("123.5K", result.stdout) + + if __name__ == "__main__": unittest.main()