Compare commits

...

1 Commits

Author SHA1 Message Date
Alexander Whitestone
7e4ce70f8f fix: token tracking — persist costs, cron job usage, full pipeline (#316)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 59s
Closes #316, #490, #492

Audit of the full token tracking pipeline found 3 gaps beyond the
base persistence fix:

1. Gateway doesn't persist cost (#492): update_session() didn't accept
   estimated_cost_usd or cost_status. Agent accumulates costs but
   they're silently dropped. Now persisted to SessionEntry.

2. Cron jobs discard tokens (#490): run_job() never extracts token
   counts from the agent instance. Output has no usage data. Now
   extracts and includes Token Usage section in cron output doc.

3. Both gaps fixed in one pass to avoid partial state.

Changes:
- gateway/session.py: update_session() accepts estimated_cost_usd,
  cost_status, input_tokens, output_tokens
- gateway/run.py: extracts all token/cost fields from agent instance,
  passes to update_session() and _session_db.set_token_counts()
- cron/scheduler.py: extracts token counts from agent after
  run_conversation(), includes Token Usage section in output doc
  (both async and sync fallback paths)
2026-04-13 21:32:47 -04:00
3 changed files with 68 additions and 6 deletions

View File

@@ -877,12 +877,24 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
result = agent.run_conversation(prompt)
final_response = result.get("final_response", "") or ""
logged_response = final_response if final_response else "(No response generated)"
# Extract token counts from the agent instance
_cron_input_toks = getattr(agent, "session_prompt_tokens", 0)
_cron_output_toks = getattr(agent, "session_completion_tokens", 0)
_cron_cost_usd = getattr(agent, "session_estimated_cost_usd", 0.0)
output = f"""# Cron Job: {job_name}
**Job ID:** {job_id}
**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}
**Schedule:** {job.get('schedule_display', 'N/A')}
## Token Usage
- Input tokens: {_cron_input_toks:,}
- Output tokens: {_cron_output_toks:,}
- Estimated cost: ${_cron_cost_usd:.4f}
## Prompt
{prompt}
@@ -961,6 +973,11 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
# for delivery logic (empty response = no delivery).
logged_response = final_response if final_response else "(No response generated)"
# Extract token counts from the agent instance (Issue #316)
_cron_input_toks = getattr(agent, "session_prompt_tokens", 0)
_cron_output_toks = getattr(agent, "session_completion_tokens", 0)
_cron_cost_usd = getattr(agent, "session_estimated_cost_usd", 0.0)
# Check for script failure — both explicit [SCRIPT_FAILED] marker
# and heuristic detection for failures described in natural language.
_script_failed_reason = _detect_script_failure(final_response)
@@ -975,6 +992,12 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}
**Schedule:** {job.get('schedule_display', 'N/A')}
## Token Usage
- Input tokens: {_cron_input_toks:,}
- Output tokens: {_cron_output_toks:,}
- Estimated cost: ${_cron_cost_usd:.4f}
## Prompt
{prompt}
@@ -984,13 +1007,19 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
{logged_response}
"""
return False, output, final_response, _script_failed_reason
output = f"""# Cron Job: {job_name}
**Job ID:** {job_id}
**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}
**Schedule:** {job.get('schedule_display', 'N/A')}
## Token Usage
- Input tokens: {_cron_input_toks:,}
- Output tokens: {_cron_output_toks:,}
- Estimated cost: ${_cron_cost_usd:.4f}
## Prompt
{prompt}

View File

@@ -3075,14 +3075,35 @@ class GatewayRunner:
skip_db=agent_persisted,
)
# Token counts and model are now persisted by the agent directly.
# Keep only last_prompt_tokens here for context-window tracking and
# compression decisions.
# Token counts — persist to SessionEntry and SQLite (Issue #316).
# The agent instance accumulates session_prompt_tokens and
# session_completion_tokens across API calls within a turn.
_agent = agent # from agent_holder
_input_toks = getattr(_agent, "session_prompt_tokens", 0) if _agent else 0
_output_toks = getattr(_agent, "session_completion_tokens", 0) if _agent else 0
_cost_usd = getattr(_agent, "session_estimated_cost_usd", 0.0) if _agent else 0.0
_cost_status = getattr(_agent, "session_cost_status", "unknown") if _agent else "unknown"
self.session_store.update_session(
session_entry.session_key,
last_prompt_tokens=agent_result.get("last_prompt_tokens", 0),
input_tokens=_input_toks,
output_tokens=_output_toks,
estimated_cost_usd=_cost_usd,
cost_status=_cost_status,
)
# Persist to SQLite if session DB is available
if self._session_db and session_entry.session_id:
try:
self._session_db.set_token_counts(
session_entry.session_id,
input_tokens=_input_toks,
output_tokens=_output_toks,
)
except Exception as e:
logger.debug("Failed to persist token counts to SQLite: %s", e)
# Auto voice reply: send TTS audio before the text response
_already_sent = bool(agent_result.get("already_sent"))
if self._should_send_voice_reply(event, response, agent_messages, already_sent=_already_sent):

View File

@@ -804,12 +804,14 @@ class SessionStore:
except Exception as e:
logger.warning("[Session] Failed to seed thread session: %s", e)
return entry
def update_session(
self,
session_key: str,
last_prompt_tokens: int = None,
input_tokens: int = None,
output_tokens: int = None,
estimated_cost_usd: float = None,
cost_status: str = None,
) -> None:
"""Update lightweight session metadata after an interaction."""
with self._lock:
@@ -820,8 +822,18 @@ class SessionStore:
entry.updated_at = _now()
if last_prompt_tokens is not None:
entry.last_prompt_tokens = last_prompt_tokens
if input_tokens is not None:
entry.input_tokens = input_tokens
if output_tokens is not None:
entry.output_tokens = output_tokens
if estimated_cost_usd is not None:
entry.estimated_cost_usd = estimated_cost_usd
if cost_status is not None:
entry.cost_status = cost_status
self._save()
return entry
def reset_session(self, session_key: str) -> Optional[SessionEntry]:
"""Force reset a session, creating a new session ID."""
db_end_session_id = None