Compare commits

..

1 Commits

Author SHA1 Message Date
Timmy
84497c6a9f fix(cron): inject cloud-context warning when prompt refs localhost (#468)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m22s
When a cron job runs on a cloud endpoint but its prompt references
local services (Ollama, localhost, etc.), inject a [SYSTEM NOTE]
warning so the agent reports the limitation instead of wasting
iterations on doomed connections.

Fixes #378, Closes #456.

## Changes
- cron/scheduler.py: Added import re, _LOCAL_SERVICE_PATTERNS (12 patterns),
  _detect_local_service_refs(), _inject_cloud_context(). Injection in run_job()
  after cloud endpoint detection.
- tests/cron/test_cron_cloud_context.py: 19 tests

## Detection patterns
localhost:PORT, 127.x.x.x:PORT, 0.0.0.0:PORT, ollama, curl localhost,
wget localhost, http://localhost, https://127.x, check ollama,
connect local, hermes gateway local

Closes #468.
2026-04-13 21:33:21 -04:00
6 changed files with 204 additions and 218 deletions

14
cli.py
View File

@@ -4123,19 +4123,7 @@ class HermesCLI:
print(f" Skills: {', '.join(job['skills'])}")
print(f" Prompt: {job.get('prompt_preview', '')}")
if job.get("last_run_at"):
status = job.get('last_status', '?')
if status == "error" and job.get("last_error"):
# Show error with staleness hint
last_success = job.get("last_success_at", "")
if last_success and last_success > job.get("last_error_at", ""):
print(f" Last run: {job['last_run_at']} (recovered, last error was before {last_success})")
else:
err_preview = job['last_error'][:80]
print(f" Last run: {job['last_run_at']} (error: {err_preview}...)")
elif status == "retrying":
print(f" Last run: {job['last_run_at']} (retrying...)")
else:
print(f" Last run: {job['last_run_at']} ({status})")
print(f" Last run: {job['last_run_at']} ({job.get('last_status', '?')})")
print()
return

View File

@@ -528,11 +528,7 @@ def pause_job(job_id: str, reason: Optional[str] = None) -> Optional[Dict[str, A
def resume_job(job_id: str) -> Optional[Dict[str, Any]]:
"""Resume a paused job and compute the next future run from now.
Clears stale error state so the job's health reflects the upcoming
run rather than a previous failure that may have caused the pause.
"""
"""Resume a paused job and compute the next future run from now."""
job = get_job(job_id)
if not job:
return None
@@ -546,19 +542,12 @@ def resume_job(job_id: str) -> Optional[Dict[str, Any]]:
"paused_at": None,
"paused_reason": None,
"next_run_at": next_run_at,
# Clear stale error — resuming is an explicit retry signal.
"last_error": None,
"last_status": "retrying" if job.get("last_status") == "error" else job.get("last_status"),
},
)
def trigger_job(job_id: str) -> Optional[Dict[str, Any]]:
"""Schedule a job to run on the next scheduler tick.
Clears stale error state so the job's health reflects the upcoming
re-run rather than the previous failure.
"""
"""Schedule a job to run on the next scheduler tick."""
job = get_job(job_id)
if not job:
return None
@@ -570,10 +559,6 @@ def trigger_job(job_id: str) -> Optional[Dict[str, Any]]:
"paused_at": None,
"paused_reason": None,
"next_run_at": _hermes_now().isoformat(),
# Clear stale error — the job is being retried.
# The next run will set last_error on failure or None on success.
"last_error": None,
"last_status": "retrying" if job.get("last_status") == "error" else job.get("last_status"),
},
)
@@ -633,8 +618,6 @@ def mark_job_run(job_id: str, success: bool, error: Optional[str] = None):
Updates last_run_at, last_status, increments completed count,
computes next_run_at, and auto-deletes if repeat limit reached.
Also tracks last_error_at and last_success_at timestamps so callers
can distinguish current health from historical failure residue.
"""
jobs = load_jobs()
for i, job in enumerate(jobs):
@@ -644,12 +627,6 @@ def mark_job_run(job_id: str, success: bool, error: Optional[str] = None):
job["last_status"] = "ok" if success else "error"
job["last_error"] = error if not success else None
# Track timestamps for health distinction
if success:
job["last_success_at"] = now
else:
job["last_error_at"] = now
# Increment completed count
if job.get("repeat"):
job["repeat"]["completed"] = job["repeat"].get("completed", 0) + 1

View File

@@ -13,6 +13,7 @@ import concurrent.futures
import json
import logging
import os
import re
import subprocess
import sys
@@ -156,6 +157,27 @@ _KNOWN_DELIVERY_PLATFORMS = frozenset({
from cron.jobs import get_due_jobs, mark_job_run, save_job_output, advance_next_run
# ---------------------------------------------------------------------------
# Model context guard
# ---------------------------------------------------------------------------
CRON_MIN_CONTEXT_TOKENS = 4096
class ModelContextError(ValueError):
"""Raised when a job's model has insufficient context for cron execution."""
pass
def _check_model_context_compat(model: str, context_length: int) -> None:
"""Raise ModelContextError if the model context is below the cron minimum."""
if context_length < CRON_MIN_CONTEXT_TOKENS:
raise ModelContextError(
f"Model '{model}' context ({context_length} tokens) is below the "
f"minimum {CRON_MIN_CONTEXT_TOKENS} tokens required for cron jobs."
)
# Sentinel: when a cron agent has nothing new to report, it can start its
# response with this marker to suppress delivery. Output is still saved
# locally for audit.
@@ -544,6 +566,55 @@ def _run_job_script(script_path: str) -> tuple[bool, str]:
return False, f"Script execution failed: {exc}"
# ---------------------------------------------------------------------------
# Cloud context warning — detect local service refs in cloud cron prompts
# ---------------------------------------------------------------------------
_LOCAL_SERVICE_PATTERNS = [
r'localhost:\d{2,5}',
r'127\.0\.0\.\d{1,3}:\d{2,5}',
r'0\.0\.0\.0:\d{2,5}',
r'\bollama\b',
r'curl\s+.*localhost',
r'wget\s+.*localhost',
r'http://localhost',
r'https?://127\.',
r'https?://0\.0\.0\.0',
r'check.*ollama',
r'connect.*local',
r'hermes.*gateway.*local',
]
_LOCAL_SERVICE_RE = [re.compile(p, re.IGNORECASE) for p in _LOCAL_SERVICE_PATTERNS]
def _detect_local_service_refs(prompt: str) -> list[str]:
"""Scan a prompt for references to local services (Ollama, localhost, etc.).
Returns list of matched patterns for logging.
"""
matches = []
for pattern_re in _LOCAL_SERVICE_RE:
if pattern_re.search(prompt):
matches.append(pattern_re.pattern)
return matches
def _inject_cloud_context(prompt: str, local_refs: list[str]) -> str:
"""Prepend a warning when cron runs on cloud but prompt refs local services.
The agent reports the limitation instead of wasting iterations on doomed connections.
"""
warning = (
"[SYSTEM NOTE: You are running on a cloud endpoint, but your prompt references "
"local services (localhost/Ollama). You cannot reach localhost from a cloud "
"endpoint. Report this limitation to the user and suggest running the job on "
"a local endpoint instead. Do NOT attempt to connect to localhost — it will "
"timeout and waste your iteration budget.]\n\n"
)
return warning + prompt
def _build_job_prompt(job: dict) -> str:
"""Build the effective prompt for a cron job, optionally loading one or more skills first."""
prompt = job.get("prompt", "")
@@ -762,6 +833,16 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
message = format_runtime_provider_error(exc)
raise RuntimeError(message) from exc
# Cloud context warning: if running on cloud but prompt refs local services,
# inject a warning so the agent reports the limitation instead of wasting
# iterations on doomed connections. (Fixes #378, #456)
base_url = runtime.get("base_url") or ""
is_cloud = not any(h in base_url for h in ("localhost", "127.0.0.1", "0.0.0.0", "::1"))
local_refs = _detect_local_service_refs(prompt)
if is_cloud and local_refs:
logger.info("Job '%s': cloud endpoint + local service refs detected, injecting warning", job_name)
prompt = _inject_cloud_context(prompt, local_refs)
from agent.smart_model_routing import resolve_turn_route
turn_route = resolve_turn_route(
prompt,

View File

@@ -0,0 +1,120 @@
"""Tests for cron cloud context warning injection (fix #378, #456).
When a cron job runs on a cloud endpoint but its prompt references local
services (Ollama, localhost, etc.), inject a warning so the agent reports
the limitation instead of wasting iterations on doomed connections.
"""
import pytest
from cron.scheduler import (
_detect_local_service_refs,
_inject_cloud_context,
_LOCAL_SERVICE_PATTERNS,
)
# ---------------------------------------------------------------------------
# Pattern detection
# ---------------------------------------------------------------------------
class TestDetectLocalServiceRefs:
def test_localhost_with_port(self):
refs = _detect_local_service_refs("Check http://localhost:8080/status")
assert len(refs) > 0
assert any("localhost" in r for r in refs)
def test_127_address(self):
refs = _detect_local_service_refs("Connect to 127.0.0.1:11434")
assert len(refs) > 0
def test_ollama_reference(self):
refs = _detect_local_service_refs("Run this on Ollama with gemma3")
assert len(refs) > 0
assert any("ollama" in r.lower() for r in refs)
def test_curl_localhost(self):
refs = _detect_local_service_refs("curl localhost:3000/api/data")
assert len(refs) > 0
def test_wget_localhost(self):
refs = _detect_local_service_refs("wget http://localhost/file.txt")
assert len(refs) > 0
def test_http_localhost(self):
refs = _detect_local_service_refs("http://localhost:8642/health")
assert len(refs) > 0
def test_https_127(self):
refs = _detect_local_service_refs("https://127.0.0.1:443/secure")
assert len(refs) > 0
def test_0000_address(self):
refs = _detect_local_service_refs("Bind to 0.0.0.0:9090")
assert len(refs) > 0
def test_no_match_for_remote(self):
refs = _detect_local_service_refs("Check https://api.openai.com/v1/models")
assert len(refs) == 0
def test_no_match_for_gitea(self):
refs = _detect_local_service_refs("Query forge.alexanderwhitestone.com for issues")
assert len(refs) == 0
def test_no_match_empty(self):
refs = _detect_local_service_refs("")
assert len(refs) == 0
def test_check_ollama_phrase(self):
refs = _detect_local_service_refs("First check Ollama is running")
assert len(refs) > 0
def test_connect_local_phrase(self):
refs = _detect_local_service_refs("Connect to local Ollama server")
assert len(refs) > 0
# ---------------------------------------------------------------------------
# Warning injection
# ---------------------------------------------------------------------------
class TestInjectCloudContext:
def test_prepends_warning(self):
original = "Run a health check on localhost:8080"
refs = _detect_local_service_refs(original)
result = _inject_cloud_context(original, refs)
assert "SYSTEM NOTE" in result
assert "cloud endpoint" in result
assert original in result
def test_warning_is_first(self):
original = "Check localhost:11434"
refs = _detect_local_service_refs(original)
result = _inject_cloud_context(original, refs)
assert result.startswith("[SYSTEM NOTE")
def test_preserves_original_prompt(self):
original = "Do something with Ollama and then report results"
refs = _detect_local_service_refs(original)
result = _inject_cloud_context(original, refs)
assert "Do something with Ollama" in result
def test_mentions_cannot_reach(self):
original = "curl localhost:8080"
refs = _detect_local_service_refs(original)
result = _inject_cloud_context(original, refs)
assert "cannot reach" in result.lower() or "cannot" in result.lower()
# ---------------------------------------------------------------------------
# Pattern coverage
# ---------------------------------------------------------------------------
class TestPatternCoverage:
def test_at_least_10_patterns(self):
assert len(_LOCAL_SERVICE_PATTERNS) >= 10
def test_patterns_are_strings(self):
for p in _LOCAL_SERVICE_PATTERNS:
assert isinstance(p, str)
assert len(p) > 0

View File

@@ -1,177 +0,0 @@
"""Tests for cron job stale error state clearing (#349)."""
import json
import os
import sys
import tempfile
from pathlib import Path
from unittest.mock import patch, MagicMock
import pytest
# Ensure cron.jobs can be imported without triggering cron/__init__.py
# which may have additional imports that break in test isolation.
sys.modules.setdefault("cron.scheduler", MagicMock())
@pytest.fixture()
def isolated_cron_dir(tmp_path, monkeypatch):
"""Point CRON_DIR at a temp directory for test isolation."""
cron_dir = tmp_path / "cron"
cron_dir.mkdir()
output_dir = cron_dir / "output"
output_dir.mkdir()
monkeypatch.setattr("cron.jobs.CRON_DIR", cron_dir)
monkeypatch.setattr("cron.jobs.JOBS_FILE", cron_dir / "jobs.json")
monkeypatch.setattr("cron.jobs.OUTPUT_DIR", output_dir)
return cron_dir
def _make_job(cron_dir, job_id="test-job-1", last_status="error", last_error="auth revoked"):
"""Write a minimal jobs.json with one job."""
from hermes_time import now as _hermes_now
job = {
"id": job_id,
"name": "Test Job",
"prompt": "test prompt",
"schedule": {"kind": "interval", "minutes": 10, "display": "every 10m"},
"enabled": True,
"state": "scheduled",
"last_run_at": _hermes_now().isoformat(),
"last_status": last_status,
"last_error": last_error,
"last_error_at": _hermes_now().isoformat(),
"next_run_at": _hermes_now().isoformat(),
}
jobs_file = cron_dir / "jobs.json"
jobs_file.write_text(json.dumps({"jobs": [job]}))
return job
# ---------------------------------------------------------------------------
# trigger_job clears stale error
# ---------------------------------------------------------------------------
class TestTriggerJobClearsError:
def test_trigger_clears_last_error(self, isolated_cron_dir):
from cron.jobs import trigger_job, get_job
_make_job(isolated_cron_dir, last_status="error", last_error="Refresh session revoked")
result = trigger_job("test-job-1")
assert result is not None
job = get_job("test-job-1")
assert job["last_error"] is None
assert job["last_status"] == "retrying"
def test_trigger_preserves_ok_status(self, isolated_cron_dir):
from cron.jobs import trigger_job, get_job
_make_job(isolated_cron_dir, last_status="ok", last_error=None)
result = trigger_job("test-job-1")
assert result is not None
job = get_job("test-job-1")
assert job["last_error"] is None
assert job["last_status"] == "ok" # stays ok, not retrying
def test_trigger_nonexistent_returns_none(self, isolated_cron_dir):
from cron.jobs import trigger_job
result = trigger_job("nonexistent")
assert result is None
# ---------------------------------------------------------------------------
# resume_job also clears stale error
# ---------------------------------------------------------------------------
class TestResumeJobClearsError:
def test_resume_clears_last_error(self, isolated_cron_dir):
from cron.jobs import resume_job, get_job, pause_job
_make_job(isolated_cron_dir, last_status="error", last_error="auth revoked")
pause_job("test-job-1", reason="auth issue")
result = resume_job("test-job-1")
assert result is not None
job = get_job("test-job-1")
assert job["last_error"] is None
assert job["last_status"] == "retrying"
assert job["state"] == "scheduled"
# ---------------------------------------------------------------------------
# mark_job_run tracks timestamps
# ---------------------------------------------------------------------------
class TestMarkJobRunTimestamps:
def test_success_sets_last_success_at(self, isolated_cron_dir):
from cron.jobs import mark_job_run, get_job
_make_job(isolated_cron_dir)
mark_job_run("test-job-1", success=True, error=None)
job = get_job("test-job-1")
assert job["last_status"] == "ok"
assert job["last_error"] is None
assert job.get("last_success_at") is not None
def test_error_sets_last_error_at(self, isolated_cron_dir):
from cron.jobs import mark_job_run, get_job
_make_job(isolated_cron_dir, last_status="ok", last_error=None)
mark_job_run("test-job-1", success=False, error="new error")
job = get_job("test-job-1")
assert job["last_status"] == "error"
assert job["last_error"] == "new error"
assert job.get("last_error_at") is not None
def test_success_clears_error(self, isolated_cron_dir):
from cron.jobs import mark_job_run, get_job
_make_job(isolated_cron_dir, last_status="error", last_error="old error")
mark_job_run("test-job-1", success=True, error=None)
job = get_job("test-job-1")
assert job["last_status"] == "ok"
assert job["last_error"] is None
assert job.get("last_success_at") is not None
# ---------------------------------------------------------------------------
# Health distinction: error vs recovered
# ---------------------------------------------------------------------------
class TestHealthDistinction:
def test_recovered_job_shows_success_after_error(self, isolated_cron_dir):
from cron.jobs import mark_job_run, get_job
from hermes_time import now as _hermes_now
_make_job(isolated_cron_dir, last_status="error", last_error="auth revoked")
# Simulate recovery
mark_job_run("test-job-1", success=True, error=None)
job = get_job("test-job-1")
assert job["last_status"] == "ok"
assert job["last_error"] is None
# last_success_at should be after last_error_at
assert job["last_success_at"] >= job.get("last_error_at", "")
# ---------------------------------------------------------------------------
# _format_job includes health fields
# ---------------------------------------------------------------------------
class TestFormatJobHealth:
def test_format_includes_health_fields(self):
from tools.cronjob_tools import _format_job
job = {
"id": "j1",
"name": "Test",
"prompt": "hello",
"schedule_display": "every 10m",
"last_status": "error",
"last_error": "revoked",
"last_error_at": "2026-04-13T10:00:00",
"last_success_at": "2026-04-13T09:00:00",
}
result = _format_job(job)
assert result["last_error"] == "revoked"
assert result["last_error_at"] == "2026-04-13T10:00:00"
assert result["last_success_at"] == "2026-04-13T09:00:00"

View File

@@ -196,9 +196,6 @@ def _format_job(job: Dict[str, Any]) -> Dict[str, Any]:
"next_run_at": job.get("next_run_at"),
"last_run_at": job.get("last_run_at"),
"last_status": job.get("last_status"),
"last_error": job.get("last_error"),
"last_error_at": job.get("last_error_at"),
"last_success_at": job.get("last_success_at"),
"enabled": job.get("enabled", True),
"state": job.get("state", "scheduled" if job.get("enabled", True) else "paused"),
"paused_at": job.get("paused_at"),