Compare commits
1 Commits
step35/443
...
fix/622
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
91d94e29e8 |
@@ -1,194 +1,20 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
"""Token Budget Tracker -- real-time spend dashboard for pipelines."""
|
"""Compatibility wrapper for the token budget tracker CLI.
|
||||||
|
|
||||||
|
Issue #622 asked for a `token-tracker.py` entrypoint. The maintained
|
||||||
|
implementation lives in `scripts/token_tracker.py`. Keep this thin shim so
|
||||||
|
operator docs and older calls continue to work.
|
||||||
|
"""
|
||||||
|
|
||||||
import argparse, json, os, sqlite3, sys, time
|
|
||||||
from datetime import datetime
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
import sys
|
||||||
|
|
||||||
DB_PATH = Path.home() / ".hermes" / "pipelines" / "token_usage.db"
|
SCRIPT_DIR = Path(__file__).resolve().parent
|
||||||
ALERT_THRESHOLDS = [0.5, 0.8, 1.0]
|
if str(SCRIPT_DIR) not in sys.path:
|
||||||
DEFAULT_BUDGETS = {
|
sys.path.insert(0, str(SCRIPT_DIR))
|
||||||
"knowledge-mine": 200_000_000,
|
|
||||||
"training-factory": 215_000_000,
|
|
||||||
"playground": 16_000_000,
|
|
||||||
"adversary": 17_000_000,
|
|
||||||
}
|
|
||||||
|
|
||||||
def get_db():
|
from token_tracker import main
|
||||||
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
|
|
||||||
conn = sqlite3.connect(str(DB_PATH))
|
|
||||||
conn.row_factory = sqlite3.Row
|
|
||||||
conn.execute("PRAGMA journal_mode=WAL")
|
|
||||||
conn.executescript("""
|
|
||||||
CREATE TABLE IF NOT EXISTS token_usage (
|
|
||||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
||||||
pipeline TEXT NOT NULL,
|
|
||||||
worker TEXT NOT NULL,
|
|
||||||
tokens INTEGER NOT NULL,
|
|
||||||
recorded_at REAL NOT NULL,
|
|
||||||
hour_bucket TEXT NOT NULL
|
|
||||||
);
|
|
||||||
CREATE TABLE IF NOT EXISTS pipeline_budgets (
|
|
||||||
pipeline TEXT PRIMARY KEY,
|
|
||||||
target_tokens INTEGER NOT NULL,
|
|
||||||
updated_at REAL NOT NULL
|
|
||||||
);
|
|
||||||
CREATE TABLE IF NOT EXISTS alerts_sent (
|
|
||||||
pipeline TEXT NOT NULL,
|
|
||||||
threshold REAL NOT NULL,
|
|
||||||
sent_at REAL NOT NULL,
|
|
||||||
PRIMARY KEY (pipeline, threshold)
|
|
||||||
);
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_usage_pipeline_hour
|
|
||||||
ON token_usage(pipeline, hour_bucket);
|
|
||||||
""")
|
|
||||||
for name, target in DEFAULT_BUDGETS.items():
|
|
||||||
conn.execute(
|
|
||||||
"INSERT OR IGNORE INTO pipeline_budgets (pipeline, target_tokens, updated_at) VALUES (?, ?, ?)",
|
|
||||||
(name, target, time.time())
|
|
||||||
)
|
|
||||||
conn.commit()
|
|
||||||
return conn
|
|
||||||
|
|
||||||
def log_usage(conn, pipeline, worker, tokens):
|
|
||||||
now = time.time()
|
|
||||||
hour = datetime.now().strftime("%Y-%m-%d %H:00")
|
|
||||||
conn.execute(
|
|
||||||
"INSERT INTO token_usage (pipeline, worker, tokens, recorded_at, hour_bucket) VALUES (?, ?, ?, ?, ?)",
|
|
||||||
(pipeline, worker, tokens, now, hour)
|
|
||||||
)
|
|
||||||
conn.commit()
|
|
||||||
check_alerts(conn, pipeline)
|
|
||||||
|
|
||||||
def get_pipeline_stats(conn):
|
|
||||||
rows = conn.execute("""
|
|
||||||
SELECT u.pipeline, COALESCE(b.target_tokens, 0) as target,
|
|
||||||
SUM(u.tokens) as used, MIN(u.recorded_at) as started_at,
|
|
||||||
COUNT(DISTINCT u.worker) as workers
|
|
||||||
FROM token_usage u
|
|
||||||
LEFT JOIN pipeline_budgets b ON u.pipeline = b.pipeline
|
|
||||||
GROUP BY u.pipeline ORDER BY used DESC
|
|
||||||
""").fetchall()
|
|
||||||
return [dict(r) for r in rows]
|
|
||||||
|
|
||||||
def fmt(n):
|
|
||||||
if n >= 1_000_000_000: return f"{n/1_000_000_000:.1f}B"
|
|
||||||
if n >= 1_000_000: return f"{n/1_000_000:.1f}M"
|
|
||||||
if n >= 1_000: return f"{n/1_000:.1f}K"
|
|
||||||
return str(n)
|
|
||||||
|
|
||||||
def bar(ratio, w=8):
|
|
||||||
filled = int(ratio * w)
|
|
||||||
return "█" * filled + "░" * (w - filled)
|
|
||||||
|
|
||||||
def eta(used, target, started):
|
|
||||||
if used <= 0 or started <= 0: return "--"
|
|
||||||
elapsed = (time.time() - started) / 3600
|
|
||||||
if elapsed <= 0: return "--"
|
|
||||||
rate = used / elapsed
|
|
||||||
remaining = target - used
|
|
||||||
if remaining <= 0: return "DONE"
|
|
||||||
h = remaining / rate
|
|
||||||
return f"{h/24:.1f}d" if h >= 24 else f"{h:.1f}h"
|
|
||||||
|
|
||||||
def render_dashboard(conn):
|
|
||||||
stats = get_pipeline_stats(conn)
|
|
||||||
if not stats:
|
|
||||||
print("No pipeline data recorded yet.")
|
|
||||||
return
|
|
||||||
print()
|
|
||||||
print(f"{'Pipeline':<20} {'Tokens Used':>12} {'Target':>10} {'Progress':>10} {'ETA':>8} {'Workers':>8}")
|
|
||||||
print("-" * 72)
|
|
||||||
total_used = total_target = 0
|
|
||||||
for s in stats:
|
|
||||||
used = s["used"] or 0
|
|
||||||
target = s["target"] or 1
|
|
||||||
ratio = min(used / target, 1.0) if target > 0 else 0
|
|
||||||
print(f"{s['pipeline']:<20} {fmt(used):>12} {fmt(target):>10} {bar(ratio):>10} {eta(used, target, s['started_at'] or 0):>8} {s['workers'] or 0:>8}")
|
|
||||||
total_used += used
|
|
||||||
total_target += target
|
|
||||||
print("-" * 72)
|
|
||||||
r = min(total_used / total_target, 1.0) if total_target > 0 else 0
|
|
||||||
print(f"{'TOTAL':<20} {fmt(total_used):>12} {fmt(total_target):>10} {bar(r):>10}")
|
|
||||||
print()
|
|
||||||
|
|
||||||
def render_watch(conn, interval=5):
|
|
||||||
try:
|
|
||||||
while True:
|
|
||||||
os.system("clear" if os.name != "nt" else "cls")
|
|
||||||
print(f"Token Budget Tracker -- {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
|
|
||||||
print("Press Ctrl+C to exit")
|
|
||||||
render_dashboard(conn)
|
|
||||||
time.sleep(interval)
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
print("\nExiting.")
|
|
||||||
|
|
||||||
def render_daily_summary(conn):
|
|
||||||
since = time.time() - 86400
|
|
||||||
rows = conn.execute("""
|
|
||||||
SELECT pipeline, SUM(tokens) as total, COUNT(DISTINCT worker) as workers, COUNT(*) as entries
|
|
||||||
FROM token_usage WHERE recorded_at >= ? GROUP BY pipeline ORDER BY total DESC
|
|
||||||
""", (since,)).fetchall()
|
|
||||||
if not rows:
|
|
||||||
print("No usage in last 24 hours.")
|
|
||||||
return
|
|
||||||
print(f"\nDaily Summary -- last 24 hours")
|
|
||||||
print(f"{'Pipeline':<20} {'Total Tokens':>14} {'Workers':>8} {'Entries':>8}")
|
|
||||||
print("-" * 54)
|
|
||||||
gt = 0
|
|
||||||
for r in rows:
|
|
||||||
print(f"{r['pipeline']:<20} {fmt(r['total']):>14} {r['workers']:>8} {r['entries']:>8}")
|
|
||||||
gt += r["total"]
|
|
||||||
print("-" * 54)
|
|
||||||
print(f"{'TOTAL':<20} {fmt(gt):>14}\n")
|
|
||||||
|
|
||||||
def check_alerts(conn, pipeline):
|
|
||||||
row = conn.execute(
|
|
||||||
"SELECT SUM(u.tokens) as used, COALESCE(b.target_tokens, 0) as target "
|
|
||||||
"FROM token_usage u LEFT JOIN pipeline_budgets b ON u.pipeline = b.pipeline "
|
|
||||||
"WHERE u.pipeline = ?", (pipeline,)
|
|
||||||
).fetchone()
|
|
||||||
if not row or row["target"] <= 0: return
|
|
||||||
ratio = row["used"] / row["target"]
|
|
||||||
for t in ALERT_THRESHOLDS:
|
|
||||||
if ratio >= t:
|
|
||||||
existing = conn.execute("SELECT 1 FROM alerts_sent WHERE pipeline = ? AND threshold = ?", (pipeline, t)).fetchone()
|
|
||||||
if not existing:
|
|
||||||
print(f"⚠️ BUDGET ALERT: {pipeline} at {int(t*100)}% ({fmt(row['used'])}/{fmt(row['target'])})", file=sys.stderr)
|
|
||||||
conn.execute("INSERT INTO alerts_sent (pipeline, threshold, sent_at) VALUES (?, ?, ?)", (pipeline, t, time.time()))
|
|
||||||
conn.commit()
|
|
||||||
|
|
||||||
def set_budget(conn, pipeline, target):
|
|
||||||
conn.execute("INSERT OR REPLACE INTO pipeline_budgets (pipeline, target_tokens, updated_at) VALUES (?, ?, ?)",
|
|
||||||
(pipeline, int(target), time.time()))
|
|
||||||
conn.execute("DELETE FROM alerts_sent WHERE pipeline = ?", (pipeline,))
|
|
||||||
conn.commit()
|
|
||||||
print(f"Budget set: {pipeline} = {fmt(int(target))} tokens")
|
|
||||||
|
|
||||||
def main():
|
|
||||||
parser = argparse.ArgumentParser(description="Token Budget Tracker")
|
|
||||||
parser.add_argument("--watch", action="store_true")
|
|
||||||
parser.add_argument("--watch-interval", type=int, default=5)
|
|
||||||
parser.add_argument("--summary", action="store_true")
|
|
||||||
parser.add_argument("--log", nargs=3, metavar=("PIPELINE", "WORKER", "TOKENS"))
|
|
||||||
parser.add_argument("--budget", nargs=2, metavar=("PIPELINE", "TARGET"))
|
|
||||||
parser.add_argument("--db", type=str, default=str(DB_PATH))
|
|
||||||
args = parser.parse_args()
|
|
||||||
global DB_PATH
|
|
||||||
DB_PATH = Path(args.db)
|
|
||||||
conn = get_db()
|
|
||||||
if args.log:
|
|
||||||
log_usage(conn, args.log[0], args.log[1], int(args.log[2]))
|
|
||||||
print(f"Logged: {args.log[0]}/{args.log[1]} = {fmt(int(args.log[2]))} tokens")
|
|
||||||
elif args.budget:
|
|
||||||
set_budget(conn, args.budget[0], args.budget[1])
|
|
||||||
elif args.summary:
|
|
||||||
render_daily_summary(conn)
|
|
||||||
elif args.watch:
|
|
||||||
render_watch(conn, interval=args.watch_interval)
|
|
||||||
else:
|
|
||||||
render_dashboard(conn)
|
|
||||||
conn.close()
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
main()
|
main()
|
||||||
|
|||||||
@@ -76,15 +76,20 @@ def record_usage(conn: sqlite3.Connection, pipeline: str, worker: str, tokens: i
|
|||||||
conn.commit()
|
conn.commit()
|
||||||
|
|
||||||
|
|
||||||
|
def normalize_since(since: str) -> str:
|
||||||
|
"""Normalize ISO timestamps for SQLite datetime comparisons."""
|
||||||
|
return since.replace("T", " ")
|
||||||
|
|
||||||
|
|
||||||
def get_usage_since(conn: sqlite3.Connection, since: str) -> Dict[str, int]:
|
def get_usage_since(conn: sqlite3.Connection, since: str) -> Dict[str, int]:
|
||||||
"""Get total tokens per pipeline since a datetime."""
|
"""Get total tokens per pipeline since a datetime."""
|
||||||
cursor = conn.execute("""
|
cursor = conn.execute("""
|
||||||
SELECT pipeline, SUM(tokens) as total
|
SELECT pipeline, SUM(tokens) as total
|
||||||
FROM token_usage
|
FROM token_usage
|
||||||
WHERE recorded_at >= ?
|
WHERE datetime(recorded_at) >= datetime(?)
|
||||||
GROUP BY pipeline
|
GROUP BY pipeline
|
||||||
ORDER BY total DESC
|
ORDER BY total DESC
|
||||||
""", (since,))
|
""", (normalize_since(since),))
|
||||||
return {row[0]: row[1] for row in cursor.fetchall()}
|
return {row[0]: row[1] for row in cursor.fetchall()}
|
||||||
|
|
||||||
|
|
||||||
@@ -94,10 +99,10 @@ def get_hourly_usage(conn: sqlite3.Connection, pipeline: str, hours: int = 24) -
|
|||||||
cursor = conn.execute("""
|
cursor = conn.execute("""
|
||||||
SELECT hour_key, SUM(tokens) as total
|
SELECT hour_key, SUM(tokens) as total
|
||||||
FROM token_usage
|
FROM token_usage
|
||||||
WHERE pipeline = ? AND recorded_at >= ?
|
WHERE pipeline = ? AND datetime(recorded_at) >= datetime(?)
|
||||||
GROUP BY hour_key
|
GROUP BY hour_key
|
||||||
ORDER BY hour_key
|
ORDER BY hour_key
|
||||||
""", (pipeline, since))
|
""", (pipeline, normalize_since(since)))
|
||||||
return cursor.fetchall()
|
return cursor.fetchall()
|
||||||
|
|
||||||
|
|
||||||
@@ -106,10 +111,10 @@ def get_worker_usage(conn: sqlite3.Connection, pipeline: str, since: str) -> Dic
|
|||||||
cursor = conn.execute("""
|
cursor = conn.execute("""
|
||||||
SELECT worker, SUM(tokens) as total
|
SELECT worker, SUM(tokens) as total
|
||||||
FROM token_usage
|
FROM token_usage
|
||||||
WHERE pipeline = ? AND recorded_at >= ?
|
WHERE pipeline = ? AND datetime(recorded_at) >= datetime(?)
|
||||||
GROUP BY worker
|
GROUP BY worker
|
||||||
ORDER BY total DESC
|
ORDER BY total DESC
|
||||||
""", (pipeline, since))
|
""", (pipeline, normalize_since(since)))
|
||||||
return {row[0]: row[1] for row in cursor.fetchall()}
|
return {row[0]: row[1] for row in cursor.fetchall()}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -5,11 +5,12 @@ Tests for scripts/token_tracker.py — Token Budget Tracker.
|
|||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
import sqlite3
|
import sqlite3
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
import tempfile
|
import tempfile
|
||||||
import unittest
|
import unittest
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
import sys
|
|
||||||
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
|
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
|
||||||
from token_tracker import (
|
from token_tracker import (
|
||||||
get_db,
|
get_db,
|
||||||
@@ -155,5 +156,37 @@ class TestBudgets(unittest.TestCase):
|
|||||||
self.assertIn("p1", loaded)
|
self.assertIn("p1", loaded)
|
||||||
|
|
||||||
|
|
||||||
|
class TestWrapperScript(unittest.TestCase):
|
||||||
|
def test_hyphen_wrapper_summary_works_with_custom_db(self):
|
||||||
|
with tempfile.TemporaryDirectory() as tmpdir:
|
||||||
|
db_path = Path(tmpdir) / "wrapper.db"
|
||||||
|
budgets_path = Path(tmpdir) / "budgets.json"
|
||||||
|
budgets_path.write_text(json.dumps({"knowledge-mine": 200_000_000}))
|
||||||
|
conn = get_db(db_path)
|
||||||
|
try:
|
||||||
|
record_usage(conn, "knowledge-mine", "worker-1", 123456)
|
||||||
|
finally:
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
repo_root = Path(__file__).parent.parent
|
||||||
|
wrapper = repo_root / "scripts" / "token-tracker.py"
|
||||||
|
result = subprocess.run(
|
||||||
|
[
|
||||||
|
sys.executable,
|
||||||
|
str(wrapper),
|
||||||
|
"--summary",
|
||||||
|
"--db", str(db_path),
|
||||||
|
"--budgets-file", str(budgets_path),
|
||||||
|
],
|
||||||
|
cwd=str(repo_root),
|
||||||
|
capture_output=True,
|
||||||
|
text=True,
|
||||||
|
)
|
||||||
|
self.assertEqual(result.returncode, 0, result.stderr)
|
||||||
|
self.assertIn("DAILY SUMMARY", result.stdout)
|
||||||
|
self.assertIn("knowledge-mine", result.stdout)
|
||||||
|
self.assertIn("123.5K", result.stdout)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|||||||
Reference in New Issue
Block a user