Files
timmy-config/scripts/token_tracker.py
Alexander Whitestone 91d94e29e8
Some checks failed
Smoke Test / smoke (pull_request) Failing after 24s
Architecture Lint / Linter Tests (pull_request) Successful in 29s
Validate Config / YAML Lint (pull_request) Failing after 16s
Validate Config / JSON Validate (pull_request) Successful in 21s
Validate Config / Shell Script Lint (pull_request) Failing after 1m8s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 1m21s
Validate Config / Python Test Suite (pull_request) Has been skipped
Validate Config / Cron Syntax Check (pull_request) Successful in 15s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 15s
Validate Config / Playbook Schema Validation (pull_request) Successful in 30s
Architecture Lint / Lint Repository (pull_request) Failing after 28s
PR Checklist / pr-checklist (pull_request) Successful in 5m18s
fix(pipeline): repair token tracker CLI summary (#622)
Fix the hyphenated token-tracker entrypoint and normalize SQLite time filtering
so same-day usage appears in the summary dashboard.
2026-04-22 11:23:43 -04:00

335 lines
11 KiB
Python

#!/usr/bin/env python3
"""
token_tracker.py — Pipeline Token Budget Tracker
Real-time token spend tracking across all pipelines with:
- SQLite store for token usage per pipeline/worker/hour
- CLI dashboard with live refresh
- Budget alerts at 50%, 80%, 100%
- Daily summary reports
Usage:
python3 token_tracker.py --watch # Live dashboard
python3 token_tracker.py --summary # Daily summary
python3 token_tracker.py --record pipeline worker tokens # Record usage
python3 token_tracker.py --budget pipeline 200000000 # Set budget
python3 token_tracker.py --alerts # Check alerts
"""
import argparse
import json
import os
import sqlite3
import sys
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Tuple
DB_PATH = Path.home() / ".hermes" / "pipelines" / "token_usage.db"
BUDGETS_FILE = Path.home() / ".hermes" / "pipelines" / "budgets.json"
# Default pipeline budgets (tokens)
DEFAULT_BUDGETS = {
"knowledge-mine": 200_000_000,
"training-factory": 215_000_000,
"playground": 16_000_000,
"adversary": 17_000_000,
}
def get_db(db_path: Optional[Path] = None) -> sqlite3.Connection:
"""Get or create SQLite database."""
path = db_path or DB_PATH
path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(path))
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("""
CREATE TABLE IF NOT EXISTS token_usage (
id INTEGER PRIMARY KEY AUTOINCREMENT,
pipeline TEXT NOT NULL,
worker TEXT NOT NULL,
tokens INTEGER NOT NULL,
recorded_at TEXT NOT NULL DEFAULT (datetime('now')),
hour_key TEXT GENERATED ALWAYS AS (strftime('%Y-%m-%d %H', recorded_at)) STORED
)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_pipeline_hour
ON token_usage(pipeline, hour_key)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_recorded_at
ON token_usage(recorded_at)
""")
conn.commit()
return conn
def record_usage(conn: sqlite3.Connection, pipeline: str, worker: str, tokens: int):
"""Record token usage."""
conn.execute(
"INSERT INTO token_usage (pipeline, worker, tokens) VALUES (?, ?, ?)",
(pipeline, worker, tokens)
)
conn.commit()
def normalize_since(since: str) -> str:
"""Normalize ISO timestamps for SQLite datetime comparisons."""
return since.replace("T", " ")
def get_usage_since(conn: sqlite3.Connection, since: str) -> Dict[str, int]:
"""Get total tokens per pipeline since a datetime."""
cursor = conn.execute("""
SELECT pipeline, SUM(tokens) as total
FROM token_usage
WHERE datetime(recorded_at) >= datetime(?)
GROUP BY pipeline
ORDER BY total DESC
""", (normalize_since(since),))
return {row[0]: row[1] for row in cursor.fetchall()}
def get_hourly_usage(conn: sqlite3.Connection, pipeline: str, hours: int = 24) -> List[Tuple[str, int]]:
"""Get hourly token usage for a pipeline."""
since = (datetime.utcnow() - timedelta(hours=hours)).isoformat()
cursor = conn.execute("""
SELECT hour_key, SUM(tokens) as total
FROM token_usage
WHERE pipeline = ? AND datetime(recorded_at) >= datetime(?)
GROUP BY hour_key
ORDER BY hour_key
""", (pipeline, normalize_since(since)))
return cursor.fetchall()
def get_worker_usage(conn: sqlite3.Connection, pipeline: str, since: str) -> Dict[str, int]:
"""Get per-worker token usage for a pipeline."""
cursor = conn.execute("""
SELECT worker, SUM(tokens) as total
FROM token_usage
WHERE pipeline = ? AND datetime(recorded_at) >= datetime(?)
GROUP BY worker
ORDER BY total DESC
""", (pipeline, normalize_since(since)))
return {row[0]: row[1] for row in cursor.fetchall()}
def load_budgets() -> Dict[str, int]:
"""Load pipeline budgets."""
if BUDGETS_FILE.exists():
with open(BUDGETS_FILE) as f:
return json.load(f)
return DEFAULT_BUDGETS.copy()
def save_budgets(budgets: Dict[str, int]):
"""Save pipeline budgets."""
BUDGETS_FILE.parent.mkdir(parents=True, exist_ok=True)
with open(BUDGETS_FILE, 'w') as f:
json.dump(budgets, f, indent=2)
def format_tokens(tokens: int) -> str:
"""Format token count for display."""
if tokens >= 1_000_000_000:
return f"{tokens / 1_000_000_000:.1f}B"
if tokens >= 1_000_000:
return f"{tokens / 1_000_000:.1f}M"
if tokens >= 1_000:
return f"{tokens / 1_000:.1f}K"
return str(tokens)
def progress_bar(used: int, target: int, width: int = 10) -> str:
"""Generate a progress bar."""
if target == 0:
return "" * width
ratio = min(used / target, 1.0)
filled = int(ratio * width)
return "" * filled + "" * (width - filled)
def estimate_eta(used: int, target: int, hours_elapsed: float) -> str:
"""Estimate time to completion."""
if hours_elapsed <= 0 or used <= 0:
return "N/A"
rate = used / hours_elapsed
remaining = target - used
if remaining <= 0:
return "DONE"
eta_hours = remaining / rate
if eta_hours >= 1:
return f"{eta_hours:.1f}h"
return f"{eta_hours * 60:.0f}m"
def render_dashboard(conn: sqlite3.Connection, budgets: Dict[str, int]):
"""Render the live dashboard."""
today = datetime.utcnow().strftime("%Y-%m-%d")
usage = get_usage_since(conn, f"{today}T00:00:00")
print("\033[2J\033[H") # Clear screen
print("=" * 70)
print(" TOKEN BUDGET TRACKER")
print(f" {datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')}")
print("=" * 70)
print(f" {'Pipeline':<20} {'Used':>10} {'Target':>10} {'Progress':>12} {'ETA':>8}")
print("-" * 70)
total_used = 0
total_target = 0
for pipeline, budget in sorted(budgets.items()):
used = usage.get(pipeline, 0)
total_used += used
total_target += budget
bar = progress_bar(used, budget)
pct = (used / budget * 100) if budget > 0 else 0
# Estimate ETA based on current hour's rate
hour_key = datetime.utcnow().strftime("%Y-%m-%d %H")
hourly = get_hourly_usage(conn, pipeline, hours=1)
current_hour_rate = hourly[0][1] if hourly else 0
remaining = budget - used
eta = estimate_eta(used, budget, 1) if current_hour_rate > 0 else "N/A"
print(f" {pipeline:<20} {format_tokens(used):>10} {format_tokens(budget):>10} {bar} {pct:>5.1f}% {eta:>8}")
print("-" * 70)
total_bar = progress_bar(total_used, total_target)
total_pct = (total_used / total_target * 100) if total_target > 0 else 0
print(f" {'TOTAL':<20} {format_tokens(total_used):>10} {format_tokens(total_target):>10} {total_bar} {total_pct:>5.1f}%")
print("=" * 70)
# Alerts
alerts = check_alerts(usage, budgets)
if alerts:
print("\n ⚠️ ALERTS:")
for alert in alerts:
print(f" {alert}")
print()
def check_alerts(usage: Dict[str, int], budgets: Dict[str, int]) -> List[str]:
"""Check budget alerts."""
alerts = []
thresholds = [50, 80, 100]
for pipeline, budget in budgets.items():
used = usage.get(pipeline, 0)
pct = (used / budget * 100) if budget > 0 else 0
for threshold in thresholds:
if pct >= threshold:
level = "🔴" if threshold == 100 else "🟡" if threshold == 80 else "🟢"
alerts.append(f"{level} {pipeline}: {pct:.1f}% used ({format_tokens(used)}/{format_tokens(budget)})")
return alerts
def daily_summary(conn: sqlite3.Connection, budgets: Dict[str, int], date: Optional[str] = None):
"""Generate daily summary report."""
if date is None:
date = datetime.utcnow().strftime("%Y-%m-%d")
start = f"{date}T00:00:00"
end = f"{date}T23:59:59"
usage = get_usage_since(conn, start)
print(f"\n{'='*60}")
print(f" DAILY SUMMARY — {date}")
print(f"{'='*60}")
total = 0
for pipeline, budget in sorted(budgets.items()):
used = usage.get(pipeline, 0)
total += used
pct = (used / budget * 100) if budget > 0 else 0
print(f" {pipeline:<20} {format_tokens(used):>10} / {format_tokens(budget):>10} ({pct:.1f}%)")
# Per-worker breakdown
workers = get_worker_usage(conn, pipeline, start)
for worker, wtokens in list(workers.items())[:5]:
print(f" └─ {worker}: {format_tokens(wtokens)}")
print(f"{''*60}")
print(f" {'TOTAL':<20} {format_tokens(total):>10}")
print(f"{'='*60}\n")
def watch_mode(conn: sqlite3.Connection, budgets: Dict[str, int], interval: int = 5):
"""Live dashboard with refresh."""
try:
while True:
render_dashboard(conn, budgets)
time.sleep(interval)
except KeyboardInterrupt:
print("\nStopped.")
def main():
parser = argparse.ArgumentParser(description="Pipeline Token Budget Tracker")
parser.add_argument("--db", help="SQLite database path")
parser.add_argument("--watch", action="store_true", help="Live dashboard")
parser.add_argument("--refresh", type=int, default=5, help="Dashboard refresh interval (seconds)")
parser.add_argument("--summary", action="store_true", help="Daily summary")
parser.add_argument("--date", help="Date for summary (YYYY-MM-DD)")
parser.add_argument("--record", nargs=3, metavar=("PIPELINE", "WORKER", "TOKENS"),
help="Record token usage")
parser.add_argument("--budget", nargs=2, metavar=("PIPELINE", "TOKENS"),
help="Set pipeline budget")
parser.add_argument("--budgets-file", help="Budgets JSON file path")
parser.add_argument("--alerts", action="store_true", help="Check alerts only")
parser.add_argument("--json", action="store_true", help="JSON output")
args = parser.parse_args()
db_path = Path(args.db) if args.db else None
conn = get_db(db_path)
if args.budgets_file:
global BUDGETS_FILE
BUDGETS_FILE = Path(args.budgets_file)
budgets = load_budgets()
if args.record:
pipeline, worker, tokens = args.record
record_usage(conn, pipeline, worker, int(tokens))
print(f"Recorded: {pipeline}/{worker} = {int(tokens)} tokens")
elif args.budget:
pipeline, tokens = args.budget
budgets[pipeline] = int(tokens)
save_budgets(budgets)
print(f"Budget set: {pipeline} = {int(tokens)} tokens")
elif args.alerts:
today = datetime.utcnow().strftime("%Y-%m-%d")
usage = get_usage_since(conn, f"{today}T00:00:00")
alerts = check_alerts(usage, budgets)
if alerts:
for a in alerts:
print(a)
sys.exit(1)
else:
print("No alerts.")
elif args.summary:
daily_summary(conn, budgets, args.date)
elif args.watch:
watch_mode(conn, budgets, args.refresh)
else:
render_dashboard(conn, budgets)
if __name__ == "__main__":
main()