Compare commits
1 Commits
burn/750-1
...
fix/622-to
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7f2b271225 |
329
scripts/token_tracker.py
Normal file
329
scripts/token_tracker.py
Normal file
@@ -0,0 +1,329 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
token_tracker.py — Pipeline Token Budget Tracker
|
||||
|
||||
Real-time token spend tracking across all pipelines with:
|
||||
- SQLite store for token usage per pipeline/worker/hour
|
||||
- CLI dashboard with live refresh
|
||||
- Budget alerts at 50%, 80%, 100%
|
||||
- Daily summary reports
|
||||
|
||||
Usage:
|
||||
python3 token_tracker.py --watch # Live dashboard
|
||||
python3 token_tracker.py --summary # Daily summary
|
||||
python3 token_tracker.py --record pipeline worker tokens # Record usage
|
||||
python3 token_tracker.py --budget pipeline 200000000 # Set budget
|
||||
python3 token_tracker.py --alerts # Check alerts
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import sqlite3
|
||||
import sys
|
||||
import time
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
|
||||
DB_PATH = Path.home() / ".hermes" / "pipelines" / "token_usage.db"
|
||||
BUDGETS_FILE = Path.home() / ".hermes" / "pipelines" / "budgets.json"
|
||||
|
||||
# Default pipeline budgets (tokens)
|
||||
DEFAULT_BUDGETS = {
|
||||
"knowledge-mine": 200_000_000,
|
||||
"training-factory": 215_000_000,
|
||||
"playground": 16_000_000,
|
||||
"adversary": 17_000_000,
|
||||
}
|
||||
|
||||
|
||||
def get_db(db_path: Optional[Path] = None) -> sqlite3.Connection:
    """Open (creating if necessary) the token-usage SQLite database.

    Parameters:
        db_path: explicit database file; falls back to the module DB_PATH.

    Returns:
        An open connection with the schema and both indexes ensured.
    """
    target = DB_PATH if db_path is None else db_path
    target.parent.mkdir(parents=True, exist_ok=True)

    conn = sqlite3.connect(str(target))
    # WAL lets readers proceed while a writer holds the database.
    conn.execute("PRAGMA journal_mode=WAL")
    ddl_statements = (
        """
        CREATE TABLE IF NOT EXISTS token_usage (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            pipeline TEXT NOT NULL,
            worker TEXT NOT NULL,
            tokens INTEGER NOT NULL,
            recorded_at TEXT NOT NULL DEFAULT (datetime('now')),
            hour_key TEXT GENERATED ALWAYS AS (strftime('%Y-%m-%d %H', recorded_at)) STORED
        )
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_pipeline_hour
        ON token_usage(pipeline, hour_key)
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_recorded_at
        ON token_usage(recorded_at)
        """,
    )
    for statement in ddl_statements:
        conn.execute(statement)
    conn.commit()
    return conn
|
||||
|
||||
|
||||
def record_usage(conn: sqlite3.Connection, pipeline: str, worker: str, tokens: int):
    """Insert a single usage row and commit it immediately."""
    row = (pipeline, worker, tokens)
    conn.execute(
        "INSERT INTO token_usage (pipeline, worker, tokens) VALUES (?, ?, ?)",
        row,
    )
    conn.commit()
|
||||
|
||||
|
||||
def get_usage_since(conn: sqlite3.Connection, since: str) -> Dict[str, int]:
    """Sum tokens per pipeline for rows recorded at or after *since*.

    Parameters:
        since: ISO-8601 datetime string compared lexically against recorded_at.

    Returns:
        Mapping of pipeline name to total tokens, largest spender first.
    """
    rows = conn.execute("""
        SELECT pipeline, SUM(tokens) as total
        FROM token_usage
        WHERE recorded_at >= ?
        GROUP BY pipeline
        ORDER BY total DESC
    """, (since,)).fetchall()
    totals: Dict[str, int] = {}
    for pipeline, total in rows:
        totals[pipeline] = total
    return totals
