From fff237e11198a8918086bc4a2f53300a0a48dfcf Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Tue, 7 Apr 2026 22:49:01 -0700 Subject: [PATCH] feat(cron): track delivery failures in job status (#6042) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit _deliver_result() now returns Optional[str] — None on success, error message on failure. All failure paths (unknown platform, platform disabled, config load error, send failure, unresolvable target) return descriptive error strings. mark_job_run() gains delivery_error param, tracked as last_delivery_error on the job — separate from agent execution errors. A job where the agent succeeded but delivery failed shows last_status='ok' + last_delivery_error='...'. The cronjob list tool now surfaces last_delivery_error so agents and users can see when cron outputs aren't arriving. Inspired by PR #5863 (oxngon) — reimplemented with proper wiring. Tests: 3 new mark_job_run tests + 6 new _deliver_result return tests. --- cron/jobs.py | 8 +++- cron/scheduler.py | 51 +++++++++++++--------- hermes_cli/cron.py | 15 +++++++ tests/cron/test_jobs.py | 30 +++++++++++++ tests/cron/test_scheduler.py | 84 ++++++++++++++++++++++++++++++++++++ tools/cronjob_tools.py | 1 + 6 files changed, 167 insertions(+), 22 deletions(-) diff --git a/cron/jobs.py b/cron/jobs.py index 214da521f..4096d1fd8 100644 --- a/cron/jobs.py +++ b/cron/jobs.py @@ -574,12 +574,16 @@ def remove_job(job_id: str) -> bool: return False -def mark_job_run(job_id: str, success: bool, error: Optional[str] = None): +def mark_job_run(job_id: str, success: bool, error: Optional[str] = None, + delivery_error: Optional[str] = None): """ Mark a job as having been run. Updates last_run_at, last_status, increments completed count, computes next_run_at, and auto-deletes if repeat limit reached. + + ``delivery_error`` is tracked separately from the agent error — a job + can succeed (agent produced output) but fail delivery (platform down). """ jobs = load_jobs() for i, job in enumerate(jobs): @@ -588,6 +592,8 @@ def mark_job_run(job_id: str, success: bool, error: Optional[str] = None): job["last_run_at"] = now job["last_status"] = "ok" if success else "error" job["last_error"] = error if not success else None + # Track delivery failures separately — cleared on successful delivery + job["last_delivery_error"] = delivery_error # Increment completed count if job.get("repeat"): diff --git a/cron/scheduler.py b/cron/scheduler.py index 8d71248b4..e164c1f33 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -196,7 +196,7 @@ def _send_media_via_adapter(adapter, chat_id: str, media_files: list, metadata: logger.warning("Job '%s': failed to send media %s: %s", job.get("id", "?"), media_path, e) -def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> None: +def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Optional[str]: """ Deliver job output to the configured target (origin chat, specific platform, etc.). @@ -204,16 +204,16 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> None: use the live adapter first — this supports E2EE rooms (e.g. Matrix) where the standalone HTTP path cannot encrypt. Falls back to standalone send if the adapter path fails or is unavailable. + + Returns None on success, or an error string on failure. """ target = _resolve_delivery_target(job) if not target: if job.get("deliver", "local") != "local": - logger.warning( - "Job '%s' deliver=%s but no concrete delivery target could be resolved", - job["id"], - job.get("deliver", "local"), - ) - return + msg = f"no delivery target resolved for deliver={job.get('deliver', 'local')}" + logger.warning("Job '%s': %s", job["id"], msg) + return msg + return None # local-only jobs don't deliver — not a failure platform_name = target["platform"] chat_id = target["chat_id"] @@ -239,19 +239,22 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> None: } platform = platform_map.get(platform_name.lower()) if not platform: - logger.warning("Job '%s': unknown platform '%s' for delivery", job["id"], platform_name) - return + msg = f"unknown platform '{platform_name}'" + logger.warning("Job '%s': %s", job["id"], msg) + return msg try: config = load_gateway_config() except Exception as e: - logger.error("Job '%s': failed to load gateway config for delivery: %s", job["id"], e) - return + msg = f"failed to load gateway config: {e}" + logger.error("Job '%s': %s", job["id"], msg) + return msg pconfig = config.platforms.get(platform) if not pconfig or not pconfig.enabled: - logger.warning("Job '%s': platform '%s' not configured/enabled", job["id"], platform_name) - return + msg = f"platform '{platform_name}' not configured/enabled" + logger.warning("Job '%s': %s", job["id"], msg) + return msg # Optionally wrap the content with a header/footer so the user knows this # is a cron delivery. Wrapping is on by default; set cron.wrap_response: false @@ -307,7 +310,7 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> None: if adapter_ok: logger.info("Job '%s': delivered to %s:%s via live adapter", job["id"], platform_name, chat_id) - return + return None except Exception as e: logger.warning( "Job '%s': live adapter delivery to %s:%s failed (%s), falling back to standalone", @@ -329,13 +332,17 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> None: future = pool.submit(asyncio.run, _send_to_platform(platform, pconfig, chat_id, cleaned_delivery_content, thread_id=thread_id, media_files=media_files)) result = future.result(timeout=30) except Exception as e: - logger.error("Job '%s': delivery to %s:%s failed: %s", job["id"], platform_name, chat_id, e) - return + msg = f"delivery to {platform_name}:{chat_id} failed: {e}" + logger.error("Job '%s': %s", job["id"], msg) + return msg if result and result.get("error"): - logger.error("Job '%s': delivery error: %s", job["id"], result["error"]) - else: - logger.info("Job '%s': delivered to %s:%s", job["id"], platform_name, chat_id) + msg = f"delivery error: {result['error']}" + logger.error("Job '%s': %s", job["id"], msg) + return msg + + logger.info("Job '%s': delivered to %s:%s", job["id"], platform_name, chat_id) + return None _SCRIPT_TIMEOUT = 120 # seconds @@ -868,13 +875,15 @@ def tick(verbose: bool = True, adapters=None, loop=None) -> int: logger.info("Job '%s': agent returned %s — skipping delivery", job["id"], SILENT_MARKER) should_deliver = False + delivery_error = None if should_deliver: try: - _deliver_result(job, deliver_content, adapters=adapters, loop=loop) + delivery_error = _deliver_result(job, deliver_content, adapters=adapters, loop=loop) except Exception as de: + delivery_error = str(de) logger.error("Delivery failed for job %s: %s", job["id"], de) - mark_job_run(job["id"], success, error) + mark_job_run(job["id"], success, error, delivery_error=delivery_error) executed += 1 except Exception as e: diff --git a/hermes_cli/cron.py b/hermes_cli/cron.py index d10513a28..e0ab6007a 100644 --- a/hermes_cli/cron.py +++ b/hermes_cli/cron.py @@ -93,6 +93,21 @@ def cron_list(show_all: bool = False): script = job.get("script") if script: print(f" Script: {script}") + + # Execution history + last_status = job.get("last_status") + if last_status: + last_run = job.get("last_run_at", "?") + if last_status == "ok": + status_display = color("ok", Colors.GREEN) + else: + status_display = color(f"{last_status}: {job.get('last_error', '?')}", Colors.RED) + print(f" Last run: {last_run} {status_display}") + + delivery_err = job.get("last_delivery_error") + if delivery_err: + print(f" {color('⚠ Delivery failed:', Colors.YELLOW)} {delivery_err}") + print() from hermes_cli.gateway import find_gateway_pids diff --git a/tests/cron/test_jobs.py b/tests/cron/test_jobs.py index cca460100..e0f56b961 100644 --- a/tests/cron/test_jobs.py +++ b/tests/cron/test_jobs.py @@ -339,6 +339,36 @@ class TestMarkJobRun: assert updated["last_status"] == "error" assert updated["last_error"] == "timeout" + def test_delivery_error_tracked_separately(self, tmp_cron_dir): + """Agent succeeds but delivery fails — both tracked independently.""" + job = create_job(prompt="Report", schedule="every 1h") + mark_job_run(job["id"], success=True, delivery_error="platform 'telegram' not configured") + updated = get_job(job["id"]) + assert updated["last_status"] == "ok" + assert updated["last_error"] is None + assert updated["last_delivery_error"] == "platform 'telegram' not configured" + + def test_delivery_error_cleared_on_success(self, tmp_cron_dir): + """Successful delivery clears the previous delivery error.""" + job = create_job(prompt="Report", schedule="every 1h") + mark_job_run(job["id"], success=True, delivery_error="network timeout") + updated = get_job(job["id"]) + assert updated["last_delivery_error"] == "network timeout" + # Next run delivers successfully + mark_job_run(job["id"], success=True, delivery_error=None) + updated = get_job(job["id"]) + assert updated["last_delivery_error"] is None + + def test_both_agent_and_delivery_error(self, tmp_cron_dir): + """Agent fails AND delivery fails — both errors recorded.""" + job = create_job(prompt="Report", schedule="every 1h") + mark_job_run(job["id"], success=False, error="model timeout", + delivery_error="platform 'discord' not enabled") + updated = get_job(job["id"]) + assert updated["last_status"] == "error" + assert updated["last_error"] == "model timeout" + assert updated["last_delivery_error"] == "platform 'discord' not enabled" + class TestAdvanceNextRun: """Tests for advance_next_run() — crash-safety for recurring jobs.""" diff --git a/tests/cron/test_scheduler.py b/tests/cron/test_scheduler.py index 4a15fa223..c07663a37 100644 --- a/tests/cron/test_scheduler.py +++ b/tests/cron/test_scheduler.py @@ -508,6 +508,90 @@ class TestDeliverResultWrapping: assert send_mock.call_args.kwargs["thread_id"] == "17585" +class TestDeliverResultErrorReturns: + """Verify _deliver_result returns error strings on failure, None on success.""" + + def test_returns_none_on_successful_delivery(self): + from gateway.config import Platform + + pconfig = MagicMock() + pconfig.enabled = True + mock_cfg = MagicMock() + mock_cfg.platforms = {Platform.TELEGRAM: pconfig} + + with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \ + patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})): + job = { + "id": "ok-job", + "deliver": "origin", + "origin": {"platform": "telegram", "chat_id": "123"}, + } + result = _deliver_result(job, "Output.") + assert result is None + + def test_returns_none_for_local_delivery(self): + """local-only jobs don't deliver — not a failure.""" + job = {"id": "local-job", "deliver": "local"} + result = _deliver_result(job, "Output.") + assert result is None + + def test_returns_error_for_unknown_platform(self): + job = { + "id": "bad-platform", + "deliver": "origin", + "origin": {"platform": "fax", "chat_id": "123"}, + } + with patch("gateway.config.load_gateway_config"): + result = _deliver_result(job, "Output.") + assert result is not None + assert "unknown platform" in result + + def test_returns_error_when_platform_disabled(self): + from gateway.config import Platform + + pconfig = MagicMock() + pconfig.enabled = False + mock_cfg = MagicMock() + mock_cfg.platforms = {Platform.TELEGRAM: pconfig} + + with patch("gateway.config.load_gateway_config", return_value=mock_cfg): + job = { + "id": "disabled", + "deliver": "origin", + "origin": {"platform": "telegram", "chat_id": "123"}, + } + result = _deliver_result(job, "Output.") + assert result is not None + assert "not configured" in result + + def test_returns_error_on_send_failure(self): + from gateway.config import Platform + + pconfig = MagicMock() + pconfig.enabled = True + mock_cfg = MagicMock() + mock_cfg.platforms = {Platform.TELEGRAM: pconfig} + + with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \ + patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"error": "rate limited"})): + job = { + "id": "rate-limited", + "deliver": "origin", + "origin": {"platform": "telegram", "chat_id": "123"}, + } + result = _deliver_result(job, "Output.") + assert result is not None + assert "rate limited" in result + + def test_returns_error_for_unresolved_target(self, monkeypatch): + """Non-local delivery with no resolvable target should return an error.""" + monkeypatch.delenv("TELEGRAM_HOME_CHANNEL", raising=False) + job = {"id": "no-target", "deliver": "telegram"} + result = _deliver_result(job, "Output.") + assert result is not None + assert "no delivery target" in result + + class TestRunJobSessionPersistence: def test_run_job_passes_session_db_and_cron_platform(self, tmp_path): job = { diff --git a/tools/cronjob_tools.py b/tools/cronjob_tools.py index 82d43c588..595ad8bc7 100644 --- a/tools/cronjob_tools.py +++ b/tools/cronjob_tools.py @@ -195,6 +195,7 @@ def _format_job(job: Dict[str, Any]) -> Dict[str, Any]: "next_run_at": job.get("next_run_at"), "last_run_at": job.get("last_run_at"), "last_status": job.get("last_status"), + "last_delivery_error": job.get("last_delivery_error"), "enabled": job.get("enabled", True), "state": job.get("state", "scheduled" if job.get("enabled", True) else "paused"), "paused_at": job.get("paused_at"),