Compare commits

...

3 Commits

Author SHA1 Message Date
Alexander Whitestone
91d94e29e8 fix(pipeline): repair token tracker CLI summary (#622)
Some checks failed
Smoke Test / smoke (pull_request) Failing after 24s
Architecture Lint / Linter Tests (pull_request) Successful in 29s
Validate Config / YAML Lint (pull_request) Failing after 16s
Validate Config / JSON Validate (pull_request) Successful in 21s
Validate Config / Shell Script Lint (pull_request) Failing after 1m8s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 1m21s
Validate Config / Python Test Suite (pull_request) Has been skipped
Validate Config / Cron Syntax Check (pull_request) Successful in 15s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 15s
Validate Config / Playbook Schema Validation (pull_request) Successful in 30s
Architecture Lint / Lint Repository (pull_request) Failing after 28s
PR Checklist / pr-checklist (pull_request) Successful in 5m18s
Fix the hyphenated token-tracker entrypoint and normalize SQLite time filtering
so same-day usage appears in the summary dashboard.
2026-04-22 11:23:43 -04:00
ae8c1d46ae Merge pull request 'feat(#407): Phase progression tracker with auto-eval, Telegram daily post, and blockers' (#857) from fix/407 into main
Some checks failed
Architecture Lint / Linter Tests (push) Successful in 28s
Smoke Test / smoke (push) Failing after 21s
Validate Config / YAML Lint (push) Failing after 9s
Validate Config / JSON Validate (push) Successful in 12s
Validate Config / Python Syntax & Import Check (push) Failing after 35s
Validate Config / Python Test Suite (push) Has been skipped
Validate Config / Shell Script Lint (push) Failing after 38s
Validate Config / Cron Syntax Check (push) Successful in 7s
Validate Config / Deploy Script Dry Run (push) Successful in 7s
Validate Config / Playbook Schema Validation (push) Successful in 16s
Architecture Lint / Lint Repository (push) Failing after 20s
2026-04-22 07:36:26 +00:00
Alexander Whitestone
508441acb4 feat(#407): Phase progression tracker with auto-eval, Telegram daily post, and blockers
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 25s
Smoke Test / smoke (pull_request) Failing after 23s
Validate Config / YAML Lint (pull_request) Failing after 16s
Validate Config / JSON Validate (pull_request) Successful in 19s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 1m2s
Validate Config / Python Test Suite (pull_request) Has been skipped
Validate Config / Shell Script Lint (pull_request) Failing after 1m6s
Validate Config / Cron Syntax Check (pull_request) Successful in 14s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 14s
Validate Config / Playbook Schema Validation (pull_request) Successful in 28s
Architecture Lint / Lint Repository (pull_request) Failing after 27s
PR Checklist / pr-checklist (pull_request) Failing after 11m41s
2026-04-22 03:34:36 -04:00
4 changed files with 380 additions and 262 deletions

View File

@@ -4,111 +4,365 @@
Part of the Gemini Sovereign Infrastructure Suite.
Tracks the fleet's progress through the Paperclips-inspired evolution arc.
Usage:
python3 scripts/phase_tracker.py status # Show current state
python3 scripts/phase_tracker.py evaluate # Auto-evaluate checkable milestones
python3 scripts/phase_tracker.py complete M4 # Mark milestone complete
python3 scripts/phase_tracker.py telegram # Post daily update to Telegram
python3 scripts/phase_tracker.py daily # evaluate + telegram
"""
import os
import sys
import json
import re
import argparse
import urllib.request
import subprocess
from pathlib import Path
from datetime import datetime, timezone, timedelta
# Repo-relative data files: milestone definitions and persisted completion state.
MILESTONES_FILE = "fleet/milestones.md"
COMPLETED_FILE = "fleet/completed_milestones.json"
# Fleet-health log directory and the uptime snapshot read by _eval_uptime().
LOG_DIR = Path(os.path.expanduser("~/.local/timmy/fleet-health"))
UPTIME_FILE = LOG_DIR / "uptime.json"
# Candidate token files, checked in order by _read_token().
TELEGRAM_TOKEN_PATHS = [
    Path.home() / ".config" / "timmy" / "telegram_bot_token",
    Path.home() / ".hermes" / "telegram_bot_token",
    Path.home() / ".hermes" / "telegram_token",
]
# Target chat id: env override first, hard-coded channel as fallback.
TELEGRAM_CHAT = os.environ.get("TELEGRAM_HOME_CHANNEL", "-1003664764329")
# Fleet hosts; _eval_local_model_multi() uses the keys as ssh destinations.
HOSTS = {
    "ezra": {"ip": "143.198.27.163"},
    "allegro": {"ip": "167.99.126.228"},
    "bezalel": {"ip": "159.203.146.185"},
}
def _find_repo_root() -> Path:
script_dir = Path(__file__).resolve().parent
return script_dir.parent
def _read_token() -> str | None:
    """Return the Telegram bot token from the first existing token file,
    falling back to the TELEGRAM_BOT_TOKEN env var; None when neither exists."""
    token_file = next((p for p in TELEGRAM_TOKEN_PATHS if p.exists()), None)
    if token_file is not None:
        return token_file.read_text().strip()
    return os.environ.get("TELEGRAM_BOT_TOKEN") or None
def telegram_send(text: str) -> bool:
    """Post *text* (HTML parse mode) to the configured chat; True on HTTP 200."""
    token = _read_token()
    if not token:
        print("[WARN] No Telegram token found.", file=sys.stderr)
        return False
    payload = json.dumps(
        {"chat_id": TELEGRAM_CHAT, "text": text, "parse_mode": "HTML"}
    ).encode()
    request = urllib.request.Request(
        f"https://api.telegram.org/bot{token}/sendMessage",
        data=payload,
        headers={"Content-Type": "application/json"},
    )
    try:
        with urllib.request.urlopen(request, timeout=30) as response:
            return response.status == 200
    except Exception as exc:
        print(f"[WARN] Telegram send failed: {exc}", file=sys.stderr)
        return False
class Milestone:
    """A single milestone entry parsed from fleet/milestones.md."""

    def __init__(self, m_id: str, title: str, trigger: str, message: str):
        self.id: str = m_id
        self.title: str = title
        # Trigger/message lines are attached later by the parser; often empty.
        self.trigger: str = trigger
        self.message: str = message
class Phase:
def __init__(self, name: str, number: int, unlock_condition: str | None):
self.name = name
self.number = number
self.unlock_condition = unlock_condition
self.milestones: list[Milestone] = []
class PhaseTracker:
    """Parses milestone definitions, tracks completion, and reports progress.

    NOTE(review): this __init__ contains BOTH the pre-refactor setup
    (os.path strings, parse_milestones/load_completed) and the newer
    Path-based setup (_parse_milestones/_load_completed); the later
    assignments overwrite the earlier ones. Looks like a merge/diff
    artifact — confirm only the newer variant should remain.
    """

    def __init__(self):
        # Find files relative to repo root
        script_dir = os.path.dirname(os.path.abspath(__file__))
        repo_root = os.path.dirname(script_dir)
        self.milestones_path = os.path.join(repo_root, MILESTONES_FILE)
        self.completed_path = os.path.join(repo_root, COMPLETED_FILE)
        self.milestones = self.parse_milestones()
        self.completed = self.load_completed()
        # Newer Path-based state; these overwrite the legacy attributes above.
        self.repo_root = _find_repo_root()
        self.milestones_path = self.repo_root / MILESTONES_FILE
        self.completed_path = self.repo_root / COMPLETED_FILE
        self.phases: list[Phase] = self._parse_milestones()
        self.completed: set[str] = self._load_completed()
def _parse_milestones(self) -> list[Phase]:
if not self.milestones_path.exists():
return []
content = self.milestones_path.read_text()
phases: list[Phase] = []
current_phase: Phase | None = None
for line in content.splitlines():
phase_match = re.match(r"##\s*Phase\s*(\d+):\s*(.+?)\s*(?:\(([^)]+)\))?\s*$", line)
if phase_match:
num = int(phase_match.group(1))
name = phase_match.group(2).strip()
unlock = phase_match.group(3)
current_phase = Phase(name, num, unlock)
phases.append(current_phase)
continue
m_match = re.match(r"###\s*(M\d+):\s*(.+)$", line)
if m_match and current_phase is not None:
m_id = m_match.group(1)
title = m_match.group(2).strip()
current_phase.milestones.append(Milestone(m_id, title, "", ""))
continue
if line.startswith("**Trigger:**") and current_phase and current_phase.milestones:
current_phase.milestones[-1].trigger = line.replace("**Trigger:**", "").strip()
continue
if line.startswith("**Message:**") and current_phase and current_phase.milestones:
current_phase.milestones[-1].message = line.replace("**Message:**", "").strip().strip('"')
continue
    def parse_milestones(self):
        """Legacy markdown parser: maps each "## Phase ..." header line to a
        list of {"id", "title"} dicts built from its "### M<n>: ..." lines.

        NOTE(review): superseded by _parse_milestones() in this revision.
        A "### M" line before any "## Phase" header raises KeyError
        (phases[None]); titles containing a second ":" are truncated by
        split(":")[1].
        """
        if not os.path.exists(self.milestones_path):
            return {}
        with open(self.milestones_path, "r") as f:
            content = f.read()
        phases = {}
        current_phase = None
        for line in content.split("\n"):
            if line.startswith("## Phase"):
                # The header text (minus "## ") becomes the dict key.
                current_phase = line.replace("## ", "").strip()
                phases[current_phase] = []
            elif line.startswith("### M"):
                m_id = line.split(":")[0].replace("### ", "").strip()
                title = line.split(":")[1].strip()
                phases[current_phase].append({"id": m_id, "title": title})
        return phases
def load_completed(self):
if os.path.exists(self.completed_path):
with open(self.completed_path, "r") as f:
try:
return json.load(f)
except:
return []
return []
def _load_completed(self) -> set[str]:
if self.completed_path.exists():
try:
data = json.loads(self.completed_path.read_text())
if isinstance(data, list):
return set(data)
except Exception:
pass
return set()
    def save_completed(self):
        """Persist the completed-milestone ids as pretty-printed JSON.

        NOTE(review): diff artifact — both the legacy json.dump() write and
        the newer write_text() write are present, so the file is written
        twice; json.dump(self.completed, ...) also raises TypeError once
        self.completed is a set. Keep only the write_text variant.
        """
        with open(self.completed_path, "w") as f:
            json.dump(self.completed, f, indent=2)
        self.completed_path.write_text(json.dumps(sorted(self.completed), indent=2))

    def show_progress(self):
        """Legacy progress report over the parse_milestones() dict.

        NOTE(review): shadowed by the Phase-based show_progress() defined
        later in this class (the later def wins at class-creation time).
        """
        print("--- Fleet Phase Progression Tracker ---")
        total_milestones = 0
        total_completed = 0
        if not self.milestones:
            print("[ERROR] No milestones found in fleet/milestones.md")
            return
        for phase, ms in self.milestones.items():
            print(f"\n{phase}")
            for m in ms:
                total_milestones += 1
                done = m["id"] in self.completed
                if done:
                    total_completed += 1
                # NOTE(review): both branches are "" — the done/pending marker
                # glyphs appear to have been lost in extraction; confirm and
                # restore.
                status = "" if done else ""
                print(f" {status} {m['id']}: {m['title']}")
        percent = (total_completed / total_milestones) * 100 if total_milestones > 0 else 0
        print(f"\nOverall Progress: {total_completed}/{total_milestones} ({percent:.1f}%)")

    # NOTE(review): diff artifact — the legacy untyped signature directly
    # precedes the typed one, leaving the first `def` without a body (a
    # syntax error as written); the body also mixes the legacy list.append
    # with the newer set.add. Keep only the typed variant and set.add.
    def mark_complete(self, m_id: str):
    def mark_complete(self, m_id: str) -> bool:
        """Mark *m_id* complete and persist; False for unknown milestone ids."""
        m_id = m_id.upper()
        exists = any(m.id == m_id for p in self.phases for m in p.milestones)
        if not exists:
            print(f"[ERROR] Unknown milestone: {m_id}")
            return False
        if m_id not in self.completed:
            self.completed.append(m_id)
            self.completed.add(m_id)
            self.save_completed()
            print(f"[SUCCESS] Marked {m_id} as complete.")
            return True
        print(f"[INFO] {m_id} is already complete.")
        return True
def _get_phase_state(self) -> tuple[int, float, list[str], list[str]]:
"""Returns (current_phase_number, decimal_progress, blockers, next_milestones)."""
blockers = []
next_milestones = []
for phase in self.phases:
phase_completed = sum(1 for m in phase.milestones if m.id in self.completed)
phase_total = len(phase.milestones)
if phase_total == 0:
continue
if phase_completed < phase_total:
progress = phase_completed / phase_total
decimal = phase.number + progress
# Find next incomplete milestone
for m in phase.milestones:
if m.id not in self.completed:
next_milestones.append(f"{m.id}: {m.title}")
if m.trigger:
blockers.append(f"{m.id}: {m.trigger}")
break
# Phase unlock condition as blocker if near end
if phase_completed == phase_total - 1 and phase.unlock_condition:
blockers.append(f"Unlock Phase {phase.number + 1}: {phase.unlock_condition}")
return phase.number, decimal, blockers, next_milestones
# All done
last = self.phases[-1] if self.phases else None
if last:
return last.number, float(last.number) + 1.0, ["All phases complete."], []
return 0, 0.0, ["No milestones defined."], []
    def show_progress(self):
        """Print the full phase/milestone report with progress and blockers."""
        phase_num, decimal, blockers, next_ms = self._get_phase_state()
        total_ms = sum(len(p.milestones) for p in self.phases)
        total_completed = len(self.completed)
        overall_pct = (total_completed / total_ms * 100) if total_ms else 0
        print("=" * 50)
        print(" Fleet Phase Progression Tracker")
        print("=" * 50)
        # NOTE(review): "{phase_num}{...name}" prints number and name with no
        # separator — a separator glyph was likely lost in extraction; confirm.
        print(f"\nCurrent Phase: Phase {phase_num}{self.phases[phase_num - 1].name if phase_num <= len(self.phases) else 'Complete'}")
        print(f"Decimal Progress: Phase {decimal:.1f}")
        print(f"Overall: {total_completed}/{total_ms} milestones ({overall_pct:.1f}%)")
        print("\n--- Milestones ---")
        for phase in self.phases:
            done = sum(1 for m in phase.milestones if m.id in self.completed)
            total = len(phase.milestones)
            # NOTE(review): both status strings are "" — the complete/incomplete
            # marker glyphs appear stripped; confirm and restore.
            status = "" if done == total else ""
            print(f"\n{status} Phase {phase.number}: {phase.name} ({done}/{total})")
            for m in phase.milestones:
                mark = "" if m.id in self.completed else ""
                print(f" {mark} {m.id}: {m.title}")
        print("\n--- Next Up ---")
        # At most three upcoming milestones.
        for nm in next_ms[:3]:
            print(f"{nm}")
        print("\n--- Blockers ---")
        # At most five blockers.
        for b in blockers[:5]:
            print(f" ⚠️ {b}")
        if not blockers:
            print(" 🚀 Nothing blocking.")
        print()

    def summary_text(self) -> str:
        """Build the short multi-line summary that gets posted to Telegram."""
        phase_num, decimal, blockers, next_ms = self._get_phase_state()
        total_ms = sum(len(p.milestones) for p in self.phases)
        total_completed = len(self.completed)
        overall_pct = (total_completed / total_ms * 100) if total_ms else 0
        phase_name = self.phases[phase_num - 1].name if phase_num <= len(self.phases) else "Complete"
        # When already on the last phase, "next" stays at the current number.
        next_phase = phase_num + 1 if phase_num < len(self.phases) else phase_num
        progress_to_next = (decimal - phase_num) * 100
        lines = [
            f"Fleet: Phase {decimal:.1f} ({progress_to_next:.0f}% to Phase {next_phase})",
            # NOTE(review): "{phase_num}{phase_name}" has no separator — likely
            # a stripped glyph; confirm.
            f"Phase: {phase_num}{phase_name}",
            f"Overall: {total_completed}/{total_ms} milestones ({overall_pct:.1f}%)",
        ]
        if next_ms:
            lines.append(f"Next: {next_ms[0]}")
        if blockers and blockers[0] != "All phases complete.":
            lines.append(f"Blocker: {blockers[0]}")
        return "\n".join(lines)
# === Auto-evaluation heuristics ===
def _eval_file_exists(self, path: str) -> bool:
return (self.repo_root / path).exists()
def _eval_command(self, cmd: str) -> bool:
try:
result = subprocess.run(cmd, shell=True, capture_output=True, timeout=10)
return result.returncode == 0
except Exception:
return False
    def _eval_uptime(self, target: float) -> bool:
        """True when the recorded 30-day uptime percentage is >= *target*.

        Reads the uptime snapshot JSON at UPTIME_FILE; a missing/unreadable
        file — or a value that cannot be compared numerically — yields False.
        """
        if not UPTIME_FILE.exists():
            return False
        try:
            data = json.loads(UPTIME_FILE.read_text())
            uptime = data.get("uptime_30d_percent", 0.0)
            return uptime >= target
        except Exception:
            return False

    def _eval_local_model_multi(self) -> bool:
        """True when `pgrep -f ollama` succeeds over ssh on at least 2 hosts.

        Iterates the HOSTS keys as ssh destinations with a 5s connect timeout.
        """
        count = 0
        for host in HOSTS:
            if self._eval_command(f"ssh -o ConnectTimeout=5 {host} 'pgrep -f ollama >/dev/null 2>&1'"):
                count += 1
        return count >= 2
def _eval_zero_manual_restarts(self, days: int = 7) -> bool:
log = LOG_DIR / "auto_restart.log"
if not log.exists():
return False
cutoff = datetime.now(timezone.utc) - timedelta(days=days)
try:
with open(log) as f:
for line in f:
if "manual restart" in line.lower():
# crude timestamp parse
try:
ts = datetime.fromisoformat(line[:19])
if ts > cutoff:
return False
except Exception:
continue
return True
except Exception:
return False
    def evaluate(self):
        """Auto-check milestones where we have heuristics."""
        print("[EVAL] Running automatic milestone checks...\n")
        # (milestone id, heuristic method, heuristic argument or None)
        checks = [
            ("M1", self._eval_command, "python3 fleet/health_check.py --dry-run 2>/dev/null || python3 fleet/health_check.py 2>&1 | head -1 >/dev/null"),
            ("M2", self._eval_command, "test -f ~/.local/timmy/fleet-health/auto_restart.log && grep -q 'restarted' ~/.local/timmy/fleet-health/auto_restart.log"),
            ("M3", self._eval_command, "test -d ~/.local/timmy/backups && ls ~/.local/timmy/backups | grep -q ."),
            ("M4", self._eval_uptime, 95.0),
            ("M5", self._eval_uptime, 97.0),
            ("M6", self._eval_zero_manual_restarts, 7),
            ("M9", self._eval_uptime, 98.0),
            ("M11", self._eval_local_model_multi, None),
        ]
        newly_found = []
        for m_id, check_fn, arg in checks:
            if m_id in self.completed:
                continue
            result = check_fn(arg) if arg is not None else check_fn()
            if result:
                print(f"{m_id} appears satisfied — marking complete.")
                self.completed.add(m_id)
                newly_found.append(m_id)
            else:
                print(f"{m_id} not yet satisfied.")
        if newly_found:
            self.save_completed()
            print(f"\n[SUCCESS] Auto-completed {len(newly_found)} milestone(s): {', '.join(newly_found)}")
        else:
            # NOTE(review): the "[INFO] ... already complete" print looks like a
            # stray mark_complete line merged in by the diff — it reuses the
            # last loop value of m_id. Confirm only the "No new milestones"
            # print belongs in this branch.
            print(f"[INFO] {m_id} is already complete.")
            print("\n[INFO] No new milestones auto-detected.")

    def daily(self):
        """Run auto-evaluation, then print and post the summary to Telegram."""
        self.evaluate()
        text = self.summary_text()
        print(text)
        ok = telegram_send(text)
        if ok:
            print("\n[TELEGRAM] Daily update sent.")
        else:
            print("\n[TELEGRAM] Failed to send update.")
def main():
    """CLI entry point: status / evaluate / telegram / daily / complete."""
    # NOTE(review): two ArgumentParser assignments (old and new description)
    # survive from the diff; the second wins.
    parser = argparse.ArgumentParser(description="Gemini Phase Tracker")
    parser = argparse.ArgumentParser(description="Fleet Phase Progression Tracker")
    subparsers = parser.add_subparsers(dest="command")
    subparsers.add_parser("status", help="Show current progress")
    subparsers.add_parser("evaluate", help="Auto-evaluate checkable milestones")
    subparsers.add_parser("telegram", help="Post summary to Telegram")
    subparsers.add_parser("daily", help="Evaluate then post to Telegram")
    complete_parser = subparsers.add_parser("complete", help="Mark a milestone as complete")
    complete_parser.add_argument("id", help="Milestone ID (e.g. M1)")
    args = parser.parse_args()
    tracker = PhaseTracker()
    if args.command == "status":
        tracker.show_progress()
    elif args.command == "evaluate":
        tracker.evaluate()
    elif args.command == "telegram":
        # Exit nonzero when the Telegram post fails, so cron/CI can alert.
        ok = telegram_send(tracker.summary_text())
        sys.exit(0 if ok else 1)
    elif args.command == "daily":
        tracker.daily()
    elif args.command == "complete":
        # NOTE(review): mark_complete is invoked twice (legacy call plus the
        # newer exit-code variant) — a diff artifact; the milestone is saved
        # and reported twice. Keep only the second call.
        tracker.mark_complete(args.id)
        ok = tracker.mark_complete(args.id)
        sys.exit(0 if ok else 1)
    else:
        parser.print_help()

if __name__ == "__main__":
    main()

View File

@@ -1,194 +1,20 @@
#!/usr/bin/env python3
"""Token Budget Tracker -- real-time spend dashboard for pipelines."""
"""Compatibility wrapper for the token budget tracker CLI.
Issue #622 asked for a `token-tracker.py` entrypoint. The maintained
implementation lives in `scripts/token_tracker.py`. Keep this thin shim so
operator docs and older calls continue to work.
"""
import argparse, json, os, sqlite3, sys, time
from datetime import datetime
from pathlib import Path
import sys
# SQLite database holding per-pipeline token usage (legacy in-file implementation).
DB_PATH = Path.home() / ".hermes" / "pipelines" / "token_usage.db"
# Budget fractions at which check_alerts() warns (each fires at most once).
ALERT_THRESHOLDS = [0.5, 0.8, 1.0]
# Budgets seeded into a freshly created database (tokens per pipeline).
DEFAULT_BUDGETS = {
    "knowledge-mine": 200_000_000,
    "training-factory": 215_000_000,
    "playground": 16_000_000,
    "adversary": 17_000_000,
}
# Shim: make sibling modules (token_tracker.py) importable when this file is
# executed directly as a script.
SCRIPT_DIR = Path(__file__).resolve().parent
if str(SCRIPT_DIR) not in sys.path:
    sys.path.insert(0, str(SCRIPT_DIR))

def get_db():
    """Open (and initialize, if needed) the token-usage database.

    Creates the schema idempotently, enables WAL journaling, and seeds
    DEFAULT_BUDGETS with INSERT OR IGNORE so existing budgets are never
    overwritten. Returns a connection with Row factory for dict-like access.
    """
    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(str(DB_PATH))
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA journal_mode=WAL")
    conn.executescript("""
    CREATE TABLE IF NOT EXISTS token_usage (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        pipeline TEXT NOT NULL,
        worker TEXT NOT NULL,
        tokens INTEGER NOT NULL,
        recorded_at REAL NOT NULL,
        hour_bucket TEXT NOT NULL
    );
    CREATE TABLE IF NOT EXISTS pipeline_budgets (
        pipeline TEXT PRIMARY KEY,
        target_tokens INTEGER NOT NULL,
        updated_at REAL NOT NULL
    );
    CREATE TABLE IF NOT EXISTS alerts_sent (
        pipeline TEXT NOT NULL,
        threshold REAL NOT NULL,
        sent_at REAL NOT NULL,
        PRIMARY KEY (pipeline, threshold)
    );
    CREATE INDEX IF NOT EXISTS idx_usage_pipeline_hour
    ON token_usage(pipeline, hour_bucket);
    """)
    for name, target in DEFAULT_BUDGETS.items():
        conn.execute(
            "INSERT OR IGNORE INTO pipeline_budgets (pipeline, target_tokens, updated_at) VALUES (?, ?, ?)",
            (name, target, time.time())
        )
    conn.commit()
    return conn
from token_tracker import main
def log_usage(conn, pipeline, worker, tokens):
    """Record one token-usage sample, then re-check budget alert thresholds."""
    recorded_at = time.time()
    # Hour bucket drives the idx_usage_pipeline_hour index / hourly rollups.
    bucket = datetime.now().strftime("%Y-%m-%d %H:00")
    conn.execute(
        "INSERT INTO token_usage (pipeline, worker, tokens, recorded_at, hour_bucket) VALUES (?, ?, ?, ?, ?)",
        (pipeline, worker, tokens, recorded_at, bucket),
    )
    conn.commit()
    check_alerts(conn, pipeline)
def get_pipeline_stats(conn):
    """Aggregate lifetime usage per pipeline, joined with its budget.

    Returns a list of dicts (pipeline, target, used, started_at, workers)
    ordered by tokens used, descending; pipelines without a budget get 0.
    """
    query = """
        SELECT u.pipeline, COALESCE(b.target_tokens, 0) as target,
        SUM(u.tokens) as used, MIN(u.recorded_at) as started_at,
        COUNT(DISTINCT u.worker) as workers
        FROM token_usage u
        LEFT JOIN pipeline_budgets b ON u.pipeline = b.pipeline
        GROUP BY u.pipeline ORDER BY used DESC
    """
    return [dict(row) for row in conn.execute(query).fetchall()]
def fmt(n):
    """Human-readable token count: e.g. 1.5K / 2.0M / 3.1B; plain below 1000."""
    for threshold, suffix in ((1_000_000_000, "B"), (1_000_000, "M"), (1_000, "K")):
        if n >= threshold:
            return f"{n / threshold:.1f}{suffix}"
    return str(n)
def bar(ratio, w=8):
    """Render *ratio* (0..1) as a fixed-width progress bar of *w* cells.

    Bug fix: both glyph strings had degraded to empty strings (non-ASCII
    characters lost), so the dashboard's Progress column rendered as "".
    Restore filled/empty block characters.
    """
    filled = int(ratio * w)
    return "█" * filled + "░" * (w - filled)
def eta(used, target, started):
    """Estimate time until the budget is exhausted, from the average burn rate.

    Returns "--" when no rate is computable, "DONE" when the target is already
    met, else hours ("3.2h") or days ("1.4d") remaining.
    """
    if used <= 0 or started <= 0:
        return "--"
    elapsed_hours = (time.time() - started) / 3600
    if elapsed_hours <= 0:
        return "--"
    remaining = target - used
    if remaining <= 0:
        return "DONE"
    hours_left = remaining / (used / elapsed_hours)
    return f"{hours_left / 24:.1f}d" if hours_left >= 24 else f"{hours_left:.1f}h"
def render_dashboard(conn):
    """Print the per-pipeline budget dashboard to stdout.

    Columns: tokens used, target, progress bar, ETA to exhaustion, distinct
    worker count, plus a TOTAL row. Ratios are clamped to 1.0.
    """
    stats = get_pipeline_stats(conn)
    if not stats:
        print("No pipeline data recorded yet.")
        return
    print()
    print(f"{'Pipeline':<20} {'Tokens Used':>12} {'Target':>10} {'Progress':>10} {'ETA':>8} {'Workers':>8}")
    print("-" * 72)
    total_used = total_target = 0
    for s in stats:
        used = s["used"] or 0
        # Fallback target of 1 keeps the ratio well-defined for unbudgeted rows.
        target = s["target"] or 1
        ratio = min(used / target, 1.0) if target > 0 else 0
        print(f"{s['pipeline']:<20} {fmt(used):>12} {fmt(target):>10} {bar(ratio):>10} {eta(used, target, s['started_at'] or 0):>8} {s['workers'] or 0:>8}")
        total_used += used
        total_target += target
    print("-" * 72)
    r = min(total_used / total_target, 1.0) if total_target > 0 else 0
    print(f"{'TOTAL':<20} {fmt(total_used):>12} {fmt(total_target):>10} {bar(r):>10}")
    print()

def render_watch(conn, interval=5):
    """Clear-and-redraw dashboard loop every *interval* seconds; Ctrl+C exits."""
    try:
        while True:
            # Clear the terminal; Windows shells use `cls`.
            os.system("clear" if os.name != "nt" else "cls")
            print(f"Token Budget Tracker -- {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
            print("Press Ctrl+C to exit")
            render_dashboard(conn)
            time.sleep(interval)
    except KeyboardInterrupt:
        print("\nExiting.")

def render_daily_summary(conn):
    """Print per-pipeline token totals for the trailing 24 hours."""
    since = time.time() - 86400
    rows = conn.execute("""
        SELECT pipeline, SUM(tokens) as total, COUNT(DISTINCT worker) as workers, COUNT(*) as entries
        FROM token_usage WHERE recorded_at >= ? GROUP BY pipeline ORDER BY total DESC
    """, (since,)).fetchall()
    if not rows:
        print("No usage in last 24 hours.")
        return
    print(f"\nDaily Summary -- last 24 hours")
    print(f"{'Pipeline':<20} {'Total Tokens':>14} {'Workers':>8} {'Entries':>8}")
    print("-" * 54)
    gt = 0
    for r in rows:
        print(f"{r['pipeline']:<20} {fmt(r['total']):>14} {r['workers']:>8} {r['entries']:>8}")
        gt += r["total"]
    print("-" * 54)
    print(f"{'TOTAL':<20} {fmt(gt):>14}\n")

def check_alerts(conn, pipeline):
    """Emit each budget-threshold warning at most once per pipeline.

    Compares lifetime usage against the pipeline's budget and prints a
    stderr alert for every crossed threshold not yet recorded in alerts_sent.
    """
    row = conn.execute(
        "SELECT SUM(u.tokens) as used, COALESCE(b.target_tokens, 0) as target "
        "FROM token_usage u LEFT JOIN pipeline_budgets b ON u.pipeline = b.pipeline "
        "WHERE u.pipeline = ?", (pipeline,)
    ).fetchone()
    if not row or row["target"] <= 0: return
    ratio = row["used"] / row["target"]
    for t in ALERT_THRESHOLDS:
        if ratio >= t:
            # One-shot: the (pipeline, threshold) primary key dedupes alerts.
            existing = conn.execute("SELECT 1 FROM alerts_sent WHERE pipeline = ? AND threshold = ?", (pipeline, t)).fetchone()
            if not existing:
                print(f"⚠️ BUDGET ALERT: {pipeline} at {int(t*100)}% ({fmt(row['used'])}/{fmt(row['target'])})", file=sys.stderr)
                conn.execute("INSERT INTO alerts_sent (pipeline, threshold, sent_at) VALUES (?, ?, ?)", (pipeline, t, time.time()))
                conn.commit()
def set_budget(conn, pipeline, target):
    """Create or replace a pipeline's token budget and reset its alert state."""
    conn.execute(
        "INSERT OR REPLACE INTO pipeline_budgets (pipeline, target_tokens, updated_at) VALUES (?, ?, ?)",
        (pipeline, int(target), time.time()),
    )
    # Re-arm alerts so thresholds can fire again against the new target.
    conn.execute("DELETE FROM alerts_sent WHERE pipeline = ?", (pipeline,))
    conn.commit()
    print(f"Budget set: {pipeline} = {fmt(int(target))} tokens")
def main():
    """CLI: --log / --budget / --summary / --watch, else render the dashboard."""
    # Bug fix: `global DB_PATH` must appear before any use of the name inside
    # the function; the original declared it AFTER add_argument's
    # default=str(DB_PATH), which is a SyntaxError ("name 'DB_PATH' is used
    # prior to global declaration").
    global DB_PATH
    parser = argparse.ArgumentParser(description="Token Budget Tracker")
    parser.add_argument("--watch", action="store_true")
    parser.add_argument("--watch-interval", type=int, default=5)
    parser.add_argument("--summary", action="store_true")
    parser.add_argument("--log", nargs=3, metavar=("PIPELINE", "WORKER", "TOKENS"))
    parser.add_argument("--budget", nargs=2, metavar=("PIPELINE", "TARGET"))
    parser.add_argument("--db", type=str, default=str(DB_PATH))
    args = parser.parse_args()
    # Rebind the module-level path so get_db() opens the requested database.
    DB_PATH = Path(args.db)
    conn = get_db()
    if args.log:
        log_usage(conn, args.log[0], args.log[1], int(args.log[2]))
        print(f"Logged: {args.log[0]}/{args.log[1]} = {fmt(int(args.log[2]))} tokens")
    elif args.budget:
        set_budget(conn, args.budget[0], args.budget[1])
    elif args.summary:
        render_daily_summary(conn)
    elif args.watch:
        render_watch(conn, interval=args.watch_interval)
    else:
        render_dashboard(conn)
    conn.close()

if __name__ == "__main__":
    main()

View File

@@ -76,15 +76,20 @@ def record_usage(conn: sqlite3.Connection, pipeline: str, worker: str, tokens: i
conn.commit()
def normalize_since(since: str) -> str:
    """Normalize ISO timestamps for SQLite datetime comparisons."""
    # SQLite's datetime() expects "YYYY-MM-DD HH:MM:SS"; swap out the ISO "T".
    normalized = since.replace("T", " ")
    return normalized
def get_usage_since(conn: sqlite3.Connection, since: str) -> Dict[str, int]:
    """Get total tokens per pipeline since a datetime.

    NOTE(review): diff artifact — both the old (`recorded_at >= ?`) and new
    (`datetime(recorded_at) >= datetime(?)`) WHERE lines are present inside
    one SQL string, and both closing parameter tuples survive; as written
    this is invalid. Keep only the datetime()-normalized variant.
    """
    cursor = conn.execute("""
        SELECT pipeline, SUM(tokens) as total
        FROM token_usage
        WHERE recorded_at >= ?
        WHERE datetime(recorded_at) >= datetime(?)
        GROUP BY pipeline
        ORDER BY total DESC
    """, (since,))
    """, (normalize_since(since),))
    return {row[0]: row[1] for row in cursor.fetchall()}
@@ -94,10 +99,10 @@ def get_hourly_usage(conn: sqlite3.Connection, pipeline: str, hours: int = 24) -
cursor = conn.execute("""
SELECT hour_key, SUM(tokens) as total
FROM token_usage
WHERE pipeline = ? AND recorded_at >= ?
WHERE pipeline = ? AND datetime(recorded_at) >= datetime(?)
GROUP BY hour_key
ORDER BY hour_key
""", (pipeline, since))
""", (pipeline, normalize_since(since)))
return cursor.fetchall()
@@ -106,10 +111,10 @@ def get_worker_usage(conn: sqlite3.Connection, pipeline: str, since: str) -> Dic
cursor = conn.execute("""
SELECT worker, SUM(tokens) as total
FROM token_usage
WHERE pipeline = ? AND recorded_at >= ?
WHERE pipeline = ? AND datetime(recorded_at) >= datetime(?)
GROUP BY worker
ORDER BY total DESC
""", (pipeline, since))
""", (pipeline, normalize_since(since)))
return {row[0]: row[1] for row in cursor.fetchall()}

View File

@@ -5,11 +5,12 @@ Tests for scripts/token_tracker.py — Token Budget Tracker.
import json
import os
import sqlite3
import subprocess
import sys
import tempfile
import unittest
from pathlib import Path
import sys
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
from token_tracker import (
get_db,
@@ -155,5 +156,37 @@ class TestBudgets(unittest.TestCase):
self.assertIn("p1", loaded)
class TestWrapperScript(unittest.TestCase):
    """End-to-end check for the hyphenated `token-tracker.py` shim (issue #622)."""

    def test_hyphen_wrapper_summary_works_with_custom_db(self):
        with tempfile.TemporaryDirectory() as workdir:
            work = Path(workdir)
            db_file = work / "wrapper.db"
            budgets_file = work / "budgets.json"
            budgets_file.write_text(json.dumps({"knowledge-mine": 200_000_000}))
            # Seed one usage row through the real implementation.
            conn = get_db(db_file)
            try:
                record_usage(conn, "knowledge-mine", "worker-1", 123456)
            finally:
                conn.close()
            repo = Path(__file__).parent.parent
            shim = repo / "scripts" / "token-tracker.py"
            proc = subprocess.run(
                [
                    sys.executable,
                    str(shim),
                    "--summary",
                    "--db", str(db_file),
                    "--budgets-file", str(budgets_file),
                ],
                cwd=str(repo),
                capture_output=True,
                text=True,
            )
            self.assertEqual(proc.returncode, 0, proc.stderr)
            self.assertIn("DAILY SUMMARY", proc.stdout)
            self.assertIn("knowledge-mine", proc.stdout)
            self.assertIn("123.5K", proc.stdout)
# Allow running this test module directly, outside the pytest/unittest runner.
if __name__ == "__main__":
    unittest.main()