Compare commits

..

5 Commits

Author SHA1 Message Date
379769ca6d feat(cron): Show health status in job list
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m0s
Part of #349. Shows current vs. cleared errors, success history.
2026-04-14 00:19:11 +00:00
91bc02bc38 feat(cron): Add clear-error CLI subparser
Part of #349. Adds `hermes cron clear-error JOB_ID` command.
2026-04-14 00:18:52 +00:00
77265a31e1 feat(cron): Add clear-error CLI command
Part of #349. Adds `hermes cron clear-error JOB_ID` command.
2026-04-14 00:18:30 +00:00
cf36bd2ddf feat(cron): Add clear_error action and health timestamps
Part of #349. Adds clear_error action and includes health timestamps in job format.
2026-04-14 00:18:09 +00:00
0413fc1788 feat(cron): Comprehensive stale error state handling
- mark_job_run: track last_error_at, last_success_at, error_resolved_at
- trigger_job: clear stale error state when re-triggering
- clear_job_error: manual clearing of stale errors

Closes #349
2026-04-14 00:17:45 +00:00
6 changed files with 131 additions and 214 deletions

View File

@@ -547,20 +547,30 @@ def resume_job(job_id: str) -> Optional[Dict[str, Any]]:
def trigger_job(job_id: str) -> Optional[Dict[str, Any]]:
"""Schedule a job to run on the next scheduler tick."""
"""Schedule a job to run on the next scheduler tick.
Clears stale error state when re-triggering a previously-failed job
so the stale failure doesn't persist until the next tick completes.
"""
job = get_job(job_id)
if not job:
return None
return update_job(
job_id,
{
"enabled": True,
"state": "scheduled",
"paused_at": None,
"paused_reason": None,
"next_run_at": _hermes_now().isoformat(),
},
)
updates = {
"enabled": True,
"state": "scheduled",
"paused_at": None,
"paused_reason": None,
"next_run_at": _hermes_now().isoformat(),
}
# Clear stale error state when re-triggering
if job.get("last_status") == "error":
updates["last_status"] = "retrying"
updates["last_error"] = None
updates["error_cleared_at"] = _hermes_now().isoformat()
return update_job(job_id, updates)
def run_job_now(job_id: str) -> Optional[Dict[str, Any]]:
@@ -618,6 +628,7 @@ def mark_job_run(job_id: str, success: bool, error: Optional[str] = None):
Updates last_run_at, last_status, increments completed count,
computes next_run_at, and auto-deletes if repeat limit reached.
Tracks health timestamps for error/success history.
"""
jobs = load_jobs()
for i, job in enumerate(jobs):
@@ -627,6 +638,18 @@ def mark_job_run(job_id: str, success: bool, error: Optional[str] = None):
job["last_status"] = "ok" if success else "error"
job["last_error"] = error if not success else None
# Track health timestamps
if success:
job["last_success_at"] = now
# Clear stale error tracking on success
if job.get("last_error_at"):
job["error_resolved_at"] = now
else:
job["last_error_at"] = now
# Clear resolved tracking on new error
if job.get("error_resolved_at"):
del job["error_resolved_at"]
# Increment completed count
if job.get("repeat"):
job["repeat"]["completed"] = job["repeat"].get("completed", 0) + 1
@@ -656,6 +679,32 @@ def mark_job_run(job_id: str, success: bool, error: Optional[str] = None):
save_jobs(jobs)
def clear_job_error(job_id: str) -> Optional[Dict[str, Any]]:
"""
Clear stale error state for a job.
Resets last_status to 'ok', last_error to None, and
records when the error was cleared. Useful after auth
recovery when the job itself is healthy but stale error
state persists.
Returns:
Updated job dict, or None if not found.
"""
jobs = load_jobs()
for job in jobs:
if job["id"] == job_id:
job["last_status"] = "ok"
job["last_error"] = None
job["error_cleared_at"] = _hermes_now().isoformat()
save_jobs(jobs)
return job
save_jobs(jobs)
return None
def advance_next_run(job_id: str) -> bool:
"""Preemptively advance next_run_at for a recurring job before execution.

View File