|
||||
|
||||
|
||||
def get_hourly_usage(conn: sqlite3.Connection, pipeline: str, hours: int = 24) -> List[Tuple[str, int]]:
    """Return chronological (hour_key, total_tokens) pairs for *pipeline*.

    Parameters:
        hours: size of the trailing window, in hours (default one day).
    """
    cutoff = (datetime.utcnow() - timedelta(hours=hours)).isoformat()
    query = """
        SELECT hour_key, SUM(tokens) as total
        FROM token_usage
        WHERE pipeline = ? AND recorded_at >= ?
        GROUP BY hour_key
        ORDER BY hour_key
    """
    return conn.execute(query, (pipeline, cutoff)).fetchall()
|
||||
|
||||
|
||||
def get_worker_usage(conn: sqlite3.Connection, pipeline: str, since: str) -> Dict[str, int]:
    """Break down one pipeline's token usage per worker since *since*.

    Returns:
        Mapping of worker name to total tokens, heaviest user first.
    """
    cursor = conn.execute("""
        SELECT worker, SUM(tokens) as total
        FROM token_usage
        WHERE pipeline = ? AND recorded_at >= ?
        GROUP BY worker
        ORDER BY total DESC
    """, (pipeline, since))
    # dict() preserves the DESC row order just like the comprehension did.
    return dict(cursor.fetchall())
|
||||
|
||||
|
||||
def load_budgets() -> Dict[str, int]:
    """Read pipeline budgets from BUDGETS_FILE.

    Returns:
        The JSON contents of BUDGETS_FILE if it exists, otherwise a
        fresh copy of DEFAULT_BUDGETS (so callers can mutate it safely).
    """
    if not BUDGETS_FILE.exists():
        return DEFAULT_BUDGETS.copy()
    with open(BUDGETS_FILE) as fh:
        return json.load(fh)
|
||||
|
||||
|
||||
def save_budgets(budgets: Dict[str, int]):
    """Persist *budgets* as pretty-printed JSON at BUDGETS_FILE.

    Creates the parent directory on first use.
    """
    BUDGETS_FILE.parent.mkdir(parents=True, exist_ok=True)
    with open(BUDGETS_FILE, 'w') as fh:
        json.dump(budgets, fh, indent=2)
|
||||
|
||||
|
||||
def format_tokens(tokens: int) -> str:
    """Render a token count compactly with a B/M/K suffix.

    Uses the largest suffix that fits, with one decimal place; counts
    below 1000 are returned as the plain integer string.
    """
    scales = (
        (1_000_000_000, "B"),
        (1_000_000, "M"),
        (1_000, "K"),
    )
    for limit, suffix in scales:
        if tokens >= limit:
            return f"{tokens / limit:.1f}{suffix}"
    return str(tokens)
|
||||
|
||||
|
||||
def progress_bar(used: int, target: int, width: int = 10) -> str:
    """Generate a fixed-width textual progress bar.

    Parameters:
        used: tokens consumed; clamped into [0, target] for display.
        target: budget; a zero budget renders a fully empty bar.
        width: number of cells in the bar.

    Returns:
        A string of exactly *width* filled/empty block characters.
    """
    if target == 0:
        return "░" * width
    # Clamp on both ends: the original only capped at 1.0, so a negative
    # `used` produced a bar longer than `width` ("░" * (width - negative)).
    ratio = min(max(used / target, 0.0), 1.0)
    filled = int(ratio * width)
    return "█" * filled + "░" * (width - filled)
