Compare commits

...

1 Commits

Author SHA1 Message Date
Alexander Whitestone
91d94e29e8 fix(pipeline): repair token tracker CLI summary (#622)
Some checks failed
Smoke Test / smoke (pull_request) Failing after 24s
Architecture Lint / Linter Tests (pull_request) Successful in 29s
Validate Config / YAML Lint (pull_request) Failing after 16s
Validate Config / JSON Validate (pull_request) Successful in 21s
Validate Config / Shell Script Lint (pull_request) Failing after 1m8s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 1m21s
Validate Config / Python Test Suite (pull_request) Has been skipped
Validate Config / Cron Syntax Check (pull_request) Successful in 15s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 15s
Validate Config / Playbook Schema Validation (pull_request) Successful in 30s
Architecture Lint / Lint Repository (pull_request) Failing after 28s
PR Checklist / pr-checklist (pull_request) Successful in 5m18s
Fix the hyphenated token-tracker entrypoint and normalize SQLite time filtering
so same-day usage appears in the summary dashboard.
2026-04-22 11:23:43 -04:00
3 changed files with 56 additions and 192 deletions

View File

@@ -1,194 +1,20 @@
#!/usr/bin/env python3
"""Token Budget Tracker -- real-time spend dashboard for pipelines."""
"""Compatibility wrapper for the token budget tracker CLI.
Issue #622 asked for a `token-tracker.py` entrypoint. The maintained
implementation lives in `scripts/token_tracker.py`. Keep this thin shim so
operator docs and older calls continue to work.
"""
import argparse, json, os, sqlite3, sys, time
from datetime import datetime
from pathlib import Path
import sys
# Default on-disk location of the SQLite usage database (overridable via --db).
DB_PATH = Path.home() / ".hermes" / "pipelines" / "token_usage.db"
# Budget fractions at which a one-time alert is emitted (50%, 80%, 100%).
ALERT_THRESHOLDS = [0.5, 0.8, 1.0]
# Seed budgets (in tokens) inserted for known pipelines on first DB open.
DEFAULT_BUDGETS = {
    "knowledge-mine": 200_000_000,
    "training-factory": 215_000_000,
    "playground": 16_000_000,
    "adversary": 17_000_000,
}
# Wrapper-shim support: ensure sibling modules in this scripts/ directory are
# importable when this file is executed directly as a script.
SCRIPT_DIR = Path(__file__).resolve().parent
if str(SCRIPT_DIR) not in sys.path:
    sys.path.insert(0, str(SCRIPT_DIR))
def get_db():
    """Open the usage database at DB_PATH, creating schema and seed budgets.

    Returns a sqlite3 connection with Row access and WAL journaling enabled.
    """
    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    db = sqlite3.connect(str(DB_PATH))
    db.row_factory = sqlite3.Row
    db.execute("PRAGMA journal_mode=WAL")
    # Idempotent schema bootstrap: usage log, per-pipeline budgets, and the
    # de-duplication table for threshold alerts.
    db.executescript("""
        CREATE TABLE IF NOT EXISTS token_usage (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            pipeline TEXT NOT NULL,
            worker TEXT NOT NULL,
            tokens INTEGER NOT NULL,
            recorded_at REAL NOT NULL,
            hour_bucket TEXT NOT NULL
        );
        CREATE TABLE IF NOT EXISTS pipeline_budgets (
            pipeline TEXT PRIMARY KEY,
            target_tokens INTEGER NOT NULL,
            updated_at REAL NOT NULL
        );
        CREATE TABLE IF NOT EXISTS alerts_sent (
            pipeline TEXT NOT NULL,
            threshold REAL NOT NULL,
            sent_at REAL NOT NULL,
            PRIMARY KEY (pipeline, threshold)
        );
        CREATE INDEX IF NOT EXISTS idx_usage_pipeline_hour
            ON token_usage(pipeline, hour_bucket);
    """)
    # Seed default budgets without clobbering operator-set values.
    for pipeline_name, default_target in DEFAULT_BUDGETS.items():
        db.execute(
            "INSERT OR IGNORE INTO pipeline_budgets (pipeline, target_tokens, updated_at) VALUES (?, ?, ?)",
            (pipeline_name, default_target, time.time()),
        )
    db.commit()
    return db
from token_tracker import main
def log_usage(conn, pipeline, worker, tokens):
    """Record one usage entry, then re-evaluate budget alerts for the pipeline."""
    recorded = time.time()
    bucket = datetime.now().strftime("%Y-%m-%d %H:00")
    conn.execute(
        "INSERT INTO token_usage (pipeline, worker, tokens, recorded_at, hour_bucket) VALUES (?, ?, ?, ?, ?)",
        (pipeline, worker, tokens, recorded, bucket),
    )
    conn.commit()
    # Logging may push the pipeline over a threshold; alert immediately.
    check_alerts(conn, pipeline)
def get_pipeline_stats(conn):
    """Return per-pipeline aggregates joined with budget targets.

    Each dict has keys: pipeline, target, used, started_at, workers;
    ordered by tokens used, descending.
    """
    query = """
        SELECT u.pipeline, COALESCE(b.target_tokens, 0) as target,
               SUM(u.tokens) as used, MIN(u.recorded_at) as started_at,
               COUNT(DISTINCT u.worker) as workers
        FROM token_usage u
        LEFT JOIN pipeline_budgets b ON u.pipeline = b.pipeline
        GROUP BY u.pipeline ORDER BY used DESC
    """
    return [dict(row) for row in conn.execute(query).fetchall()]
def fmt(n):
    """Render a token count compactly with a B/M/K suffix (one decimal)."""
    for limit, suffix in ((1_000_000_000, "B"), (1_000_000, "M"), (1_000, "K")):
        if n >= limit:
            return f"{n / limit:.1f}{suffix}"
    return str(n)
def bar(ratio, w=8):
    """Render a fixed-width text progress bar for *ratio* in [0, 1].

    BUG FIX: the fill/empty glyphs had been lost (both were empty strings,
    so the bar always rendered as ""). Restored to the standard block
    characters: "█" for filled cells and "░" for empty ones.
    """
    filled = int(ratio * w)
    return "█" * filled + "░" * (w - filled)
def eta(used, target, started):
    """Estimate wall-clock time to reach *target*, from the observed rate.

    Returns "--" when no rate can be computed, "DONE" when the target is
    already met, otherwise a value like "3.2h" or "1.4d".
    """
    if used <= 0 or started <= 0:
        return "--"
    hours_elapsed = (time.time() - started) / 3600
    if hours_elapsed <= 0:
        return "--"
    burn_rate = used / hours_elapsed
    left = target - used
    if left <= 0:
        return "DONE"
    hours_left = left / burn_rate
    if hours_left >= 24:
        return f"{hours_left/24:.1f}d"
    return f"{hours_left:.1f}h"
def render_dashboard(conn):
    """Print a one-shot table of per-pipeline usage against budget targets."""
    rows = get_pipeline_stats(conn)
    if not rows:
        print("No pipeline data recorded yet.")
        return
    print()
    print(f"{'Pipeline':<20} {'Tokens Used':>12} {'Target':>10} {'Progress':>10} {'ETA':>8} {'Workers':>8}")
    print("-" * 72)
    used_sum = 0
    target_sum = 0
    for row in rows:
        used = row["used"] or 0
        # Fall back to 1 so a missing budget never divides by zero below.
        target = row["target"] or 1
        ratio = min(used / target, 1.0) if target > 0 else 0
        print(f"{row['pipeline']:<20} {fmt(used):>12} {fmt(target):>10} {bar(ratio):>10} {eta(used, target, row['started_at'] or 0):>8} {row['workers'] or 0:>8}")
        used_sum += used
        target_sum += target
    print("-" * 72)
    overall = min(used_sum / target_sum, 1.0) if target_sum > 0 else 0
    print(f"{'TOTAL':<20} {fmt(used_sum):>12} {fmt(target_sum):>10} {bar(overall):>10}")
    print()
def render_watch(conn, interval=5):
    """Redraw the dashboard every *interval* seconds until Ctrl+C."""
    clear_cmd = "cls" if os.name == "nt" else "clear"
    try:
        while True:
            os.system(clear_cmd)
            print(f"Token Budget Tracker -- {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
            print("Press Ctrl+C to exit")
            render_dashboard(conn)
            time.sleep(interval)
    except KeyboardInterrupt:
        # Normal exit path for a watch loop.
        print("\nExiting.")
def render_daily_summary(conn):
    """Print a per-pipeline rollup of usage in the trailing 24 hours."""
    cutoff = time.time() - 86400
    rows = conn.execute("""
        SELECT pipeline, SUM(tokens) as total, COUNT(DISTINCT worker) as workers, COUNT(*) as entries
        FROM token_usage WHERE recorded_at >= ? GROUP BY pipeline ORDER BY total DESC
    """, (cutoff,)).fetchall()
    if not rows:
        print("No usage in last 24 hours.")
        return
    print(f"\nDaily Summary -- last 24 hours")
    print(f"{'Pipeline':<20} {'Total Tokens':>14} {'Workers':>8} {'Entries':>8}")
    print("-" * 54)
    grand_total = 0
    for row in rows:
        print(f"{row['pipeline']:<20} {fmt(row['total']):>14} {row['workers']:>8} {row['entries']:>8}")
        grand_total += row["total"]
    print("-" * 54)
    print(f"{'TOTAL':<20} {fmt(grand_total):>14}\n")
def check_alerts(conn, pipeline):
    """Emit a one-time stderr alert for each budget threshold crossed.

    The alerts_sent table de-duplicates, so each (pipeline, threshold)
    pair alerts at most once until the budget is reset.
    """
    totals = conn.execute(
        "SELECT SUM(u.tokens) as used, COALESCE(b.target_tokens, 0) as target "
        "FROM token_usage u LEFT JOIN pipeline_budgets b ON u.pipeline = b.pipeline "
        "WHERE u.pipeline = ?", (pipeline,)
    ).fetchone()
    if not totals or totals["target"] <= 0:
        return
    usage_ratio = totals["used"] / totals["target"]
    for threshold in ALERT_THRESHOLDS:
        if usage_ratio < threshold:
            continue
        already_sent = conn.execute(
            "SELECT 1 FROM alerts_sent WHERE pipeline = ? AND threshold = ?",
            (pipeline, threshold),
        ).fetchone()
        if not already_sent:
            print(f"⚠️ BUDGET ALERT: {pipeline} at {int(threshold*100)}% ({fmt(totals['used'])}/{fmt(totals['target'])})", file=sys.stderr)
            conn.execute(
                "INSERT INTO alerts_sent (pipeline, threshold, sent_at) VALUES (?, ?, ?)",
                (pipeline, threshold, time.time()),
            )
            conn.commit()
def set_budget(conn, pipeline, target):
    """Upsert a pipeline's budget target and clear its alert history."""
    new_target = int(target)
    conn.execute(
        "INSERT OR REPLACE INTO pipeline_budgets (pipeline, target_tokens, updated_at) VALUES (?, ?, ?)",
        (pipeline, new_target, time.time()),
    )
    # A fresh budget means prior threshold alerts no longer apply.
    conn.execute("DELETE FROM alerts_sent WHERE pipeline = ?", (pipeline,))
    conn.commit()
    print(f"Budget set: {pipeline} = {fmt(new_target)} tokens")
def main():
    """CLI entry point: log usage, set a budget, or render a dashboard/summary.

    Exactly one action runs per invocation; with no action flags the one-shot
    dashboard is rendered.
    """
    # BUG FIX: `global DB_PATH` originally appeared *after* DB_PATH was read
    # for the --db default, which raises
    # "SyntaxError: name 'DB_PATH' is used prior to global declaration".
    # The global declaration must precede every use of the name in this scope.
    global DB_PATH
    parser = argparse.ArgumentParser(description="Token Budget Tracker")
    parser.add_argument("--watch", action="store_true")
    parser.add_argument("--watch-interval", type=int, default=5)
    parser.add_argument("--summary", action="store_true")
    parser.add_argument("--log", nargs=3, metavar=("PIPELINE", "WORKER", "TOKENS"))
    parser.add_argument("--budget", nargs=2, metavar=("PIPELINE", "TARGET"))
    parser.add_argument("--db", type=str, default=str(DB_PATH))
    args = parser.parse_args()
    DB_PATH = Path(args.db)  # rebind so get_db() opens the requested file
    conn = get_db()
    try:
        if args.log:
            log_usage(conn, args.log[0], args.log[1], int(args.log[2]))
            print(f"Logged: {args.log[0]}/{args.log[1]} = {fmt(int(args.log[2]))} tokens")
        elif args.budget:
            set_budget(conn, args.budget[0], args.budget[1])
        elif args.summary:
            render_daily_summary(conn)
        elif args.watch:
            render_watch(conn, interval=args.watch_interval)
        else:
            render_dashboard(conn)
    finally:
        # Robustness: release the connection even if an action raises.
        conn.close()
if __name__ == "__main__":
main()

View File

@@ -76,15 +76,20 @@ def record_usage(conn: sqlite3.Connection, pipeline: str, worker: str, tokens: i
conn.commit()
def normalize_since(since: str) -> str:
    """Normalize ISO timestamps for SQLite datetime comparisons.

    SQLite's datetime() expects "YYYY-MM-DD HH:MM:SS"; ISO-8601 input may
    use a "T" separator, so every "T" is mapped to a space.
    """
    return " ".join(since.split("T"))
def get_usage_since(conn: sqlite3.Connection, since: str) -> Dict[str, int]:
"""Get total tokens per pipeline since a datetime."""
cursor = conn.execute("""
SELECT pipeline, SUM(tokens) as total
FROM token_usage
WHERE recorded_at >= ?
WHERE datetime(recorded_at) >= datetime(?)
GROUP BY pipeline
ORDER BY total DESC
""", (since,))
""", (normalize_since(since),))
return {row[0]: row[1] for row in cursor.fetchall()}
@@ -94,10 +99,10 @@ def get_hourly_usage(conn: sqlite3.Connection, pipeline: str, hours: int = 24) -
cursor = conn.execute("""
SELECT hour_key, SUM(tokens) as total
FROM token_usage
WHERE pipeline = ? AND recorded_at >= ?
WHERE pipeline = ? AND datetime(recorded_at) >= datetime(?)
GROUP BY hour_key
ORDER BY hour_key
""", (pipeline, since))
""", (pipeline, normalize_since(since)))
return cursor.fetchall()
@@ -106,10 +111,10 @@ def get_worker_usage(conn: sqlite3.Connection, pipeline: str, since: str) -> Dic
cursor = conn.execute("""
SELECT worker, SUM(tokens) as total
FROM token_usage
WHERE pipeline = ? AND recorded_at >= ?
WHERE pipeline = ? AND datetime(recorded_at) >= datetime(?)
GROUP BY worker
ORDER BY total DESC
""", (pipeline, since))
""", (pipeline, normalize_since(since)))
return {row[0]: row[1] for row in cursor.fetchall()}

View File

@@ -5,11 +5,12 @@ Tests for scripts/token_tracker.py — Token Budget Tracker.
import json
import os
import sqlite3
import subprocess
import sys
import tempfile
import unittest
from pathlib import Path
import sys
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
from token_tracker import (
get_db,
@@ -155,5 +156,37 @@ class TestBudgets(unittest.TestCase):
self.assertIn("p1", loaded)
class TestWrapperScript(unittest.TestCase):
    """End-to-end check that the hyphenated wrapper script still works."""

    def test_hyphen_wrapper_summary_works_with_custom_db(self):
        with tempfile.TemporaryDirectory() as workdir:
            usage_db = Path(workdir) / "wrapper.db"
            budgets_file = Path(workdir) / "budgets.json"
            budgets_file.write_text(json.dumps({"knowledge-mine": 200_000_000}))
            # Seed the database with one usage record before invoking the CLI.
            conn = get_db(usage_db)
            try:
                record_usage(conn, "knowledge-mine", "worker-1", 123456)
            finally:
                conn.close()
            repo = Path(__file__).parent.parent
            cmd = [
                sys.executable,
                str(repo / "scripts" / "token-tracker.py"),
                "--summary",
                "--db", str(usage_db),
                "--budgets-file", str(budgets_file),
            ]
            proc = subprocess.run(cmd, cwd=str(repo), capture_output=True, text=True)
            self.assertEqual(proc.returncode, 0, proc.stderr)
            self.assertIn("DAILY SUMMARY", proc.stdout)
            self.assertIn("knowledge-mine", proc.stdout)
            self.assertIn("123.5K", proc.stdout)
if __name__ == "__main__":
unittest.main()