Compare commits
1 Commits
step35/443
...
fix/634
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
213fe0ca91 |
@@ -19,6 +19,20 @@ huey = SqliteHuey(
|
|||||||
# === Token Tracking ===
|
# === Token Tracking ===
|
||||||
TOKEN_LOG = Path.home() / ".hermes" / "token_usage.jsonl"
|
TOKEN_LOG = Path.home() / ".hermes" / "token_usage.jsonl"
|
||||||
|
|
||||||
|
try:
|
||||||
|
from scripts.token_budget import can_afford, get_remaining, record_usage
|
||||||
|
except ImportError:
|
||||||
|
can_afford = None
|
||||||
|
get_remaining = None
|
||||||
|
record_usage = None
|
||||||
|
|
||||||
|
try:
|
||||||
|
from scripts.token_tracker import get_db as get_token_tracker_db
|
||||||
|
from scripts.token_tracker import record_usage as token_tracker_record_usage
|
||||||
|
except ImportError:
|
||||||
|
get_token_tracker_db = None
|
||||||
|
token_tracker_record_usage = None
|
||||||
|
|
||||||
|
|
||||||
def log_token_usage(task_name, result):
|
def log_token_usage(task_name, result):
|
||||||
"""Log token usage from a completed pipeline task.
|
"""Log token usage from a completed pipeline task.
|
||||||
@@ -26,7 +40,8 @@ def log_token_usage(task_name, result):
|
|||||||
Reads input_tokens/output_tokens from the agent result dict.
|
Reads input_tokens/output_tokens from the agent result dict.
|
||||||
Auto-detects pipeline name from task context.
|
Auto-detects pipeline name from task context.
|
||||||
Appends to JSONL for downstream analysis.
|
Appends to JSONL for downstream analysis.
|
||||||
Also records to token_budget for daily enforcement.
|
Also records to token_budget for daily enforcement and token_tracker for
|
||||||
|
pipeline-level usage reporting.
|
||||||
"""
|
"""
|
||||||
if not isinstance(result, dict):
|
if not isinstance(result, dict):
|
||||||
return
|
return
|
||||||
@@ -55,18 +70,37 @@ def log_token_usage(task_name, result):
|
|||||||
f.write(json.dumps(entry) + "\n")
|
f.write(json.dumps(entry) + "\n")
|
||||||
|
|
||||||
# Record to token budget for daily enforcement
|
# Record to token budget for daily enforcement
|
||||||
|
if record_usage is not None:
|
||||||
try:
|
try:
|
||||||
from scripts.token_budget import record_usage
|
|
||||||
record_usage(pipeline, input_tokens, output_tokens)
|
record_usage(pipeline, input_tokens, output_tokens)
|
||||||
logger.info(f"Budget updated: {pipeline} +{entry['total_tokens']} tokens")
|
logger.info(f"Budget updated: {pipeline} +{entry['total_tokens']} tokens")
|
||||||
except ImportError:
|
except ImportError:
|
||||||
logger.debug("token_budget not available, skipping budget update")
|
logger.debug("token_budget not available, skipping budget update")
|
||||||
|
|
||||||
|
# Record to token tracker for pipeline dashboard/alerts
|
||||||
|
if get_token_tracker_db is not None and token_tracker_record_usage is not None:
|
||||||
|
conn = None
|
||||||
|
try:
|
||||||
|
conn = get_token_tracker_db()
|
||||||
|
token_tracker_record_usage(conn, pipeline, task_name, entry["total_tokens"])
|
||||||
|
logger.info(f"Token tracker updated: {pipeline}/{task_name} +{entry['total_tokens']} tokens")
|
||||||
|
except ImportError:
|
||||||
|
logger.debug("token_tracker not available, skipping tracker update")
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning(f"token_tracker update failed for {pipeline}: {exc}")
|
||||||
|
finally:
|
||||||
|
if conn is not None:
|
||||||
|
close = getattr(conn, "close", None)
|
||||||
|
if callable(close):
|
||||||
|
close()
|
||||||
|
|
||||||
|
|
||||||
def check_budget(pipeline: str, estimated_tokens: int) -> bool:
|
def check_budget(pipeline: str, estimated_tokens: int) -> bool:
|
||||||
"""Check if there's enough budget for a pipeline run."""
|
"""Check if there's enough budget for a pipeline run."""
|
||||||
|
if can_afford is None or get_remaining is None:
|
||||||
|
return True # No budget module = no enforcement
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from scripts.token_budget import can_afford, get_remaining
|
|
||||||
remaining = get_remaining()
|
remaining = get_remaining()
|
||||||
if not can_afford(estimated_tokens):
|
if not can_afford(estimated_tokens):
|
||||||
logger.warning(
|
logger.warning(
|
||||||
@@ -78,7 +112,6 @@ def check_budget(pipeline: str, estimated_tokens: int) -> bool:
|
|||||||
except ImportError:
|
except ImportError:
|
||||||
return True # No budget module = no enforcement
|
return True # No budget module = no enforcement
|
||||||
|
|
||||||
|
|
||||||
@huey.signal(signals.SIGNAL_COMPLETE)
|
@huey.signal(signals.SIGNAL_COMPLETE)
|
||||||
def on_task_complete(signal, task, task_value=None, **kwargs):
|
def on_task_complete(signal, task, task_value=None, **kwargs):
|
||||||
"""Huey hook: log token usage after each pipeline task completes."""
|
"""Huey hook: log token usage after each pipeline task completes."""
|
||||||
|
|||||||
@@ -80,6 +80,20 @@ class TestLogTokenUsage:
|
|||||||
line = json.loads(log_file.read_text().strip())
|
line = json.loads(log_file.read_text().strip())
|
||||||
assert line["pipeline"] == "knowledge-mine"
|
assert line["pipeline"] == "knowledge-mine"
|
||||||
|
|
||||||
|
def test_records_to_token_tracker(self, tmp_path):
|
||||||
|
"""Should record total tokens to token_tracker for automatic pipeline logging."""
|
||||||
|
log_file = tmp_path / "token_usage.jsonl"
|
||||||
|
mock_conn = MagicMock()
|
||||||
|
mock_tracker = MagicMock()
|
||||||
|
with patch("orchestration.TOKEN_LOG", log_file), patch("orchestration.record_usage"), patch("orchestration.get_token_tracker_db", return_value=mock_conn), patch("orchestration.token_tracker_record_usage", mock_tracker):
|
||||||
|
from orchestration import log_token_usage
|
||||||
|
log_token_usage("knowledge_mine_task", {
|
||||||
|
"input_tokens": 10,
|
||||||
|
"output_tokens": 20,
|
||||||
|
})
|
||||||
|
|
||||||
|
mock_tracker.assert_called_once_with(mock_conn, "knowledge-mine", "knowledge_mine_task", 30)
|
||||||
|
|
||||||
|
|
||||||
class TestCheckBudget:
|
class TestCheckBudget:
|
||||||
"""Test check_budget function."""
|
"""Test check_budget function."""
|
||||||
|
|||||||
Reference in New Issue
Block a user