@@ -13,7 +13,6 @@ import concurrent.futures
import json
import logging
import os
import re
import subprocess
import sys
@@ -157,27 +156,6 @@ _KNOWN_DELIVERY_PLATFORMS = frozenset({
from cron.jobs import get_due_jobs, mark_job_run, save_job_output, advance_next_run
# ---------------------------------------------------------------------------
# Model context guard
# ---------------------------------------------------------------------------
CRON_MIN_CONTEXT_TOKENS = 4096
class ModelContextError(ValueError):
"""Raised when a job's model has insufficient context for cron execution."""
pass
def _check_model_context_compat(model: str, context_length: int) -> None:
"""Raise ModelContextError if the model context is below the cron minimum."""
if context_length < CRON_MIN_CONTEXT_TOKENS:
raise ModelContextError(
f"Model '{model}' context ({context_length} tokens) is below the "
f"minimum {CRON_MIN_CONTEXT_TOKENS} tokens required for cron jobs."
)
# Sentinel: when a cron agent has nothing new to report, it can start its
# response with this marker to suppress delivery. Output is still saved
# locally for audit.
@@ -566,55 +544,6 @@ def _run_job_script(script_path: str) -> tuple[bool, str]:
return False, f"Script execution failed: {exc}"
# ---------------------------------------------------------------------------
# Cloud context warning — detect local service refs in cloud cron prompts
# ---------------------------------------------------------------------------
_LOCAL_SERVICE_PATTERNS = [
r'localhost:\d{2,5}',
r'127\.0\.0\.\d{1,3}:\d{2,5}',
r'0\.0\.0\.0:\d{2,5}',
r'\bollama\b',
r'curl\s+.*localhost',
r'wget\s+.*localhost',
r'http://localhost',
r'https?://127\.',
r'https?://0\.0\.0\.0',
r'check.*ollama',
r'connect.*local',
r'hermes.*gateway.*local',
]
_LOCAL_SERVICE_RE = [re.compile(p, re.IGNORECASE) for p in _LOCAL_SERVICE_PATTERNS]
def _detect_local_service_refs(prompt: str) -> list[str]:
"""Scan a prompt for references to local services (Ollama, localhost, etc.).
Returns list of matched patterns for logging.
"""
matches = []
for pattern_re in _LOCAL_SERVICE_RE:
if pattern_re.search(prompt):
matches.append(pattern_re.pattern)
return matches
def _inject_cloud_context(prompt: str, local_refs: list[str]) -> str:
"""Prepend a warning when cron runs on cloud but prompt refs local services.
The agent reports the limitation instead of wasting iterations on doomed connections.
"""
warning = (
"[SYSTEM NOTE: You are running on a cloud endpoint, but your prompt references "
"local services (localhost/Ollama). You cannot reach localhost from a cloud "
"endpoint. Report this limitation to the user and suggest running the job on "
"a local endpoint instead. Do NOT attempt to connect to localhost — it will "
"timeout and waste your iteration budget.]\n\n"
)
return warning + prompt
def _build_job_prompt(job: dict) -> str:
"""Build the effective prompt for a cron job, optionally loading one or more skills first."""
prompt = job.get("prompt", "")
@@ -833,16 +762,6 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
message = format_runtime_provider_error(exc)
raise RuntimeError(message) from exc
# Cloud context warning: if running on cloud but prompt refs local services,
# inject a warning so the agent reports the limitation instead of wasting
# iterations on doomed connections. (Fixes #378, #456)
base_url = runtime.get("base_url") or ""
is_cloud = not any(h in base_url for h in ("localhost", "127.0.0.1", "0.0.0.0", "::1"))
local_refs = _detect_local_service_refs(prompt)
if is_cloud and local_refs:
logger.info("Job '%s': cloud endpoint + local service refs detected, injecting warning", job_name)
prompt = _inject_cloud_context(prompt, local_refs)
from agent.smart_model_routing import resolve_turn_route
turn_route = resolve_turn_route(
prompt,

View File

@@ -93,6 +93,39 @@ def cron_list(show_all: bool = False):
script = job.get("script")
if script:
print(f" Script: {script}")
# Show health status
last_status = job.get("last_status")
last_error = job.get("last_error")
last_error_at = job.get("last_error_at")
last_success_at = job.get("last_success_at")
error_cleared_at = job.get("error_cleared_at")
error_resolved_at = job.get("error_resolved_at")
if last_status == "error" and last_error:
if error_cleared_at or error_resolved_at:
# Error was cleared/resolved
cleared_time = error_cleared_at or error_resolved_at
print(color(f" Status: ok (error cleared)", Colors.GREEN))
print(color(f" Last error: {last_error[:80]}...", Colors.DIM))
print(color(f" Resolved: {cleared_time}", Colors.DIM))
else:
# Current error
print(color(f" Status: ERROR", Colors.RED))
print(color(f" Error: {last_error[:80]}...", Colors.RED))
if last_error_at:
print(color(f" Since: {last_error_at}", Colors.RED))
elif last_status == "retrying":
print(color(f" Status: retrying (error cleared)", Colors.YELLOW))
elif last_status == "ok":
if last_success_at:
print(color(f" Status: ok (last success: {last_success_at})", Colors.GREEN))
elif last_status:
print(f" Status: {last_status}")
# Show success history if available
if last_success_at and last_status != "error":
print(f" Last ok: {last_success_at}")
print()
from hermes_cli.gateway import find_gateway_pids
@@ -222,7 +255,18 @@ def cron_edit(args):
def _job_action(action: str, job_id: str, success_verb: str, now: bool = False) -> int:
if action == "run" and now:
if action == "clear_error":
result = _cron_api(action="clear_error", job_id=job_id)
if not result.get("success"):
print(color(f"Failed to clear error: {result.get('error', 'unknown error')}", Colors.RED))
return 1
job = result.get("job", {})
name = job.get("name", job_id)
print(color(f"Cleared stale error state for job '{name}'", Colors.GREEN))
if job.get("error_cleared_at"):
print(f" Cleared at: {job['error_cleared_at']}")
return 0
if action == "run" and now:
# Synchronous execution — run job immediately and show result
result = _cron_api(action="run_now", job_id=job_id)
if not result.get("success"):
@@ -292,9 +336,13 @@ def cron_command(args):
now = getattr(args, 'now', False)
return _job_action("run", args.job_id, "Triggered", now=now)
if subcmd == "clear-error":
return _job_action("clear_error", args.job_id, "Cleared")
if subcmd in {"remove", "rm", "delete"}:
return _job_action("remove", args.job_id, "Removed")
print(f"Unknown cron command: {subcmd}")
print("Usage: hermes cron [list|create|edit|pause|resume|run|remove|status|tick]")
print("Usage: hermes cron [list|create|edit|pause|resume|run|remove|clear-error|status|tick]")
sys.exit(1)

View File

@@ -4576,6 +4576,9 @@ For more help on a command:
cron_run.add_argument("job_id", help="Job ID to trigger")
cron_run.add_argument("--now", action="store_true", help="Execute immediately and wait for result (clears stale errors)")
cron_clear_error = cron_subparsers.add_parser("clear-error", help="Clear stale error state for a job")
cron_clear_error.add_argument("job_id", help="Job ID to clear error for")
cron_remove = cron_subparsers.add_parser("remove", aliases=["rm", "delete"], help="Remove a scheduled job")
cron_remove.add_argument("job_id", help="Job ID to remove")

View File

@@ -1,120 +0,0 @@
"""Tests for cron cloud context warning injection (fix #378, #456).
When a cron job runs on a cloud endpoint but its prompt references local
services (Ollama, localhost, etc.), inject a warning so the agent reports
the limitation instead of wasting iterations on doomed connections.
"""
import pytest
from cron.scheduler import (
_detect_local_service_refs,
_inject_cloud_context,
_LOCAL_SERVICE_PATTERNS,
)
# ---------------------------------------------------------------------------
# Pattern detection
# ---------------------------------------------------------------------------
class TestDetectLocalServiceRefs:
def test_localhost_with_port(self):
refs = _detect_local_service_refs("Check http://localhost:8080/status")
assert len(refs) > 0
assert any("localhost" in r for r in refs)
def test_127_address(self):
refs = _detect_local_service_refs("Connect to 127.0.0.1:11434")
assert len(refs) > 0
def test_ollama_reference(self):
refs = _detect_local_service_refs("Run this on Ollama with gemma3")
assert len(refs) > 0
assert any("ollama" in r.lower() for r in refs)
def test_curl_localhost(self):
refs = _detect_local_service_refs("curl localhost:3000/api/data")
assert len(refs) > 0
def test_wget_localhost(self):
refs = _detect_local_service_refs("wget http://localhost/file.txt")
assert len(refs) > 0
def test_http_localhost(self):
refs = _detect_local_service_refs("http://localhost:8642/health")
assert len(refs) > 0
def test_https_127(self):
refs = _detect_local_service_refs("https://127.0.0.1:443/secure")
assert len(refs) > 0
def test_0000_address(self):
refs = _detect_local_service_refs("Bind to 0.0.0.0:9090")
assert len(refs) > 0
def test_no_match_for_remote(self):
refs = _detect_local_service_refs("Check https://api.openai.com/v1/models")
assert len(refs) == 0
def test_no_match_for_gitea(self):
refs = _detect_local_service_refs("Query forge.alexanderwhitestone.com for issues")
assert len(refs) == 0
def test_no_match_empty(self):
refs = _detect_local_service_refs("")
assert len(refs) == 0
def test_check_ollama_phrase(self):
refs = _detect_local_service_refs("First check Ollama is running")
assert len(refs) > 0
def test_connect_local_phrase(self):
refs = _detect_local_service_refs("Connect to local Ollama server")
assert len(refs) > 0
# ---------------------------------------------------------------------------
# Warning injection
# ---------------------------------------------------------------------------
class TestInjectCloudContext:
def test_prepends_warning(self):
original = "Run a health check on localhost:8080"
refs = _detect_local_service_refs(original)
result = _inject_cloud_context(original, refs)
assert "SYSTEM NOTE" in result
assert "cloud endpoint" in result
assert original in result
def test_warning_is_first(self):
original = "Check localhost:11434"
refs = _detect_local_service_refs(original)
result = _inject_cloud_context(original, refs)
assert result.startswith("[SYSTEM NOTE")
def test_preserves_original_prompt(self):
original = "Do something with Ollama and then report results"
refs = _detect_local_service_refs(original)
result = _inject_cloud_context(original, refs)
assert "Do something with Ollama" in result
def test_mentions_cannot_reach(self):
original = "curl localhost:8080"
refs = _detect_local_service_refs(original)
result = _inject_cloud_context(original, refs)
assert "cannot reach" in result.lower() or "cannot" in result.lower()
# ---------------------------------------------------------------------------
# Pattern coverage
# ---------------------------------------------------------------------------
class TestPatternCoverage:
def test_at_least_10_patterns(self):
assert len(_LOCAL_SERVICE_PATTERNS) >= 10
def test_patterns_are_strings(self):
for p in _LOCAL_SERVICE_PATTERNS:
assert isinstance(p, str)
assert len(p) > 0

View File

@@ -201,6 +201,17 @@ def _format_job(job: Dict[str, Any]) -> Dict[str, Any]:
"paused_at": job.get("paused_at"),
"paused_reason": job.get("paused_reason"),
}
# Health timestamps
if job.get("last_error_at"):
result["last_error_at"] = job["last_error_at"]
if job.get("last_success_at"):
result["last_success_at"] = job["last_success_at"]
if job.get("error_resolved_at"):
result["error_resolved_at"] = job["error_resolved_at"]
if job.get("error_cleared_at"):
result["error_cleared_at"] = job["error_cleared_at"]
if job.get("script"):
result["script"] = job["script"]
return result
@@ -326,6 +337,13 @@ def cronjob(
if result is None:
return json.dumps({"success": False, "error": "Job not found"}, indent=2)
return json.dumps(result, indent=2)
if normalized == "clear_error":
from cron.jobs import clear_job_error
job = clear_job_error(job_id)
if job is None:
return json.dumps({"success": False, "error": "Job not found"}, indent=2)
return json.dumps({"success": True, "job": _format_job(job)}, indent=2)
if normalized == "update":
updates: Dict[str, Any] = {}