Compare commits

...

1 Commits

Author SHA1 Message Date
Alexander Whitestone
7f2b271225 feat: token budget tracker with real-time dashboard (#622)
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 20s
Smoke Test / smoke (pull_request) Failing after 23s
Validate Config / YAML Lint (pull_request) Failing after 20s
Validate Config / JSON Validate (pull_request) Successful in 31s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 1m8s
Validate Config / Shell Script Lint (pull_request) Failing after 33s
Validate Config / Cron Syntax Check (pull_request) Successful in 9s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 11s
Validate Config / Playbook Schema Validation (pull_request) Successful in 33s
PR Checklist / pr-checklist (pull_request) Failing after 7m20s
Architecture Lint / Lint Repository (pull_request) Has been cancelled
Validate Config / Python Test Suite (pull_request) Has been cancelled
scripts/token_tracker.py:
  SQLite store for token usage per pipeline/worker/hour
  CLI dashboard: --watch (live), --summary (daily)
  Budget alerts at 50%, 80%, 100% thresholds
  Progress bars + ETA estimation
  --record pipeline worker tokens
  --budget pipeline tokens
  --alerts (check only)
  Default budgets: knowledge-mine 200M, training-factory 215M,
    playground 16M, adversary 17M (448M total)

tests/test_token_tracker.py: 23 tests
  record_usage, get_usage_since, get_worker_usage
  format_tokens (B/M/K formatting)
  progress_bar (empty/half/full/over)
  estimate_eta (done/hours/minutes/no data)
  check_alerts (no alerts, 50%, 80%, 100%, over-budget)
2026-04-15 21:06:40 -04:00
2 changed files with 488 additions and 0 deletions

329
scripts/token_tracker.py Normal file
View File

@@ -0,0 +1,329 @@
#!/usr/bin/env python3
"""
token_tracker.py — Pipeline Token Budget Tracker
Real-time token spend tracking across all pipelines with:
- SQLite store for token usage per pipeline/worker/hour
- CLI dashboard with live refresh
- Budget alerts at 50%, 80%, 100%
- Daily summary reports
Usage:
python3 token_tracker.py --watch # Live dashboard
python3 token_tracker.py --summary # Daily summary
python3 token_tracker.py --record pipeline worker tokens # Record usage
python3 token_tracker.py --budget pipeline 200000000 # Set budget
python3 token_tracker.py --alerts # Check alerts
"""
import argparse
import json
import os
import sqlite3
import sys
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Tuple
# On-disk locations for the shared usage database and the budget overrides file.
DB_PATH = Path.home() / ".hermes" / "pipelines" / "token_usage.db"
BUDGETS_FILE = Path.home() / ".hermes" / "pipelines" / "budgets.json"
# Default pipeline budgets (tokens) — used when BUDGETS_FILE does not exist.
DEFAULT_BUDGETS = {
    "knowledge-mine": 200_000_000,
    "training-factory": 215_000_000,
    "playground": 16_000_000,
    "adversary": 17_000_000,
}
def get_db(db_path: Optional[Path] = None) -> sqlite3.Connection:
    """Open (creating on first use) the token-usage SQLite database.

    Ensures the parent directory exists, switches the connection to WAL
    journaling, and creates the usage table and its two query indexes if
    they are not present yet.

    Args:
        db_path: explicit database file; defaults to the module-level DB_PATH.

    Returns:
        An open sqlite3 connection ready for reads and writes.
    """
    target = db_path if db_path is not None else DB_PATH
    target.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(str(target))
    conn.execute("PRAGMA journal_mode=WAL")
    # hour_key is derived from recorded_at so hourly rollups can GROUP BY a
    # stored, indexed column instead of recomputing strftime per row.
    schema = (
        """
        CREATE TABLE IF NOT EXISTS token_usage (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            pipeline TEXT NOT NULL,
            worker TEXT NOT NULL,
            tokens INTEGER NOT NULL,
            recorded_at TEXT NOT NULL DEFAULT (datetime('now')),
            hour_key TEXT GENERATED ALWAYS AS (strftime('%Y-%m-%d %H', recorded_at)) STORED
        )
        """,
        "CREATE INDEX IF NOT EXISTS idx_pipeline_hour ON token_usage(pipeline, hour_key)",
        "CREATE INDEX IF NOT EXISTS idx_recorded_at ON token_usage(recorded_at)",
    )
    for statement in schema:
        conn.execute(statement)
    conn.commit()
    return conn
def record_usage(conn: sqlite3.Connection, pipeline: str, worker: str, tokens: int):
    """Insert a single usage row (timestamped by the table default) and commit."""
    sql = "INSERT INTO token_usage (pipeline, worker, tokens) VALUES (?, ?, ?)"
    conn.execute(sql, (pipeline, worker, tokens))
    conn.commit()
def get_usage_since(conn: sqlite3.Connection, since: str) -> Dict[str, int]:
    """Return total tokens per pipeline recorded at or after *since*.

    Accepts *since* either in ISO-8601 form ("2026-04-15T00:00:00") or in
    SQLite's datetime('now') form ("2026-04-15 00:00:00").  Rows are stored
    in the latter form and recorded_at is compared as a plain string, so a
    literal "T" separator would sort *after* the space and silently exclude
    every same-day row — normalize it before querying.

    Returns:
        {pipeline: total_tokens}, iteration order is descending by total.
    """
    since = since.replace("T", " ")
    cursor = conn.execute(
        """
        SELECT pipeline, SUM(tokens) AS total
        FROM token_usage
        WHERE recorded_at >= ?
        GROUP BY pipeline
        ORDER BY total DESC
        """,
        (since,),
    )
    return {pipeline: total for pipeline, total in cursor.fetchall()}
def get_hourly_usage(conn: sqlite3.Connection, pipeline: str, hours: int = 24) -> List[Tuple[str, int]]:
    """Return (hour_key, total_tokens) rows for *pipeline* over the last *hours*.

    The cutoff is formatted as "YYYY-MM-DD HH:MM:SS" to match the stored
    datetime('now') text.  isoformat() would insert a "T" separator, which
    string-compares after the stored space and would exclude every recent
    row from the result.
    """
    cutoff = datetime.utcnow() - timedelta(hours=hours)
    since = cutoff.strftime("%Y-%m-%d %H:%M:%S")
    cursor = conn.execute(
        """
        SELECT hour_key, SUM(tokens) AS total
        FROM token_usage
        WHERE pipeline = ? AND recorded_at >= ?
        GROUP BY hour_key
        ORDER BY hour_key
        """,
        (pipeline, since),
    )
    return cursor.fetchall()
def get_worker_usage(conn: sqlite3.Connection, pipeline: str, since: str) -> Dict[str, int]:
    """Return per-worker token totals for *pipeline* recorded at or after *since*.

    *since* is normalized to SQLite's "YYYY-MM-DD HH:MM:SS" form: recorded_at
    is compared as a string, and an ISO "T" separator sorts after the stored
    space separator, which would silently drop same-day rows.

    Returns:
        {worker: total_tokens}, iteration order is descending by total.
    """
    since = since.replace("T", " ")
    cursor = conn.execute(
        """
        SELECT worker, SUM(tokens) AS total
        FROM token_usage
        WHERE pipeline = ? AND recorded_at >= ?
        GROUP BY worker
        ORDER BY total DESC
        """,
        (pipeline, since),
    )
    return {worker: total for worker, total in cursor.fetchall()}
def load_budgets() -> Dict[str, int]:
    """Read pipeline budgets from BUDGETS_FILE, or fall back to the defaults.

    Returns a fresh dict either way, so callers may mutate it safely.
    """
    if not BUDGETS_FILE.exists():
        return dict(DEFAULT_BUDGETS)
    with BUDGETS_FILE.open() as fh:
        return json.load(fh)
def save_budgets(budgets: Dict[str, int]):
    """Persist *budgets* to BUDGETS_FILE as pretty-printed JSON."""
    BUDGETS_FILE.parent.mkdir(parents=True, exist_ok=True)
    BUDGETS_FILE.write_text(json.dumps(budgets, indent=2))
def format_tokens(tokens: int) -> str:
    """Render a token count with a B/M/K suffix and one decimal (e.g. "1.5M").

    Counts below 1,000 are returned as plain integers.
    """
    for unit, suffix in ((1_000_000_000, "B"), (1_000_000, "M"), (1_000, "K")):
        if tokens >= unit:
            return f"{tokens / unit:.1f}{suffix}"
    return str(tokens)
def progress_bar(used: int, target: int, width: int = 10) -> str:
    """Render *used*/*target* as a fixed-width bar of █ (filled) / ░ (empty).

    The paste stripped the bar glyphs to empty strings, which made every bar
    an empty string; glyphs restored to match the test expectation
    ("█████░░░░░" for 50/100).  Usage beyond the target clamps at a full bar;
    a zero target renders as an all-empty bar.
    """
    if target == 0:
        return "░" * width
    ratio = min(used / target, 1.0)
    filled = int(ratio * width)
    return "█" * filled + "░" * (width - filled)
def estimate_eta(used: int, target: int, hours_elapsed: float) -> str:
    """Project time remaining from the average burn rate so far.

    Returns "N/A" with no usable data, "DONE" once the target is reached,
    otherwise "X.Xh" for an hour or more and "Nm" for sub-hour estimates.
    """
    if used <= 0 or hours_elapsed <= 0:
        return "N/A"
    left = target - used
    if left <= 0:
        return "DONE"
    hours_left = left / (used / hours_elapsed)
    return f"{hours_left:.1f}h" if hours_left >= 1 else f"{hours_left * 60:.0f}m"
def render_dashboard(conn: sqlite3.Connection, budgets: Dict[str, int]):
    """Clear the terminal and draw the budget dashboard for today (UTC).

    Shows per-pipeline usage vs. budget with a progress bar, percentage and
    a rough ETA, followed by a totals row and any threshold alerts.
    """
    today = datetime.utcnow().strftime("%Y-%m-%d")
    # Rows are stored with SQLite's datetime('now') space separator; a "T"
    # separator here would string-compare *after* the space and silently
    # exclude every row recorded today.
    usage = get_usage_since(conn, f"{today} 00:00:00")
    print("\033[2J\033[H") # Clear screen
    print("=" * 70)
    print(" TOKEN BUDGET TRACKER")
    print(f" {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')}")
    print("=" * 70)
    print(f" {'Pipeline':<20} {'Used':>10} {'Target':>10} {'Progress':>12} {'ETA':>8}")
    print("-" * 70)
    total_used = 0
    total_target = 0
    for pipeline, budget in sorted(budgets.items()):
        used = usage.get(pipeline, 0)
        total_used += used
        total_target += budget
        bar = progress_bar(used, budget)
        pct = (used / budget * 100) if budget > 0 else 0
        # ETA assumes one hour of elapsed burn and is shown only while the
        # pipeline consumed tokens within the last hour.
        hourly = get_hourly_usage(conn, pipeline, hours=1)
        current_hour_rate = hourly[0][1] if hourly else 0
        eta = estimate_eta(used, budget, 1) if current_hour_rate > 0 else "N/A"
        print(f" {pipeline:<20} {format_tokens(used):>10} {format_tokens(budget):>10} {bar} {pct:>5.1f}% {eta:>8}")
    print("-" * 70)
    total_bar = progress_bar(total_used, total_target)
    total_pct = (total_used / total_target * 100) if total_target > 0 else 0
    print(f" {'TOTAL':<20} {format_tokens(total_used):>10} {format_tokens(total_target):>10} {total_bar} {total_pct:>5.1f}%")
    print("=" * 70)
    # Threshold alerts (50/80/100%) for today's usage.
    alerts = check_alerts(usage, budgets)
    if alerts:
        print("\n ⚠️ ALERTS:")
        for alert in alerts:
            print(f" {alert}")
    print()
def check_alerts(usage: Dict[str, int], budgets: Dict[str, int]) -> List[str]:
    """Return one alert line per crossed budget threshold (50 / 80 / 100%).

    A pipeline over budget therefore yields multiple lines, one for each
    threshold it has passed, tagged green/yellow/red by severity.
    """
    icons = {50: "🟢", 80: "🟡", 100: "🔴"}
    alerts: List[str] = []
    for pipeline, budget in budgets.items():
        used = usage.get(pipeline, 0)
        pct = used / budget * 100 if budget > 0 else 0
        for threshold, icon in icons.items():
            if pct >= threshold:
                alerts.append(f"{icon} {pipeline}: {pct:.1f}% used ({format_tokens(used)}/{format_tokens(budget)})")
    return alerts
def daily_summary(conn: sqlite3.Connection, budgets: Dict[str, int], date: Optional[str] = None):
    """Print a per-pipeline (and top-5 per-worker) usage summary for *date*.

    Args:
        date: "YYYY-MM-DD"; defaults to today's UTC date.

    NOTE(review): the query is open-ended (recorded_at >= start), so a
    summary for a past date also includes usage from later days — confirm
    whether an end bound is wanted.
    """
    if date is None:
        date = datetime.utcnow().strftime("%Y-%m-%d")
    # Space separator matches SQLite's stored datetime('now') text; a "T"
    # separator would string-compare past the stored timestamps and report
    # zero usage for the requested day.
    start = f"{date} 00:00:00"
    usage = get_usage_since(conn, start)
    print(f"\n{'='*60}")
    print(f" DAILY SUMMARY — {date}")
    print(f"{'='*60}")
    total = 0
    for pipeline, budget in sorted(budgets.items()):
        used = usage.get(pipeline, 0)
        total += used
        pct = (used / budget * 100) if budget > 0 else 0
        print(f" {pipeline:<20} {format_tokens(used):>10} / {format_tokens(budget):>10} ({pct:.1f}%)")
        # Per-worker breakdown: the five heaviest workers for this pipeline.
        workers = get_worker_usage(conn, pipeline, start)
        for worker, wtokens in list(workers.items())[:5]:
            print(f" └─ {worker}: {format_tokens(wtokens)}")
    print(f"{'-'*60}")
    print(f" {'TOTAL':<20} {format_tokens(total):>10}")
    print(f"{'='*60}\n")
def watch_mode(conn: sqlite3.Connection, budgets: Dict[str, int], interval: int = 5):
    """Redraw the dashboard every *interval* seconds until Ctrl-C."""
    while True:
        try:
            render_dashboard(conn, budgets)
            time.sleep(interval)
        except KeyboardInterrupt:
            print("\nStopped.")
            break
def main():
    """CLI entry point: dispatch to record / budget / alerts / summary / watch.

    With no action flag, renders the dashboard once.  --alerts exits with a
    non-zero status when any threshold is crossed so cron/CI can react.
    """
    parser = argparse.ArgumentParser(description="Pipeline Token Budget Tracker")
    parser.add_argument("--db", help="SQLite database path")
    parser.add_argument("--watch", action="store_true", help="Live dashboard")
    parser.add_argument("--refresh", type=int, default=5, help="Dashboard refresh interval (seconds)")
    parser.add_argument("--summary", action="store_true", help="Daily summary")
    parser.add_argument("--date", help="Date for summary (YYYY-MM-DD)")
    parser.add_argument("--record", nargs=3, metavar=("PIPELINE", "WORKER", "TOKENS"),
                        help="Record token usage")
    parser.add_argument("--budget", nargs=2, metavar=("PIPELINE", "TOKENS"),
                        help="Set pipeline budget")
    parser.add_argument("--budgets-file", help="Budgets JSON file path")
    parser.add_argument("--alerts", action="store_true", help="Check alerts only")
    parser.add_argument("--json", action="store_true", help="JSON output")
    args = parser.parse_args()
    db_path = Path(args.db) if args.db else None
    conn = get_db(db_path)
    if args.budgets_file:
        # Redirect the module-level budgets path before loading.
        global BUDGETS_FILE
        BUDGETS_FILE = Path(args.budgets_file)
    budgets = load_budgets()
    if args.record:
        pipeline, worker, tokens = args.record
        record_usage(conn, pipeline, worker, int(tokens))
        print(f"Recorded: {pipeline}/{worker} = {int(tokens)} tokens")
    elif args.budget:
        pipeline, tokens = args.budget
        budgets[pipeline] = int(tokens)
        save_budgets(budgets)
        print(f"Budget set: {pipeline} = {int(tokens)} tokens")
    elif args.alerts:
        today = datetime.utcnow().strftime("%Y-%m-%d")
        # Space separator matches SQLite's stored datetime('now') text; a "T"
        # would string-compare past today's rows and always report no usage.
        usage = get_usage_since(conn, f"{today} 00:00:00")
        alerts = check_alerts(usage, budgets)
        if alerts:
            for a in alerts:
                print(a)
            sys.exit(1)  # non-zero exit so schedulers can act on alerts
        else:
            print("No alerts.")
    elif args.summary:
        daily_summary(conn, budgets, args.date)
    elif args.watch:
        watch_mode(conn, budgets, args.refresh)
    else:
        render_dashboard(conn, budgets)


if __name__ == "__main__":
    main()

159
tests/test_token_tracker.py Normal file
View File

@@ -0,0 +1,159 @@
"""
Tests for scripts/token_tracker.py — Token Budget Tracker.
"""
import json
import os
import sqlite3
import tempfile
import unittest
from pathlib import Path
import sys
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
from token_tracker import (
get_db,
record_usage,
get_usage_since,
get_hourly_usage,
get_worker_usage,
format_tokens,
progress_bar,
estimate_eta,
check_alerts,
load_budgets,
save_budgets,
)
class TestTokenTracker(unittest.TestCase):
    """Round-trip tests for the SQLite store against a throwaway database."""

    def setUp(self):
        self._workdir = tempfile.mkdtemp()
        self.conn = get_db(Path(self._workdir) / "test.db")

    def tearDown(self):
        self.conn.close()

    def test_record_usage(self):
        record_usage(self.conn, "pipeline1", "worker1", 1000)
        rows = self.conn.execute(
            "SELECT pipeline, worker, tokens FROM token_usage"
        ).fetchall()
        self.assertEqual(rows, [("pipeline1", "worker1", 1000)])

    def test_get_usage_since(self):
        for pipeline, worker, tokens in (("p1", "w1", 500), ("p1", "w2", 300), ("p2", "w1", 200)):
            record_usage(self.conn, pipeline, worker, tokens)
        usage = get_usage_since(self.conn, "2020-01-01T00:00:00")
        self.assertEqual({"p1": 800, "p2": 200}, usage)

    def test_get_worker_usage(self):
        for worker, tokens in (("w1", 500), ("w2", 300), ("w1", 100)):
            record_usage(self.conn, "p1", worker, tokens)
        workers = get_worker_usage(self.conn, "p1", "2020-01-01T00:00:00")
        self.assertEqual({"w1": 600, "w2": 300}, workers)
class TestFormatTokens(unittest.TestCase):
    """format_tokens() picks the largest fitting B/M/K unit, one decimal."""

    def test_billions(self):
        self.assertEqual("1.5B", format_tokens(1_500_000_000))

    def test_millions(self):
        self.assertEqual("45.2M", format_tokens(45_200_000))

    def test_thousands(self):
        self.assertEqual("1.5K", format_tokens(1_500))

    def test_small(self):
        self.assertEqual("42", format_tokens(42))

    def test_zero(self):
        self.assertEqual("0", format_tokens(0))
class TestProgressBar(unittest.TestCase):
    """progress_bar() renders █ for filled and ░ for empty cells.

    The pasted source had the bar glyphs stripped to empty strings (making
    several expectations the empty string); restored from the surviving
    half-bar expectation "█████░░░░░".
    """

    def test_empty(self):
        self.assertEqual(progress_bar(0, 100), "░" * 10)

    def test_half(self):
        bar = progress_bar(50, 100)
        self.assertEqual(bar, "█████░░░░░")

    def test_full(self):
        self.assertEqual(progress_bar(100, 100), "█" * 10)

    def test_overfull(self):
        # Over-budget clamps at a full bar rather than overflowing the width.
        self.assertEqual(progress_bar(150, 100), "█" * 10)

    def test_zero_target(self):
        self.assertEqual(progress_bar(0, 0), "░" * 10)
class TestEstimateEta(unittest.TestCase):
    """estimate_eta() covers the DONE / hours / minutes / no-data branches."""

    def test_done(self):
        self.assertEqual("DONE", estimate_eta(100, 100, 1))

    def test_hours(self):
        self.assertEqual("1.0h", estimate_eta(50, 100, 1))

    def test_minutes(self):
        # 10 tokens left at 90 tokens/hour is well under an hour.
        self.assertIn("m", estimate_eta(90, 100, 1))

    def test_no_data(self):
        self.assertEqual("N/A", estimate_eta(0, 100, 1))
class TestCheckAlerts(unittest.TestCase):
    """Alerts fire at 50/80/100% and stack when a pipeline is over budget."""

    def _alerts(self, used, budget=1000):
        # Helper: alerts for a single pipeline at the given usage level.
        return check_alerts({"p1": used}, {"p1": budget})

    def test_no_alerts(self):
        self.assertEqual(self._alerts(100), [])

    def test_50_percent(self):
        self.assertTrue(any("50" in line for line in self._alerts(500)))

    def test_80_percent(self):
        self.assertTrue(any("80" in line for line in self._alerts(800)))

    def test_100_percent(self):
        self.assertTrue(any("100" in line for line in self._alerts(1000)))

    def test_over_budget(self):
        lines = self._alerts(1500)
        self.assertGreaterEqual(len(lines), 3)  # 50%, 80%, 100% all triggered
        self.assertTrue(any("🔴" in line for line in lines))
class TestBudgets(unittest.TestCase):
    """save_budgets/load_budgets round-trip through a redirected file.

    The original test created a tempfile it never used and then wrote to the
    real BUDGETS_FILE under the user's home directory — polluting ~/.hermes
    and making the assertion depend on whatever was already there.  Redirect
    the module-level path for the duration of the test instead.
    """

    def test_save_load(self):
        import token_tracker
        tmpfile = Path(tempfile.mkdtemp()) / "budgets.json"
        original_path = token_tracker.BUDGETS_FILE
        token_tracker.BUDGETS_FILE = tmpfile
        try:
            budgets = {"p1": 100, "p2": 200}
            save_budgets(budgets)
            self.assertEqual(load_budgets(), budgets)
        finally:
            # Always restore the module-level path for other tests.
            token_tracker.BUDGETS_FILE = original_path


if __name__ == "__main__":
    unittest.main()