Compare commits

...

1 Commits

Author SHA1 Message Date
Alexander Whitestone
213fe0ca91 fix: integrate token tracker auto-logging with orchestration (#634)
Some checks failed
Smoke Test / smoke (pull_request) Failing after 21s
Architecture Lint / Linter Tests (pull_request) Successful in 25s
Validate Config / YAML Lint (pull_request) Failing after 16s
Validate Config / JSON Validate (pull_request) Successful in 18s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 59s
Validate Config / Python Test Suite (pull_request) Has been skipped
Validate Config / Shell Script Lint (pull_request) Failing after 1m2s
Validate Config / Cron Syntax Check (pull_request) Successful in 11s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 10s
Validate Config / Playbook Schema Validation (pull_request) Successful in 23s
Architecture Lint / Lint Repository (pull_request) Failing after 22s
PR Checklist / pr-checklist (pull_request) Successful in 5m28s
2026-04-22 20:38:06 -04:00
2 changed files with 56 additions and 9 deletions

View File

@@ -19,6 +19,20 @@ huey = SqliteHuey(
# === Token Tracking ===
TOKEN_LOG = Path.home() / ".hermes" / "token_usage.jsonl"
try:
from scripts.token_budget import can_afford, get_remaining, record_usage
except ImportError:
can_afford = None
get_remaining = None
record_usage = None
try:
from scripts.token_tracker import get_db as get_token_tracker_db
from scripts.token_tracker import record_usage as token_tracker_record_usage
except ImportError:
get_token_tracker_db = None
token_tracker_record_usage = None
def log_token_usage(task_name, result):
"""Log token usage from a completed pipeline task.
@@ -26,7 +40,8 @@ def log_token_usage(task_name, result):
Reads input_tokens/output_tokens from the agent result dict.
Auto-detects pipeline name from task context.
Appends to JSONL for downstream analysis.
Also records to token_budget for daily enforcement and token_tracker for
pipeline-level usage reporting.
"""
if not isinstance(result, dict):
return
@@ -55,18 +70,37 @@ def log_token_usage(task_name, result):
f.write(json.dumps(entry) + "\n")
# Record to token budget for daily enforcement
if record_usage is not None:
try:
from scripts.token_budget import record_usage
record_usage(pipeline, input_tokens, output_tokens)
logger.info(f"Budget updated: {pipeline} +{entry['total_tokens']} tokens")
except ImportError:
logger.debug("token_budget not available, skipping budget update")
# Record to token tracker for pipeline dashboard/alerts
if get_token_tracker_db is not None and token_tracker_record_usage is not None:
conn = None
try:
conn = get_token_tracker_db()
token_tracker_record_usage(conn, pipeline, task_name, entry["total_tokens"])
logger.info(f"Token tracker updated: {pipeline}/{task_name} +{entry['total_tokens']} tokens")
except ImportError:
logger.debug("token_tracker not available, skipping tracker update")
except Exception as exc:
logger.warning(f"token_tracker update failed for {pipeline}: {exc}")
finally:
if conn is not None:
close = getattr(conn, "close", None)
if callable(close):
close()
def check_budget(pipeline: str, estimated_tokens: int) -> bool:
"""Check if there's enough budget for a pipeline run."""
if can_afford is None or get_remaining is None:
return True # No budget module = no enforcement
try:
from scripts.token_budget import can_afford, get_remaining
remaining = get_remaining()
if not can_afford(estimated_tokens):
logger.warning(
@@ -78,7 +112,6 @@ def check_budget(pipeline: str, estimated_tokens: int) -> bool:
except ImportError:
return True  # No budget module = no enforcement
@huey.signal(signals.SIGNAL_COMPLETE)
def on_task_complete(signal, task, task_value=None, **kwargs):
"""Huey hook: log token usage after each pipeline task completes."""

View File

@@ -80,6 +80,20 @@ class TestLogTokenUsage:
line = json.loads(log_file.read_text().strip())
assert line["pipeline"] == "knowledge-mine"
def test_records_to_token_tracker(self, tmp_path):
"""Should record total tokens to token_tracker for automatic pipeline logging."""
log_file = tmp_path / "token_usage.jsonl"
mock_conn = MagicMock()
mock_tracker = MagicMock()
with patch("orchestration.TOKEN_LOG", log_file), patch("orchestration.record_usage"), patch("orchestration.get_token_tracker_db", return_value=mock_conn), patch("orchestration.token_tracker_record_usage", mock_tracker):
from orchestration import log_token_usage
log_token_usage("knowledge_mine_task", {
"input_tokens": 10,
"output_tokens": 20,
})
mock_tracker.assert_called_once_with(mock_conn, "knowledge-mine", "knowledge_mine_task", 30)
class TestCheckBudget:
"""Test check_budget function."""