Compare commits
1 Commits
q/316-1776
...
queue/378-
| Author | SHA1 | Date | |
|---|---|---|---|
| 628487f7bd |
@@ -13,6 +13,7 @@ import concurrent.futures
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
@@ -643,7 +644,56 @@ def _build_job_prompt(job: dict) -> str:
|
||||
return "\n".join(parts)
|
||||
|
||||
|
||||
def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
||||
# Regex patterns for local service references that fail on cloud endpoints
|
||||
_CLOUD_INCOMPATIBLE_PATTERNS = [
|
||||
(re.compile(r"\b[Cc]heck\s+(?:that\s+)?[Oo]llama\s+(?:is\s+)?(?:responding|running|up|available)", re.IGNORECASE),
|
||||
"Verify system services are healthy using available tools"),
|
||||
(re.compile(r"\b[Vv]erify\s+(?:that\s+)?[Oo]llama\s+(?:is\s+)?(?:responding|running|up)", re.IGNORECASE),
|
||||
"Verify system services are healthy using available tools"),
|
||||
(re.compile(r"\bcurl\s+localhost:\d+", re.IGNORECASE),
|
||||
"use available tools to check service health"),
|
||||
(re.compile(r"\bcurl\s+127\.0\.0\.1:\d+", re.IGNORECASE),
|
||||
"use available tools to check service health"),
|
||||
(re.compile(r"\bpoll\s+localhost", re.IGNORECASE),
|
||||
"check service health via available tools"),
|
||||
]
|
||||
|
||||
|
||||
def _rewrite_cloud_incompatible_prompt(prompt: str, base_url: str) -> str:
|
||||
"""Rewrite prompt instructions that assume local service access when running on cloud.
|
||||
|
||||
When a cron job runs on a cloud inference endpoint (Nous, OpenRouter, Anthropic),
|
||||
instructions to "Check Ollama" or "curl localhost:11434" are impossible.
|
||||
Instead of just warning, this rewrites the instruction to a cloud-compatible
|
||||
equivalent that the agent can actually execute.
|
||||
|
||||
Returns the (possibly rewritten) prompt.
|
||||
"""
|
||||
try:
|
||||
from agent.model_metadata import is_local_endpoint
|
||||
except ImportError:
|
||||
return prompt
|
||||
|
||||
if is_local_endpoint(base_url or ""):
|
||||
return prompt # Local — no rewrite needed
|
||||
|
||||
rewritten = prompt
|
||||
for pattern, replacement in _CLOUD_INCOMPATIBLE_PATTERNS:
|
||||
rewritten = pattern.sub(replacement, rewritten)
|
||||
|
||||
if rewritten != prompt:
|
||||
rewritten = (
|
||||
"[NOTE: Some instructions were adjusted for cloud execution. "
|
||||
"Local service checks were rewritten to use available tools.]
|
||||
|
||||
"
|
||||
+ rewritten
|
||||
)
|
||||
|
||||
return rewritten
|
||||
|
||||
|
||||
def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
||||
"""
|
||||
Execute a single cron job.
|
||||
|
||||
@@ -877,24 +927,12 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
||||
result = agent.run_conversation(prompt)
|
||||
final_response = result.get("final_response", "") or ""
|
||||
logged_response = final_response if final_response else "(No response generated)"
|
||||
|
||||
# Extract token counts from the agent instance
|
||||
_cron_input_toks = getattr(agent, "session_prompt_tokens", 0)
|
||||
_cron_output_toks = getattr(agent, "session_completion_tokens", 0)
|
||||
_cron_cost_usd = getattr(agent, "session_estimated_cost_usd", 0.0)
|
||||
|
||||
output = f"""# Cron Job: {job_name}
|
||||
|
||||
**Job ID:** {job_id}
|
||||
**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}
|
||||
**Schedule:** {job.get('schedule_display', 'N/A')}
|
||||
|
||||
## Token Usage
|
||||
|
||||
- Input tokens: {_cron_input_toks:,}
|
||||
- Output tokens: {_cron_output_toks:,}
|
||||
- Estimated cost: ${_cron_cost_usd:.4f}
|
||||
|
||||
## Prompt
|
||||
|
||||
{prompt}
|
||||
@@ -973,11 +1011,6 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
||||
# for delivery logic (empty response = no delivery).
|
||||
logged_response = final_response if final_response else "(No response generated)"
|
||||
|
||||
# Extract token counts from the agent instance (Issue #316)
|
||||
_cron_input_toks = getattr(agent, "session_prompt_tokens", 0)
|
||||
_cron_output_toks = getattr(agent, "session_completion_tokens", 0)
|
||||
_cron_cost_usd = getattr(agent, "session_estimated_cost_usd", 0.0)
|
||||
|
||||
# Check for script failure — both explicit [SCRIPT_FAILED] marker
|
||||
# and heuristic detection for failures described in natural language.
|
||||
_script_failed_reason = _detect_script_failure(final_response)
|
||||
@@ -992,12 +1025,6 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
||||
**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}
|
||||
**Schedule:** {job.get('schedule_display', 'N/A')}
|
||||
|
||||
## Token Usage
|
||||
|
||||
- Input tokens: {_cron_input_toks:,}
|
||||
- Output tokens: {_cron_output_toks:,}
|
||||
- Estimated cost: ${_cron_cost_usd:.4f}
|
||||
|
||||
## Prompt
|
||||
|
||||
{prompt}
|
||||
@@ -1007,19 +1034,13 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
||||
{logged_response}
|
||||
"""
|
||||
return False, output, final_response, _script_failed_reason
|
||||
|
||||
|
||||
output = f"""# Cron Job: {job_name}
|
||||
|
||||
**Job ID:** {job_id}
|
||||
**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}
|
||||
**Schedule:** {job.get('schedule_display', 'N/A')}
|
||||
|
||||
## Token Usage
|
||||
|
||||
- Input tokens: {_cron_input_toks:,}
|
||||
- Output tokens: {_cron_output_toks:,}
|
||||
- Estimated cost: ${_cron_cost_usd:.4f}
|
||||
|
||||
## Prompt
|
||||
|
||||
{prompt}
|
||||
|
||||
@@ -3075,35 +3075,14 @@ class GatewayRunner:
|
||||
skip_db=agent_persisted,
|
||||
)
|
||||
|
||||
# Token counts — persist to SessionEntry and SQLite (Issue #316).
|
||||
# The agent instance accumulates session_prompt_tokens and
|
||||
# session_completion_tokens across API calls within a turn.
|
||||
_agent = agent # from agent_holder
|
||||
_input_toks = getattr(_agent, "session_prompt_tokens", 0) if _agent else 0
|
||||
_output_toks = getattr(_agent, "session_completion_tokens", 0) if _agent else 0
|
||||
_cost_usd = getattr(_agent, "session_estimated_cost_usd", 0.0) if _agent else 0.0
|
||||
_cost_status = getattr(_agent, "session_cost_status", "unknown") if _agent else "unknown"
|
||||
|
||||
# Token counts and model are now persisted by the agent directly.
|
||||
# Keep only last_prompt_tokens here for context-window tracking and
|
||||
# compression decisions.
|
||||
self.session_store.update_session(
|
||||
session_entry.session_key,
|
||||
last_prompt_tokens=agent_result.get("last_prompt_tokens", 0),
|
||||
input_tokens=_input_toks,
|
||||
output_tokens=_output_toks,
|
||||
estimated_cost_usd=_cost_usd,
|
||||
cost_status=_cost_status,
|
||||
)
|
||||
|
||||
# Persist to SQLite if session DB is available
|
||||
if self._session_db and session_entry.session_id:
|
||||
try:
|
||||
self._session_db.set_token_counts(
|
||||
session_entry.session_id,
|
||||
input_tokens=_input_toks,
|
||||
output_tokens=_output_toks,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.debug("Failed to persist token counts to SQLite: %s", e)
|
||||
|
||||
# Auto voice reply: send TTS audio before the text response
|
||||
_already_sent = bool(agent_result.get("already_sent"))
|
||||
if self._should_send_voice_reply(event, response, agent_messages, already_sent=_already_sent):
|
||||
|
||||
@@ -804,14 +804,12 @@ class SessionStore:
|
||||
except Exception as e:
|
||||
logger.warning("[Session] Failed to seed thread session: %s", e)
|
||||
|
||||
return entry
|
||||
|
||||
def update_session(
|
||||
self,
|
||||
session_key: str,
|
||||
last_prompt_tokens: int = None,
|
||||
input_tokens: int = None,
|
||||
output_tokens: int = None,
|
||||
estimated_cost_usd: float = None,
|
||||
cost_status: str = None,
|
||||
) -> None:
|
||||
"""Update lightweight session metadata after an interaction."""
|
||||
with self._lock:
|
||||
@@ -822,18 +820,8 @@ class SessionStore:
|
||||
entry.updated_at = _now()
|
||||
if last_prompt_tokens is not None:
|
||||
entry.last_prompt_tokens = last_prompt_tokens
|
||||
if input_tokens is not None:
|
||||
entry.input_tokens = input_tokens
|
||||
if output_tokens is not None:
|
||||
entry.output_tokens = output_tokens
|
||||
if estimated_cost_usd is not None:
|
||||
entry.estimated_cost_usd = estimated_cost_usd
|
||||
if cost_status is not None:
|
||||
entry.cost_status = cost_status
|
||||
self._save()
|
||||
|
||||
return entry
|
||||
|
||||
def reset_session(self, session_key: str) -> Optional[SessionEntry]:
|
||||
"""Force reset a session, creating a new session ID."""
|
||||
db_end_session_id = None
|
||||
|
||||
Reference in New Issue
Block a user