|
||||
|
||||
|
||||
def estimate_eta(used: int, target: int, hours_elapsed: float) -> str:
    """Project time remaining from the average rate so far.

    Returns:
        "N/A" when there is no usable rate (nothing used, or no time
        elapsed), "DONE" once the target is met, otherwise "<x.x>h"
        for ETAs of an hour or more and "<x>m" below that.
    """
    if used <= 0 or hours_elapsed <= 0:
        return "N/A"
    rate = used / hours_elapsed
    left = target - used
    if left <= 0:
        return "DONE"
    hours_left = left / rate
    return f"{hours_left:.1f}h" if hours_left >= 1 else f"{hours_left * 60:.0f}m"
|
||||
|
||||
|
||||
def render_dashboard(conn: sqlite3.Connection, budgets: Dict[str, int]):
    """Render a one-screen budget dashboard for today's usage (UTC).

    Prints, per pipeline: tokens used today, the budget, a progress bar,
    percent consumed, and an ETA, followed by totals and any active
    threshold alerts.
    """
    today = datetime.utcnow().strftime("%Y-%m-%d")
    usage = get_usage_since(conn, f"{today}T00:00:00")

    print("\033[2J\033[H")  # Clear screen
    print("=" * 70)
    print(" TOKEN BUDGET TRACKER")
    print(f" {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')}")
    print("=" * 70)
    print(f" {'Pipeline':<20} {'Used':>10} {'Target':>10} {'Progress':>12} {'ETA':>8}")
    print("-" * 70)

    total_used = 0
    total_target = 0

    for pipeline, budget in sorted(budgets.items()):
        used = usage.get(pipeline, 0)
        total_used += used
        total_target += budget

        bar = progress_bar(used, budget)
        pct = (used / budget * 100) if budget > 0 else 0

        # Hourly data is fetched only to decide whether to show an ETA;
        # the ETA itself treats today's usage as a one-hour burn rate.
        # NOTE(review): feeding the current-hour rate into the estimate
        # would likely be more accurate — confirm intent.
        # (Removed two dead locals from the original: `hour_key` and
        # `remaining` were computed but never used.)
        hourly = get_hourly_usage(conn, pipeline, hours=1)
        current_hour_rate = hourly[0][1] if hourly else 0
        eta = estimate_eta(used, budget, 1) if current_hour_rate > 0 else "N/A"

        print(f" {pipeline:<20} {format_tokens(used):>10} {format_tokens(budget):>10} {bar} {pct:>5.1f}% {eta:>8}")

    print("-" * 70)
    total_bar = progress_bar(total_used, total_target)
    total_pct = (total_used / total_target * 100) if total_target > 0 else 0
    print(f" {'TOTAL':<20} {format_tokens(total_used):>10} {format_tokens(total_target):>10} {total_bar} {total_pct:>5.1f}%")
    print("=" * 70)

    # Alerts
    alerts = check_alerts(usage, budgets)
    if alerts:
        print("\n ⚠️ ALERTS:")
        for alert in alerts:
            print(f" {alert}")
    print()
|
||||
|
||||
|
||||
def check_alerts(usage: Dict[str, int], budgets: Dict[str, int]) -> List[str]:
    """Return alert lines for every threshold each pipeline has crossed.

    A pipeline at or above 50%/80%/100% of budget produces one line per
    crossed threshold, colour-coded green/yellow/red respectively.
    Pipelines with a zero budget never alert.
    """
    icons = {50: "🟢", 80: "🟡", 100: "🔴"}
    alerts: List[str] = []

    for pipeline, budget in budgets.items():
        used = usage.get(pipeline, 0)
        pct = (used / budget * 100) if budget > 0 else 0

        for threshold in (50, 80, 100):
            if pct < threshold:
                continue
            alerts.append(
                f"{icons[threshold]} {pipeline}: {pct:.1f}% used "
                f"({format_tokens(used)}/{format_tokens(budget)})"
            )

    return alerts
