Compare commits

...

1 Commits

Author SHA1 Message Date
1453db2440 fix: integrate token tracker with orchestrator (#634)
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 29s
PR Checklist / pr-checklist (pull_request) Failing after 3m50s
Smoke Test / smoke (pull_request) Failing after 16s
Validate Config / YAML Lint (pull_request) Failing after 12s
Validate Config / JSON Validate (pull_request) Successful in 10s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 1m2s
Validate Config / Shell Script Lint (pull_request) Failing after 27s
Validate Config / Cron Syntax Check (pull_request) Successful in 7s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 6s
Validate Config / Playbook Schema Validation (pull_request) Successful in 11s
Architecture Lint / Lint Repository (pull_request) Has been cancelled
Validate Config / Python Test Suite (pull_request) Has been cancelled
- Add log_token_usage() to record input/output tokens per task
- Add Huey SIGNAL_COMPLETE hook for automatic logging
- Auto-detect pipeline name from task function name
- Append to ~/.hermes/token_usage.jsonl for downstream analysis

Related: #622 (token budget tracker)
2026-04-15 01:12:08 +00:00

View File

@@ -1,6 +1,53 @@
"""Sovereign orchestration — Huey replaces 3,843 lines of homebrew.""" """Sovereign orchestration — Huey replaces 3,843 lines of homebrew."""
from huey import SqliteHuey, crontab import json
import os
from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from huey import SqliteHuey, signals
huey = SqliteHuey(filename=str(Path.home() / ".hermes" / "orchestration.db")) huey = SqliteHuey(filename=str(Path.home() / ".hermes" / "orchestration.db"))
# === Token Tracking ===
TOKEN_LOG = Path.home() / ".hermes" / "token_usage.jsonl"
def log_token_usage(task_name, result):
"""Log token usage from a completed pipeline task.
Reads input_tokens/output_tokens from the agent result dict.
Auto-detects pipeline name from task context.
Appends to JSONL for downstream analysis.
"""
if not isinstance(result, dict):
return
input_tokens = result.get("input_tokens", 0)
output_tokens = result.get("output_tokens", 0)
if input_tokens == 0 and output_tokens == 0:
return
# Auto-detect pipeline name from task function name
pipeline = task_name.replace("_task", "").replace("_", "-")
entry = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"pipeline": pipeline,
"input_tokens": input_tokens,
"output_tokens": output_tokens,
"total_tokens": input_tokens + output_tokens,
"task": task_name,
}
TOKEN_LOG.parent.mkdir(parents=True, exist_ok=True)
with open(TOKEN_LOG, "a") as f:
f.write(json.dumps(entry) + "\n")
@huey.signal(signals.SIGNAL_COMPLETE)
def on_task_complete(signal, task, task_value=None, **kwargs):
"""Huey hook: log token usage after each pipeline task completes."""
task_name = getattr(task, "name", "unknown")
log_token_usage(task_name, task_value)