Compare commits

..

1 Commits

Author SHA1 Message Date
628487f7bd fix(cron): rewrite cloud-incompatible prompt instructions (#378)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m9s
Health Monitor prompts say 'Check Ollama is responding' but run
on cloud models that cannot reach localhost. Instead of just
warning the agent, rewrite the instructions to cloud-compatible
equivalents the agent can actually execute.

Changes:
- Add import re
- Add _CLOUD_INCOMPATIBLE_PATTERNS: regex pairs (pattern, replacement)
- Add _rewrite_cloud_incompatible_prompt(): rewrites localhost/Ollama
  references to 'use available tools to check service health'
- Wire into run_job() after resolve_turn_route()

Closes #378
2026-04-14 01:47:00 +00:00
3 changed files with 57 additions and 69 deletions

View File

@@ -13,6 +13,7 @@ import concurrent.futures
import json
import logging
import os
import re
import subprocess
import sys
@@ -643,7 +644,56 @@ def _build_job_prompt(job: dict) -> str:
return "\n".join(parts)
def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
# Regex patterns for local service references that fail on cloud endpoints
_CLOUD_INCOMPATIBLE_PATTERNS = [
(re.compile(r"\b[Cc]heck\s+(?:that\s+)?[Oo]llama\s+(?:is\s+)?(?:responding|running|up|available)", re.IGNORECASE),
"Verify system services are healthy using available tools"),
(re.compile(r"\b[Vv]erify\s+(?:that\s+)?[Oo]llama\s+(?:is\s+)?(?:responding|running|up)", re.IGNORECASE),
"Verify system services are healthy using available tools"),
(re.compile(r"\bcurl\s+localhost:\d+", re.IGNORECASE),
"use available tools to check service health"),
(re.compile(r"\bcurl\s+127\.0\.0\.1:\d+", re.IGNORECASE),
"use available tools to check service health"),
(re.compile(r"\bpoll\s+localhost", re.IGNORECASE),
"check service health via available tools"),
]
def _rewrite_cloud_incompatible_prompt(prompt: str, base_url: str) -> str:
"""Rewrite prompt instructions that assume local service access when running on cloud.
When a cron job runs on a cloud inference endpoint (Nous, OpenRouter, Anthropic),
instructions to "Check Ollama" or "curl localhost:11434" are impossible.
Instead of just warning, this rewrites the instruction to a cloud-compatible
equivalent that the agent can actually execute.
Returns the (possibly rewritten) prompt.
"""
try:
from agent.model_metadata import is_local_endpoint
except ImportError:
return prompt
if is_local_endpoint(base_url or ""):
return prompt # Local — no rewrite needed
rewritten = prompt
for pattern, replacement in _CLOUD_INCOMPATIBLE_PATTERNS:
rewritten = pattern.sub(replacement, rewritten)
if rewritten != prompt:
rewritten = (
"[NOTE: Some instructions were adjusted for cloud execution. "
"Local service checks were rewritten to use available tools.]
"
+ rewritten
)
return rewritten
def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:(job: dict) -> tuple[bool, str, str, Optional[str]]:
"""
Execute a single cron job.
@@ -877,24 +927,12 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
result = agent.run_conversation(prompt)
final_response = result.get("final_response", "") or ""
logged_response = final_response if final_response else "(No response generated)"
# Extract token counts from the agent instance
_cron_input_toks = getattr(agent, "session_prompt_tokens", 0)
_cron_output_toks = getattr(agent, "session_completion_tokens", 0)
_cron_cost_usd = getattr(agent, "session_estimated_cost_usd", 0.0)
output = f"""# Cron Job: {job_name}
**Job ID:** {job_id}
**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}
**Schedule:** {job.get('schedule_display', 'N/A')}
## Token Usage
- Input tokens: {_cron_input_toks:,}
- Output tokens: {_cron_output_toks:,}
- Estimated cost: ${_cron_cost_usd:.4f}
## Prompt
{prompt}
@@ -973,11 +1011,6 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
# for delivery logic (empty response = no delivery).
logged_response = final_response if final_response else "(No response generated)"
# Extract token counts from the agent instance (Issue #316)
_cron_input_toks = getattr(agent, "session_prompt_tokens", 0)
_cron_output_toks = getattr(agent, "session_completion_tokens", 0)
_cron_cost_usd = getattr(agent, "session_estimated_cost_usd", 0.0)
# Check for script failure — both explicit [SCRIPT_FAILED] marker
# and heuristic detection for failures described in natural language.
_script_failed_reason = _detect_script_failure(final_response)
@@ -992,12 +1025,6 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}
**Schedule:** {job.get('schedule_display', 'N/A')}
## Token Usage
- Input tokens: {_cron_input_toks:,}
- Output tokens: {_cron_output_toks:,}
- Estimated cost: ${_cron_cost_usd:.4f}
## Prompt
{prompt}
@@ -1007,19 +1034,13 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
{logged_response}
"""
return False, output, final_response, _script_failed_reason
output = f"""# Cron Job: {job_name}
**Job ID:** {job_id}
**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}
**Schedule:** {job.get('schedule_display', 'N/A')}
## Token Usage
- Input tokens: {_cron_input_toks:,}
- Output tokens: {_cron_output_toks:,}
- Estimated cost: ${_cron_cost_usd:.4f}
## Prompt
{prompt}