|
||||
|
||||
|
||||
def daily_summary(conn: sqlite3.Connection, budgets: Dict[str, int], date: Optional[str] = None):
    """Print a per-pipeline usage report (with top-5 workers) for *date*.

    Parameters:
        date: "YYYY-MM-DD"; defaults to today (UTC).

    NOTE(review): only a lower time bound is applied to the queries, so
    for a past date the totals also include every later day's usage —
    confirm whether an upper bound should be added. (The original body
    built an unused `end = f"{date}T23:59:59"` variable, now removed.)
    """
    if date is None:
        date = datetime.utcnow().strftime("%Y-%m-%d")

    start = f"{date}T00:00:00"
    usage = get_usage_since(conn, start)

    print(f"\n{'='*60}")
    print(f" DAILY SUMMARY — {date}")
    print(f"{'='*60}")

    total = 0
    for pipeline, budget in sorted(budgets.items()):
        used = usage.get(pipeline, 0)
        total += used
        pct = (used / budget * 100) if budget > 0 else 0
        print(f" {pipeline:<20} {format_tokens(used):>10} / {format_tokens(budget):>10} ({pct:.1f}%)")

        # Per-worker breakdown: the query orders by tokens DESC, so the
        # first five entries are the heaviest users.
        workers = get_worker_usage(conn, pipeline, start)
        for worker, wtokens in list(workers.items())[:5]:
            print(f" └─ {worker}: {format_tokens(wtokens)}")

    print(f"{'─'*60}")
    print(f" {'TOTAL':<20} {format_tokens(total):>10}")
    print(f"{'='*60}\n")
|
||||
|
||||
|
||||
def watch_mode(conn: sqlite3.Connection, budgets: Dict[str, int], interval: int = 5):
    """Continuously redraw the dashboard until interrupted.

    Parameters:
        interval: seconds to sleep between redraws.

    Runs until the user presses Ctrl-C, which is caught and turned into
    a clean "Stopped." exit instead of a traceback.
    """
    try:
        while True:
            render_dashboard(conn, budgets)
            time.sleep(interval)
    except KeyboardInterrupt:
        print("\nStopped.")
|
||||
|
||||
|
||||
def main():
    """CLI entry point: parse arguments and dispatch to the chosen mode.

    Modes are mutually exclusive in practice; the first match wins in
    the order --record, --budget, --alerts, --summary, --watch. With no
    mode flag, a single dashboard render is printed.
    """
    parser = argparse.ArgumentParser(description="Pipeline Token Budget Tracker")
    parser.add_argument("--db", help="SQLite database path")
    parser.add_argument("--watch", action="store_true", help="Live dashboard")
    parser.add_argument("--refresh", type=int, default=5, help="Dashboard refresh interval (seconds)")
    parser.add_argument("--summary", action="store_true", help="Daily summary")
    parser.add_argument("--date", help="Date for summary (YYYY-MM-DD)")
    parser.add_argument("--record", nargs=3, metavar=("PIPELINE", "WORKER", "TOKENS"),
                        help="Record token usage")
    parser.add_argument("--budget", nargs=2, metavar=("PIPELINE", "TOKENS"),
                        help="Set pipeline budget")
    parser.add_argument("--budgets-file", help="Budgets JSON file path")
    parser.add_argument("--alerts", action="store_true", help="Check alerts only")
    # NOTE(review): --json is accepted but never honored anywhere below —
    # confirm whether JSON output was meant to be implemented or dropped.
    parser.add_argument("--json", action="store_true", help="JSON output")
    args = parser.parse_args()

    db_path = Path(args.db) if args.db else None
    conn = get_db(db_path)

    # Redirect the module-level budgets path BEFORE load_budgets() runs.
    if args.budgets_file:
        global BUDGETS_FILE
        BUDGETS_FILE = Path(args.budgets_file)

    budgets = load_budgets()

    if args.record:
        pipeline, worker, tokens = args.record
        record_usage(conn, pipeline, worker, int(tokens))
        print(f"Recorded: {pipeline}/{worker} = {int(tokens)} tokens")

    elif args.budget:
        pipeline, tokens = args.budget
        budgets[pipeline] = int(tokens)
        save_budgets(budgets)
        print(f"Budget set: {pipeline} = {int(tokens)} tokens")

    elif args.alerts:
        today = datetime.utcnow().strftime("%Y-%m-%d")
        usage = get_usage_since(conn, f"{today}T00:00:00")
        alerts = check_alerts(usage, budgets)
        if alerts:
            for a in alerts:
                print(a)
            # Non-zero exit lets cron/CI treat active alerts as a failure.
            sys.exit(1)
        else:
            print("No alerts.")

    elif args.summary:
        daily_summary(conn, budgets, args.date)

    elif args.watch:
        watch_mode(conn, budgets, args.refresh)

    else:
        # Default action: a single (non-looping) dashboard render.
        render_dashboard(conn, budgets)


