Merge PR #777: scripts/token_tracker.py (added)

This commit is contained in:
Merge Bot
2026-04-16 04:58:40 +00:00
parent 872a2d3f79
commit 218b6dcb33

329
scripts/token_tracker.py Normal file
View File

@@ -0,0 +1,329 @@
#!/usr/bin/env python3
"""
token_tracker.py — Pipeline Token Budget Tracker
Real-time token spend tracking across all pipelines with:
- SQLite store for token usage per pipeline/worker/hour
- CLI dashboard with live refresh
- Budget alerts at 50%, 80%, 100%
- Daily summary reports
Usage:
python3 token_tracker.py --watch # Live dashboard
python3 token_tracker.py --summary # Daily summary
python3 token_tracker.py --record pipeline worker tokens # Record usage
python3 token_tracker.py --budget pipeline 200000000 # Set budget
python3 token_tracker.py --alerts # Check alerts
"""
import argparse
import json
import os
import sqlite3
import sys
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Tuple
DB_PATH = Path.home() / ".hermes" / "pipelines" / "token_usage.db"
BUDGETS_FILE = Path.home() / ".hermes" / "pipelines" / "budgets.json"
# Default pipeline budgets (tokens)
DEFAULT_BUDGETS = {
"knowledge-mine": 200_000_000,
"training-factory": 215_000_000,
"playground": 16_000_000,
"adversary": 17_000_000,
}
def get_db(db_path: Optional[Path] = None) -> sqlite3.Connection:
"""Get or create SQLite database."""
path = db_path or DB_PATH
path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(path))
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("""
CREATE TABLE IF NOT EXISTS token_usage (
id INTEGER PRIMARY KEY AUTOINCREMENT,
pipeline TEXT NOT NULL,
worker TEXT NOT NULL,
tokens INTEGER NOT NULL,
recorded_at TEXT NOT NULL DEFAULT (datetime('now')),
hour_key TEXT GENERATED ALWAYS AS (strftime('%Y-%m-%d %H', recorded_at)) STORED
)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_pipeline_hour
ON token_usage(pipeline, hour_key)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_recorded_at
ON token_usage(recorded_at)
""")
conn.commit()
return conn
def record_usage(conn: sqlite3.Connection, pipeline: str, worker: str, tokens: int):
"""Record token usage."""
conn.execute(
"INSERT INTO token_usage (pipeline, worker, tokens) VALUES (?, ?, ?)",
(pipeline, worker, tokens)
)
conn.commit()
def get_usage_since(conn: sqlite3.Connection, since: str) -> Dict[str, int]:
"""Get total tokens per pipeline since a datetime."""
cursor = conn.execute("""
SELECT pipeline, SUM(tokens) as total
FROM token_usage
WHERE recorded_at >= ?
GROUP BY pipeline
ORDER BY total DESC
""", (since,))
return {row[0]: row[1] for row in cursor.fetchall()}
def get_hourly_usage(conn: sqlite3.Connection, pipeline: str, hours: int = 24) -> List[Tuple[str, int]]:
"""Get hourly token usage for a pipeline."""
since = (datetime.utcnow() - timedelta(hours=hours)).isoformat()
cursor = conn.execute("""
SELECT hour_key, SUM(tokens) as total
FROM token_usage
WHERE pipeline = ? AND recorded_at >= ?
GROUP BY hour_key
ORDER BY hour_key
""", (pipeline, since))
return cursor.fetchall()
def get_worker_usage(conn: sqlite3.Connection, pipeline: str, since: str) -> Dict[str, int]:
"""Get per-worker token usage for a pipeline."""
cursor = conn.execute("""
SELECT worker, SUM(tokens) as total
FROM token_usage
WHERE pipeline = ? AND recorded_at >= ?
GROUP BY worker
ORDER BY total DESC
""", (pipeline, since))
return {row[0]: row[1] for row in cursor.fetchall()}
def load_budgets() -> Dict[str, int]:
"""Load pipeline budgets."""
if BUDGETS_FILE.exists():
with open(BUDGETS_FILE) as f:
return json.load(f)
return DEFAULT_BUDGETS.copy()
def save_budgets(budgets: Dict[str, int]):
"""Save pipeline budgets."""
BUDGETS_FILE.parent.mkdir(parents=True, exist_ok=True)
with open(BUDGETS_FILE, 'w') as f:
json.dump(budgets, f, indent=2)
def format_tokens(tokens: int) -> str:
"""Format token count for display."""
if tokens >= 1_000_000_000:
return f"{tokens / 1_000_000_000:.1f}B"
if tokens >= 1_000_000:
return f"{tokens / 1_000_000:.1f}M"
if tokens >= 1_000:
return f"{tokens / 1_000:.1f}K"
return str(tokens)
def progress_bar(used: int, target: int, width: int = 10) -> str:
"""Generate a progress bar."""
if target == 0:
return "" * width
ratio = min(used / target, 1.0)
filled = int(ratio * width)
return "" * filled + "" * (width - filled)
def estimate_eta(used: int, target: int, hours_elapsed: float) -> str:
"""Estimate time to completion."""
if hours_elapsed <= 0 or used <= 0:
return "N/A"
rate = used / hours_elapsed
remaining = target - used
if remaining <= 0:
return "DONE"
eta_hours = remaining / rate
if eta_hours >= 1:
return f"{eta_hours:.1f}h"
return f"{eta_hours * 60:.0f}m"
def render_dashboard(conn: sqlite3.Connection, budgets: Dict[str, int]):
"""Render the live dashboard."""
today = datetime.utcnow().strftime("%Y-%m-%d")
usage = get_usage_since(conn, f"{today}T00:00:00")
print("\033[2J\033[H") # Clear screen
print("=" * 70)
print(" TOKEN BUDGET TRACKER")
print(f" {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')}")
print("=" * 70)
print(f" {'Pipeline':<20} {'Used':>10} {'Target':>10} {'Progress':>12} {'ETA':>8}")
print("-" * 70)
total_used = 0
total_target = 0
for pipeline, budget in sorted(budgets.items()):
used = usage.get(pipeline, 0)
total_used += used
total_target += budget
bar = progress_bar(used, budget)
pct = (used / budget * 100) if budget > 0 else 0
# Estimate ETA based on current hour's rate
hour_key = datetime.utcnow().strftime("%Y-%m-%d %H")
hourly = get_hourly_usage(conn, pipeline, hours=1)
current_hour_rate = hourly[0][1] if hourly else 0
remaining = budget - used
eta = estimate_eta(used, budget, 1) if current_hour_rate > 0 else "N/A"
print(f" {pipeline:<20} {format_tokens(used):>10} {format_tokens(budget):>10} {bar} {pct:>5.1f}% {eta:>8}")
print("-" * 70)
total_bar = progress_bar(total_used, total_target)
total_pct = (total_used / total_target * 100) if total_target > 0 else 0
print(f" {'TOTAL':<20} {format_tokens(total_used):>10} {format_tokens(total_target):>10} {total_bar} {total_pct:>5.1f}%")
print("=" * 70)
# Alerts
alerts = check_alerts(usage, budgets)
if alerts:
print("\n ⚠️ ALERTS:")
for alert in alerts:
print(f" {alert}")
print()
def check_alerts(usage: Dict[str, int], budgets: Dict[str, int]) -> List[str]:
"""Check budget alerts."""
alerts = []
thresholds = [50, 80, 100]
for pipeline, budget in budgets.items():
used = usage.get(pipeline, 0)
pct = (used / budget * 100) if budget > 0 else 0
for threshold in thresholds:
if pct >= threshold:
level = "🔴" if threshold == 100 else "🟡" if threshold == 80 else "🟢"
alerts.append(f"{level} {pipeline}: {pct:.1f}% used ({format_tokens(used)}/{format_tokens(budget)})")
return alerts
def daily_summary(conn: sqlite3.Connection, budgets: Dict[str, int], date: Optional[str] = None):
"""Generate daily summary report."""
if date is None:
date = datetime.utcnow().strftime("%Y-%m-%d")
start = f"{date}T00:00:00"
end = f"{date}T23:59:59"
usage = get_usage_since(conn, start)
print(f"\n{'='*60}")
print(f" DAILY SUMMARY — {date}")
print(f"{'='*60}")
total = 0
for pipeline, budget in sorted(budgets.items()):
used = usage.get(pipeline, 0)
total += used
pct = (used / budget * 100) if budget > 0 else 0
print(f" {pipeline:<20} {format_tokens(used):>10} / {format_tokens(budget):>10} ({pct:.1f}%)")
# Per-worker breakdown
workers = get_worker_usage(conn, pipeline, start)
for worker, wtokens in list(workers.items())[:5]:
print(f" └─ {worker}: {format_tokens(wtokens)}")
print(f"{''*60}")
print(f" {'TOTAL':<20} {format_tokens(total):>10}")
print(f"{'='*60}\n")
def watch_mode(conn: sqlite3.Connection, budgets: Dict[str, int], interval: int = 5):
"""Live dashboard with refresh."""
try:
while True:
render_dashboard(conn, budgets)
time.sleep(interval)
except KeyboardInterrupt:
print("\nStopped.")
def main():
parser = argparse.ArgumentParser(description="Pipeline Token Budget Tracker")
parser.add_argument("--db", help="SQLite database path")
parser.add_argument("--watch", action="store_true", help="Live dashboard")
parser.add_argument("--refresh", type=int, default=5, help="Dashboard refresh interval (seconds)")
parser.add_argument("--summary", action="store_true", help="Daily summary")
parser.add_argument("--date", help="Date for summary (YYYY-MM-DD)")
parser.add_argument("--record", nargs=3, metavar=("PIPELINE", "WORKER", "TOKENS"),
help="Record token usage")
parser.add_argument("--budget", nargs=2, metavar=("PIPELINE", "TOKENS"),
help="Set pipeline budget")
parser.add_argument("--budgets-file", help="Budgets JSON file path")
parser.add_argument("--alerts", action="store_true", help="Check alerts only")
parser.add_argument("--json", action="store_true", help="JSON output")
args = parser.parse_args()
db_path = Path(args.db) if args.db else None
conn = get_db(db_path)
if args.budgets_file:
global BUDGETS_FILE
BUDGETS_FILE = Path(args.budgets_file)
budgets = load_budgets()
if args.record:
pipeline, worker, tokens = args.record
record_usage(conn, pipeline, worker, int(tokens))
print(f"Recorded: {pipeline}/{worker} = {int(tokens)} tokens")
elif args.budget:
pipeline, tokens = args.budget
budgets[pipeline] = int(tokens)
save_budgets(budgets)
print(f"Budget set: {pipeline} = {int(tokens)} tokens")
elif args.alerts:
today = datetime.utcnow().strftime("%Y-%m-%d")
usage = get_usage_since(conn, f"{today}T00:00:00")
alerts = check_alerts(usage, budgets)
if alerts:
for a in alerts:
print(a)
sys.exit(1)
else:
print("No alerts.")
elif args.summary:
daily_summary(conn, budgets, args.date)
elif args.watch:
watch_mode(conn, budgets, args.refresh)
else:
render_dashboard(conn, budgets)
if __name__ == "__main__":
main()