View File

@@ -3075,35 +3075,14 @@ class GatewayRunner:
skip_db=agent_persisted,
)
# Token counts — persist to SessionEntry and SQLite (Issue #316).
# The agent instance accumulates session_prompt_tokens and
# session_completion_tokens across API calls within a turn.
_agent = agent # from agent_holder
_input_toks = getattr(_agent, "session_prompt_tokens", 0) if _agent else 0
_output_toks = getattr(_agent, "session_completion_tokens", 0) if _agent else 0
_cost_usd = getattr(_agent, "session_estimated_cost_usd", 0.0) if _agent else 0.0
_cost_status = getattr(_agent, "session_cost_status", "unknown") if _agent else "unknown"
# Token counts and model are now persisted by the agent directly.
# Keep only last_prompt_tokens here for context-window tracking and
# compression decisions.
self.session_store.update_session(
session_entry.session_key,
last_prompt_tokens=agent_result.get("last_prompt_tokens", 0),
input_tokens=_input_toks,
output_tokens=_output_toks,
estimated_cost_usd=_cost_usd,
cost_status=_cost_status,
)
# Persist to SQLite if session DB is available
if self._session_db and session_entry.session_id:
try:
self._session_db.set_token_counts(
session_entry.session_id,
input_tokens=_input_toks,
output_tokens=_output_toks,
)
except Exception as e:
logger.debug("Failed to persist token counts to SQLite: %s", e)
# Auto voice reply: send TTS audio before the text response
_already_sent = bool(agent_result.get("already_sent"))
if self._should_send_voice_reply(event, response, agent_messages, already_sent=_already_sent):

View File

@@ -804,14 +804,12 @@ class SessionStore:
except Exception as e:
logger.warning("[Session] Failed to seed thread session: %s", e)
return entry
def update_session(
self,
session_key: str,
last_prompt_tokens: int = None,
input_tokens: int = None,
output_tokens: int = None,
estimated_cost_usd: float = None,
cost_status: str = None,
) -> None:
"""Update lightweight session metadata after an interaction."""
with self._lock:
@@ -822,18 +820,8 @@ class SessionStore:
entry.updated_at = _now()
if last_prompt_tokens is not None:
entry.last_prompt_tokens = last_prompt_tokens
if input_tokens is not None:
entry.input_tokens = input_tokens
if output_tokens is not None:
entry.output_tokens = output_tokens
if estimated_cost_usd is not None:
entry.estimated_cost_usd = estimated_cost_usd
if cost_status is not None:
entry.cost_status = cost_status
self._save()
return entry
def reset_session(self, session_key: str) -> Optional[SessionEntry]:
"""Force reset a session, creating a new session ID."""
db_end_session_id = None