if __name__ == "__main__":
    main()
|
||||
159
tests/test_token_tracker.py
Normal file
159
tests/test_token_tracker.py
Normal file
@@ -0,0 +1,159 @@
|
||||
"""
|
||||
Tests for scripts/token_tracker.py — Token Budget Tracker.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sqlite3
|
||||
import tempfile
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
import sys
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
|
||||
from token_tracker import (
|
||||
get_db,
|
||||
record_usage,
|
||||
get_usage_since,
|
||||
get_hourly_usage,
|
||||
get_worker_usage,
|
||||
format_tokens,
|
||||
progress_bar,
|
||||
estimate_eta,
|
||||
check_alerts,
|
||||
load_budgets,
|
||||
save_budgets,
|
||||
)
|
||||
|
||||
|
||||
class TestTokenTracker(unittest.TestCase):
    """Database-level tests: schema creation, recording, aggregation."""

    def setUp(self):
        # Fresh database file per test keeps cases independent.
        self.tmpdir = tempfile.mkdtemp()
        self.db_path = Path(self.tmpdir) / "test.db"
        self.conn = get_db(self.db_path)

    def tearDown(self):
        self.conn.close()
        # The original tearDown leaked the temp directory on every run;
        # remove it (best-effort, since WAL sidecar files may linger).
        import shutil
        shutil.rmtree(self.tmpdir, ignore_errors=True)

    def test_record_usage(self):
        record_usage(self.conn, "pipeline1", "worker1", 1000)
        cursor = self.conn.execute("SELECT pipeline, worker, tokens FROM token_usage")
        row = cursor.fetchone()
        self.assertEqual(row, ("pipeline1", "worker1", 1000))

    def test_get_usage_since(self):
        record_usage(self.conn, "p1", "w1", 500)
        record_usage(self.conn, "p1", "w2", 300)
        record_usage(self.conn, "p2", "w1", 200)

        usage = get_usage_since(self.conn, "2020-01-01T00:00:00")
        self.assertEqual(usage["p1"], 800)
        self.assertEqual(usage["p2"], 200)

    def test_get_worker_usage(self):
        record_usage(self.conn, "p1", "w1", 500)
        record_usage(self.conn, "p1", "w2", 300)
        record_usage(self.conn, "p1", "w1", 100)

        workers = get_worker_usage(self.conn, "p1", "2020-01-01T00:00:00")
        self.assertEqual(workers["w1"], 600)
        self.assertEqual(workers["w2"], 300)
|
||||
|
||||
|
||||
class TestFormatTokens(unittest.TestCase):
    """format_tokens picks the largest fitting suffix, one decimal place."""

    def test_billions(self):
        formatted = format_tokens(1_500_000_000)
        self.assertEqual(formatted, "1.5B")

    def test_millions(self):
        formatted = format_tokens(45_200_000)
        self.assertEqual(formatted, "45.2M")

    def test_thousands(self):
        formatted = format_tokens(1_500)
        self.assertEqual(formatted, "1.5K")

    def test_small(self):
        # Below 1K the raw integer is returned unchanged.
        self.assertEqual(format_tokens(42), "42")

    def test_zero(self):
        self.assertEqual(format_tokens(0), "0")
|
||||
|
||||
|
||||
class TestProgressBar(unittest.TestCase):
    """progress_bar renders a fixed-width bar of filled/empty cells."""

    def test_empty(self):
        self.assertEqual(progress_bar(0, 100), "░" * 10)

    def test_half(self):
        self.assertEqual(progress_bar(50, 100), "█" * 5 + "░" * 5)

    def test_full(self):
        self.assertEqual(progress_bar(100, 100), "█" * 10)

    def test_overfull(self):
        # Usage beyond the target is clamped to a completely full bar.
        self.assertEqual(progress_bar(150, 100), "█" * 10)

    def test_zero_target(self):
        # A zero budget renders an empty bar rather than dividing by zero.
        self.assertEqual(progress_bar(0, 0), "░" * 10)
|
||||
|
||||
|
||||
class TestEstimateEta(unittest.TestCase):
    """estimate_eta projects remaining time from the average rate so far."""

    def test_done(self):
        self.assertEqual(estimate_eta(100, 100, 1), "DONE")

    def test_hours(self):
        # 50 used in 1h leaves 50 at 50/h => exactly one more hour.
        self.assertEqual(estimate_eta(50, 100, 1), "1.0h")

    def test_minutes(self):
        projected = estimate_eta(90, 100, 1)
        self.assertIn("m", projected)  # sub-hour ETAs switch to minutes

    def test_no_data(self):
        # Zero usage gives no rate to extrapolate from.
        self.assertEqual(estimate_eta(0, 100, 1), "N/A")
|
||||
|
||||
|
||||
class TestCheckAlerts(unittest.TestCase):
    """check_alerts emits one line per crossed threshold (50/80/100%)."""

    def test_no_alerts(self):
        self.assertEqual(check_alerts({"p1": 100}, {"p1": 1000}), [])

    def test_50_percent(self):
        alerts = check_alerts({"p1": 500}, {"p1": 1000})
        self.assertTrue(any("50" in line for line in alerts))

    def test_80_percent(self):
        alerts = check_alerts({"p1": 800}, {"p1": 1000})
        self.assertTrue(any("80" in line for line in alerts))

    def test_100_percent(self):
        alerts = check_alerts({"p1": 1000}, {"p1": 1000})
        self.assertTrue(any("100" in line for line in alerts))

    def test_over_budget(self):
        alerts = check_alerts({"p1": 1500}, {"p1": 1000})
        # All three thresholds fire, and the 100% one is the red alert.
        self.assertGreaterEqual(len(alerts), 3)
        self.assertTrue(any("🔴" in line for line in alerts))
|
||||
|
||||
|
||||
class TestBudgets(unittest.TestCase):
    """Round-trip budgets through save_budgets/load_budgets.

    The original test called save_budgets() without redirecting the
    module path, so it wrote to the user's REAL ~/.hermes budgets file
    (and its tempfile.mktemp() result was never used). This version
    patches token_tracker.BUDGETS_FILE to a throwaway location.
    """

    def test_save_load(self):
        import token_tracker
        tmpdir = tempfile.mkdtemp()
        original = token_tracker.BUDGETS_FILE
        # Redirect the module-level path so the user's data is untouched.
        token_tracker.BUDGETS_FILE = Path(tmpdir) / "budgets.json"
        try:
            budgets = {"p1": 100, "p2": 200}
            save_budgets(budgets)
            loaded = load_budgets()
            self.assertEqual(loaded, budgets)
            self.assertIn("p1", loaded)
        finally:
            token_tracker.BUDGETS_FILE = original
            import shutil
            shutil.rmtree(tmpdir, ignore_errors=True)
|
||||
|
||||
|
||||
# Allow running this file directly: `python tests/test_token_tracker.py`.
if __name__ == "__main__":
    unittest.main()
|
||||
Reference in New Issue
Block a user