Compare commits
1 Commits
data/code-
...
feat/622-t
| Author | SHA1 | Date | |
|---|---|---|---|
| 89e46680df |
194
scripts/token-tracker.py
Normal file
194
scripts/token-tracker.py
Normal file
@@ -0,0 +1,194 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Token Budget Tracker -- real-time spend dashboard for pipelines."""
|
||||
|
||||
import argparse, json, os, sqlite3, sys, time
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
# Default location of the SQLite usage database; main() rebinds this from --db.
DB_PATH = Path.home().joinpath(".hermes", "pipelines", "token_usage.db")

# Fractions of a pipeline's budget at which check_alerts() emits an alert.
ALERT_THRESHOLDS = [0.5, 0.8, 1.0]

# Token budgets that get_db() inserts for pipelines with no budget row yet.
DEFAULT_BUDGETS = {
    "knowledge-mine": 200_000_000,
    "training-factory": 215_000_000,
    "playground": 16_000_000,
    "adversary": 17_000_000,
}
|
||||
|
||||
def get_db():
    """Open the usage database at DB_PATH, creating it on first use.

    Creates the parent directory, switches the journal to WAL, creates the
    token_usage / pipeline_budgets / alerts_sent tables (and the usage index)
    if they are missing, and inserts DEFAULT_BUDGETS for pipelines that have
    no budget row yet. Existing budget rows are left untouched.

    Returns:
        An open sqlite3 connection whose rows are sqlite3.Row objects.
    """
    schema = """
        CREATE TABLE IF NOT EXISTS token_usage (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            pipeline TEXT NOT NULL,
            worker TEXT NOT NULL,
            tokens INTEGER NOT NULL,
            recorded_at REAL NOT NULL,
            hour_bucket TEXT NOT NULL
        );
        CREATE TABLE IF NOT EXISTS pipeline_budgets (
            pipeline TEXT PRIMARY KEY,
            target_tokens INTEGER NOT NULL,
            updated_at REAL NOT NULL
        );
        CREATE TABLE IF NOT EXISTS alerts_sent (
            pipeline TEXT NOT NULL,
            threshold REAL NOT NULL,
            sent_at REAL NOT NULL,
            PRIMARY KEY (pipeline, threshold)
        );
        CREATE INDEX IF NOT EXISTS idx_usage_pipeline_hour
            ON token_usage(pipeline, hour_bucket);
    """
    seed_sql = "INSERT OR IGNORE INTO pipeline_budgets (pipeline, target_tokens, updated_at) VALUES (?, ?, ?)"

    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    db = sqlite3.connect(str(DB_PATH))
    db.row_factory = sqlite3.Row
    db.execute("PRAGMA journal_mode=WAL")
    db.executescript(schema)

    # OR IGNORE keeps any budget already stored for a pipeline.
    db.executemany(
        seed_sql,
        ((pipeline, budget, time.time()) for pipeline, budget in DEFAULT_BUDGETS.items()),
    )
    db.commit()
    return db
|
||||
|
||||
def log_usage(conn, pipeline, worker, tokens):
    """Record one token-usage sample, then evaluate budget alerts.

    Args:
        conn: Open sqlite3 connection with the token_usage table.
        pipeline: Name of the pipeline the tokens are charged to.
        worker: Identifier of the worker that spent the tokens.
        tokens: Number of tokens spent.
    """
    now = time.time()
    # Derive the bucket from the same clock reading as recorded_at so the two
    # columns can never disagree when a call straddles an hour boundary.
    hour = datetime.fromtimestamp(now).strftime("%Y-%m-%d %H:00")
    conn.execute(
        "INSERT INTO token_usage (pipeline, worker, tokens, recorded_at, hour_bucket) VALUES (?, ?, ?, ?, ?)",
        (pipeline, worker, tokens, now, hour),
    )
    conn.commit()
    check_alerts(conn, pipeline)
|
||||
|
||||
def get_pipeline_stats(conn):
    """Return per-pipeline usage totals joined with their budgets.

    Args:
        conn: Open sqlite3 connection; rows must be convertible with dict()
            (get_db() sets row_factory to sqlite3.Row).

    Returns:
        A list of dicts, highest usage first, with keys ``pipeline``,
        ``target`` (0 when the pipeline has no budget row), ``used``,
        ``started_at`` (earliest recorded_at) and ``workers`` (distinct
        worker count). Pipelines with no usage rows do not appear.
    """
    query = """
        SELECT u.pipeline, COALESCE(b.target_tokens, 0) as target,
               SUM(u.tokens) as used, MIN(u.recorded_at) as started_at,
               COUNT(DISTINCT u.worker) as workers
        FROM token_usage u
        LEFT JOIN pipeline_budgets b ON u.pipeline = b.pipeline
        GROUP BY u.pipeline ORDER BY used DESC
    """
    records = conn.execute(query).fetchall()
    return list(map(dict, records))
|
||||
|
||||
def fmt(n):
    """Format a token count with a one-decimal K/M/B suffix.

    Values whose magnitude is below 1,000 are returned unabbreviated.
    Negative values are abbreviated by magnitude and keep their sign.

    Args:
        n: Numeric token count.

    Returns:
        A short human-readable string such as ``"1.5M"`` or ``"-2.0K"``.
    """
    sign = "-" if n < 0 else ""
    magnitude = abs(n)
    for unit, suffix in ((1_000_000_000, "B"), (1_000_000, "M"), (1_000, "K")):
        if magnitude >= unit:
            return f"{sign}{magnitude / unit:.1f}{suffix}"
    return f"{sign}{magnitude}"
|
||||
|
||||
def bar(ratio, w=8):
    """Render a fixed-width text progress bar.

    Args:
        ratio: Completion fraction; values outside [0, 1] are clamped so the
            result is always exactly ``w`` characters wide.
        w: Total bar width in characters.

    Returns:
        A string of ``w`` characters: filled cells followed by empty cells.
    """
    clamped = min(max(ratio, 0.0), 1.0)
    filled = int(clamped * w)
    return "█" * filled + "░" * (w - filled)
|
||||
|
||||
def eta(used, target, started):
    """Estimate the time left until ``target`` tokens are used.

    The estimate extrapolates the average rate since ``started`` (an epoch
    timestamp in seconds) up to the current time.

    Returns:
        ``"--"`` when nothing has been used, ``started`` is not positive, or
        no time has elapsed; ``"DONE"`` when usage has reached the target;
        otherwise hours (``"3.2h"``) or, from 24 hours up, days (``"1.5d"``).
    """
    if used <= 0 or started <= 0:
        return "--"

    elapsed_hours = (time.time() - started) / 3600
    if elapsed_hours <= 0:
        return "--"

    tokens_per_hour = used / elapsed_hours
    tokens_left = target - used
    if tokens_left <= 0:
        return "DONE"

    hours_left = tokens_left / tokens_per_hour
    if hours_left >= 24:
        return f"{hours_left/24:.1f}d"
    return f"{hours_left:.1f}h"
|
||||
|
||||
def render_dashboard(conn):
    """Print a table of usage, budget, progress and ETA for every pipeline.

    Pipelines without a positive budget are shown with ``--`` for target and
    ETA and an empty progress bar, and are excluded from the TOTAL target,
    rather than being reported as complete against a budget of 1 token.

    Args:
        conn: Open sqlite3 connection passed through to get_pipeline_stats().
    """
    stats = get_pipeline_stats(conn)
    if not stats:
        print("No pipeline data recorded yet.")
        return

    print()
    print(f"{'Pipeline':<20} {'Tokens Used':>12} {'Target':>10} {'Progress':>10} {'ETA':>8} {'Workers':>8}")
    print("-" * 72)

    total_used = total_target = 0
    for s in stats:
        used = s["used"] or 0
        target = s["target"] or 0
        workers = s["workers"] or 0
        if target > 0:
            # Clamp on both sides so the bar never exceeds its column width.
            ratio = max(0.0, min(used / target, 1.0))
            target_text = fmt(target)
            eta_text = eta(used, target, s["started_at"] or 0)
            total_target += target
        else:
            # No budget configured: progress and ETA are undefined.
            ratio, target_text, eta_text = 0, "--", "--"
        print(f"{s['pipeline']:<20} {fmt(used):>12} {target_text:>10} {bar(ratio):>10} {eta_text:>8} {workers:>8}")
        total_used += used

    print("-" * 72)
    if total_target > 0:
        overall = max(0.0, min(total_used / total_target, 1.0))
        total_target_text = fmt(total_target)
    else:
        overall, total_target_text = 0, "--"
    print(f"{'TOTAL':<20} {fmt(total_used):>12} {total_target_text:>10} {bar(overall):>10}")
    print()
|
||||
|
||||
def render_watch(conn, interval=5):
    """Redraw the dashboard in a loop until interrupted with Ctrl+C.

    Args:
        conn: Open sqlite3 connection passed to render_dashboard().
        interval: Seconds to sleep between redraws.
    """
    # Screen-clear command for the current platform.
    clear_command = "cls" if os.name == "nt" else "clear"
    try:
        while True:
            os.system(clear_command)
            timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            print(f"Token Budget Tracker -- {timestamp}")
            print("Press Ctrl+C to exit")
            render_dashboard(conn)
            time.sleep(interval)
    except KeyboardInterrupt:
        print("\nExiting.")
|
||||
|
||||
def render_daily_summary(conn):
    """Print per-pipeline token totals for the last 24 hours.

    For each pipeline with usage recorded in the window, shows total tokens,
    distinct worker count and number of log entries, followed by a grand
    total. Prints a short notice instead when the window is empty.

    Args:
        conn: Open sqlite3 connection whose rows support access by column name.
    """
    cutoff = time.time() - 86400
    query = """
        SELECT pipeline, SUM(tokens) as total, COUNT(DISTINCT worker) as workers, COUNT(*) as entries
        FROM token_usage WHERE recorded_at >= ? GROUP BY pipeline ORDER BY total DESC
    """
    rows = conn.execute(query, (cutoff,)).fetchall()
    if not rows:
        print("No usage in last 24 hours.")
        return

    rule = "-" * 54
    print("\nDaily Summary -- last 24 hours")
    print(f"{'Pipeline':<20} {'Total Tokens':>14} {'Workers':>8} {'Entries':>8}")
    print(rule)

    grand_total = 0
    for entry in rows:
        print(f"{entry['pipeline']:<20} {fmt(entry['total']):>14} {entry['workers']:>8} {entry['entries']:>8}")
        grand_total += entry["total"]

    print(rule)
    print(f"{'TOTAL':<20} {fmt(grand_total):>14}\n")
|
||||
|
||||
def check_alerts(conn, pipeline):
    """Emit a one-time stderr alert for each budget threshold a pipeline has crossed.

    Compares the pipeline's total recorded tokens with its budget and, for
    every entry in ALERT_THRESHOLDS that has been reached, records the alert
    in alerts_sent and prints a warning. Does nothing when the pipeline has
    no positive budget.

    Args:
        conn: Open sqlite3 connection whose rows support access by column name.
        pipeline: Name of the pipeline to check.
    """
    row = conn.execute(
        "SELECT SUM(u.tokens) as used, COALESCE(b.target_tokens, 0) as target "
        "FROM token_usage u LEFT JOIN pipeline_budgets b ON u.pipeline = b.pipeline "
        "WHERE u.pipeline = ?", (pipeline,)
    ).fetchone()
    if not row or row["target"] <= 0:
        return

    used = row["used"] or 0
    target = row["target"]
    ratio = used / target
    for t in ALERT_THRESHOLDS:
        if ratio < t:
            continue
        # INSERT OR IGNORE makes "claim this alert" a single atomic step: the
        # primary key (pipeline, threshold) rejects duplicates, so two
        # processes crossing a threshold together cannot raise IntegrityError
        # or both print the alert.
        claimed = conn.execute(
            "INSERT OR IGNORE INTO alerts_sent (pipeline, threshold, sent_at) VALUES (?, ?, ?)",
            (pipeline, t, time.time()),
        )
        if claimed.rowcount:
            print(f"⚠️ BUDGET ALERT: {pipeline} at {int(t*100)}% ({fmt(used)}/{fmt(target)})", file=sys.stderr)
    conn.commit()
|
||||
|
||||
def set_budget(conn, pipeline, target):
    """Create or replace a pipeline's token budget and reset its alert history.

    Clearing alerts_sent for the pipeline lets every threshold alert again
    relative to the new budget.

    Args:
        conn: Open sqlite3 connection.
        pipeline: Name of the pipeline to configure.
        target: New budget in tokens; converted with int().
    """
    budget = int(target)
    upsert_sql = "INSERT OR REPLACE INTO pipeline_budgets (pipeline, target_tokens, updated_at) VALUES (?, ?, ?)"
    conn.execute(upsert_sql, (pipeline, budget, time.time()))
    conn.execute("DELETE FROM alerts_sent WHERE pipeline = ?", (pipeline,))
    conn.commit()
    print(f"Budget set: {pipeline} = {fmt(budget)} tokens")
|
||||
|
||||
def main():
    """Parse command-line options and run the requested tracker action.

    Exactly one action runs, chosen in this order: ``--log``, ``--budget``,
    ``--summary``, ``--watch``; with none of them the dashboard is printed
    once. ``--db`` overrides the module-level DB_PATH before the database is
    opened.
    """
    # The global declaration must come before any use of DB_PATH in this
    # function; reading it first (for the --db default) is a SyntaxError.
    global DB_PATH

    parser = argparse.ArgumentParser(description="Token Budget Tracker")
    parser.add_argument("--watch", action="store_true")
    parser.add_argument("--watch-interval", type=int, default=5)
    parser.add_argument("--summary", action="store_true")
    parser.add_argument("--log", nargs=3, metavar=("PIPELINE", "WORKER", "TOKENS"))
    parser.add_argument("--budget", nargs=2, metavar=("PIPELINE", "TARGET"))
    parser.add_argument("--db", type=str, default=str(DB_PATH))
    args = parser.parse_args()

    DB_PATH = Path(args.db)
    conn = get_db()
    try:
        if args.log:
            pipeline, worker, raw_tokens = args.log
            tokens = int(raw_tokens)
            log_usage(conn, pipeline, worker, tokens)
            print(f"Logged: {pipeline}/{worker} = {fmt(tokens)} tokens")
        elif args.budget:
            set_budget(conn, args.budget[0], args.budget[1])
        elif args.summary:
            render_daily_summary(conn)
        elif args.watch:
            render_watch(conn, interval=args.watch_interval)
        else:
            render_dashboard(conn)
    finally:
        # Release the connection even when an action raises.
        conn.close()
|
||||
|
||||
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user