Compare commits

...

3 Commits

Author SHA1 Message Date
ed936198ff fix: gateway reconnect drops active cron job notifications (#744)
Some checks failed
Contributor Attribution Check / check-attribution (pull_request) Failing after 39s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 43s
Tests / e2e (pull_request) Successful in 4m2s
Tests / test (pull_request) Failing after 39m22s
2026-04-15 04:15:01 +00:00
50e7301ee9 fix: gateway reconnect drops active cron job notifications (#744) 2026-04-15 04:14:58 +00:00
9c4abb4992 fix: gateway reconnect drops active cron job notifications (#744) 2026-04-15 04:14:54 +00:00
3 changed files with 610 additions and 12 deletions

View File

@@ -197,7 +197,7 @@ def _send_media_via_adapter(adapter, chat_id: str, media_files: list, metadata:
logger.warning("Job '%s': failed to send media %s: %s", job.get("id", "?"), media_path, e)
def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Optional[str]:
def _deliver_result(job: dict, content: str, adapters=None, loop=None, pending_delivery_callback=None) -> Optional[str]:
"""
Deliver job output to the configured target (origin chat, specific platform, etc.).
@@ -206,6 +206,10 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
the standalone HTTP path cannot encrypt. Falls back to standalone send if
the adapter path fails or is unavailable.
When ``pending_delivery_callback`` is provided and delivery fails due to
the platform being unavailable, the delivery is queued for retry when the
platform reconnects instead of being silently dropped.
Returns None on success, or an error string on failure.
"""
target = _resolve_delivery_target(job)
@@ -354,11 +358,29 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
except Exception as e:
msg = f"delivery to {platform_name}:{chat_id} failed: {e}"
logger.error("Job '%s': %s", job["id"], msg)
# Queue for retry if callback provided
if pending_delivery_callback:
try:
pending_delivery_callback(
platform_name, chat_id, thread_id,
delivery_content, job["id"], job.get("name", job["id"]),
)
except Exception:
pass
return msg
if result and result.get("error"):
msg = f"delivery error: {result['error']}"
logger.error("Job '%s': %s", job["id"], msg)
# Queue for retry if callback provided
if pending_delivery_callback:
try:
pending_delivery_callback(
platform_name, chat_id, thread_id,
delivery_content, job["id"], job.get("name", job["id"]),
)
except Exception:
pass
return msg
logger.info("Job '%s': delivered to %s:%s", job["id"], platform_name, chat_id)
@@ -896,7 +918,7 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
logger.debug("Job '%s': failed to close SQLite session store: %s", job_id, e)
def tick(verbose: bool = True, adapters=None, loop=None) -> int:
def tick(verbose: bool = True, adapters=None, loop=None, pending_delivery_callback=None) -> int:
"""
Check and run all due jobs.
@@ -907,6 +929,9 @@ def tick(verbose: bool = True, adapters=None, loop=None) -> int:
verbose: Whether to print status messages
adapters: Optional dict mapping Platform → live adapter (from gateway)
loop: Optional asyncio event loop (from gateway) for live adapter sends
pending_delivery_callback: Optional callback to queue failed deliveries
for retry when a platform reconnects. Signature:
(platform_name, chat_id, thread_id, content, job_id, job_name) -> None
Returns:
Number of jobs executed (0 if another tick is already running)
@@ -964,7 +989,11 @@ def tick(verbose: bool = True, adapters=None, loop=None) -> int:
delivery_error = None
if should_deliver:
try:
delivery_error = _deliver_result(job, deliver_content, adapters=adapters, loop=loop)
delivery_error = _deliver_result(
job, deliver_content,
adapters=adapters, loop=loop,
pending_delivery_callback=pending_delivery_callback,
)
except Exception as de:
delivery_error = str(de)
logger.error("Delivery failed for job %s: %s", job["id"], de)

View File

@@ -594,6 +594,14 @@ class GatewayRunner:
# Key: Platform enum, Value: {"config": platform_config, "attempts": int, "next_retry": float}
self._failed_platforms: Dict[Platform, Dict[str, Any]] = {}
# Pending cron deliveries that failed during platform disconnect.
# Each entry: {"platform": str, "chat_id": str, "thread_id": str|None,
# "content": str, "job_id": str, "job_name": str, "timestamp": float}
# Flushed when the target platform reconnects.
import threading as _threading2
self._pending_cron_deliveries: List[Dict[str, Any]] = []
self._pending_deliveries_lock = _threading2.Lock()
# Track pending /update prompt responses per session.
# Key: session_key, Value: True when a prompt is waiting for user input.
self._update_prompt_pending: Dict[str, bool] = {}
@@ -1021,6 +1029,103 @@ class GatewayRunner:
self._exit_reason = reason
self._shutdown_event.set()
def queue_failed_cron_delivery(
    self,
    platform_name: str,
    chat_id: str,
    thread_id: Optional[str],
    content: str,
    job_id: str,
    job_name: str,
) -> None:
    """Record a cron delivery that failed so it can be retried on reconnect.

    Invoked from cron/scheduler._deliver_result when a live-adapter send
    fails while the platform is in a known-disconnected state. The queued
    entry is replayed by _flush_pending_cron_deliveries once the platform
    comes back up.
    """
    import time as _time

    # Append under the lock; capture the queue length there too so the
    # log line below reflects a consistent snapshot.
    with self._pending_deliveries_lock:
        self._pending_cron_deliveries.append({
            "platform": platform_name,
            "chat_id": chat_id,
            "thread_id": thread_id,
            "content": content,
            "job_id": job_id,
            "job_name": job_name,
            "timestamp": _time.time(),
        })
        queue_len = len(self._pending_cron_deliveries)
    logger.info(
        "Queued failed cron delivery for %s:%s (job=%s, queue=%d)",
        platform_name, chat_id, job_id, queue_len,
    )
async def _flush_pending_cron_deliveries(self, platform: "Platform") -> None:
    """Retry queued cron deliveries for a platform that just reconnected.

    Called after a successful platform reconnect. Delivers each pending
    message via the now-available live adapter, best-effort: an individual
    failure is logged and does not block the remaining deliveries.
    """
    name = platform.value
    # Atomically take this platform's entries off the shared queue.
    with self._pending_deliveries_lock:
        to_send = [e for e in self._pending_cron_deliveries if e["platform"] == name]
        self._pending_cron_deliveries = [
            e for e in self._pending_cron_deliveries if e["platform"] != name
        ]
    if not to_send:
        return
    logger.info(
        "Flushing %d pending cron deliveries for reconnected %s",
        len(to_send), name,
    )
    adapter = self.adapters.get(platform)
    if adapter is None:
        logger.warning(
            "Cannot flush %d deliveries: %s adapter not in self.adapters after reconnect?",
            len(to_send), name,
        )
        # Put everything back so a later reconnect can retry.
        with self._pending_deliveries_lock:
            self._pending_cron_deliveries.extend(to_send)
        return
    for entry in to_send:
        try:
            target_chat = entry["chat_id"]
            body = entry["content"]
            meta = {"thread_id": entry["thread_id"]} if entry.get("thread_id") else {}
            # Truncate if needed (mirror delivery.py logic)
            if len(body) > 4000:
                body = body[:3800] + "\n\n... [truncated, was queued during disconnect]"
            outcome = await adapter.send(target_chat, body, metadata=meta or None)
            if outcome and not getattr(outcome, "success", True):
                logger.warning(
                    "Pending delivery flush failed for %s:%s (job=%s): %s",
                    name, target_chat, entry.get("job_id"),
                    getattr(outcome, "error", "unknown"),
                )
            else:
                logger.info(
                    "Flushed pending cron delivery to %s:%s (job=%s)",
                    name, target_chat, entry.get("job_id"),
                )
        except Exception as exc:
            logger.warning(
                "Failed to flush pending delivery to %s:%s (job=%s): %s",
                name, entry.get("chat_id"), entry.get("job_id"), exc,
            )
def _running_agent_count(self) -> int:
return len(self._running_agents)
@@ -1391,6 +1496,65 @@ class GatewayRunner:
except Exception as e:
logger.debug("Failed interrupting agent during shutdown: %s", e)
async def _notify_active_sessions_of_shutdown(self) -> None:
    """Send a notification to every chat with an active agent.

    Called at the very start of stop() — adapters are still connected so
    messages can be delivered. Best-effort: individual send failures are
    logged and swallowed so they never block the shutdown sequence.
    """
    active = self._snapshot_running_agents()
    if not active:
        return
    action = "restarting" if self._restart_requested else "shutting down"
    hint = (
        "Your current task will be interrupted. "
        "Send any message after restart to resume where it left off."
        if self._restart_requested
        else "Your current task will be interrupted."
    )
    # Fix: action and hint were concatenated with no separator, producing
    # "Gateway restartingYour current task...". Join with ". ".
    msg = f"⚠️ Gateway {action}. {hint}"
    notified: set = set()
    for session_key in active:
        # Parse platform + chat_id from the session key.
        # Format: agent:main:{platform}:{chat_type}:{chat_id}[:{extra}...]
        parts = session_key.split(":")
        if len(parts) < 5:
            continue
        platform_str = parts[2]
        chat_id = parts[4]
        # Deduplicate: one notification per chat, even if multiple
        # sessions (different users/threads) share the same chat.
        dedup_key = (platform_str, chat_id)
        if dedup_key in notified:
            continue
        try:
            platform = Platform(platform_str)
            adapter = self.adapters.get(platform)
            if not adapter:
                continue
            # Include thread_id if present so the message lands in the
            # correct forum topic / thread.
            thread_id = parts[5] if len(parts) > 5 else None
            metadata = {"thread_id": thread_id} if thread_id else None
            await adapter.send(chat_id, msg, metadata=metadata)
            notified.add(dedup_key)
            logger.info(
                "Sent shutdown notification to %s:%s",
                platform_str, chat_id,
            )
        except Exception as e:
            logger.debug(
                "Failed to send shutdown notification to %s:%s: %s",
                platform_str, chat_id, e,
            )
def _finalize_shutdown_agents(self, active_agents: Dict[str, Any]) -> None:
for agent in active_agents.values():
try:
@@ -1416,6 +1580,106 @@ class GatewayRunner:
except Exception:
pass
_STUCK_LOOP_THRESHOLD = 3 # restarts while active before auto-suspend
_STUCK_LOOP_FILE = ".restart_failure_counts"
def _increment_restart_failure_counts(self, active_session_keys: set) -> None:
    """Increment restart-failure counters for sessions active at shutdown.

    Persists to a JSON file so counters survive across restarts.
    Sessions NOT in active_session_keys are removed (they completed
    successfully, so the loop is broken).
    """
    import json
    path = _hermes_home / self._STUCK_LOOP_FILE
    try:
        counts = json.loads(path.read_text()) if path.exists() else {}
    except Exception:
        # Unreadable or corrupt counter file: start counting from scratch.
        counts = {}
    # Rebuild the file from the currently-active sessions only: each gets
    # its prior count + 1. Any session absent from active_session_keys is
    # implicitly dropped — it completed a turn, so its loop is broken.
    new_counts = {}
    for key in active_session_keys:
        new_counts[key] = counts.get(key, 0) + 1
    try:
        path.write_text(json.dumps(new_counts))
    except Exception:
        # Best-effort persistence: a failed write must not block shutdown.
        pass
def _suspend_stuck_loop_sessions(self) -> int:
    """Suspend sessions that have been active across too many restarts.

    Returns the number of sessions suspended. Called on gateway startup
    AFTER suspend_recently_active() to catch the stuck-loop pattern:
    session loads → agent gets stuck → gateway restarts → repeat.
    """
    import json
    path = _hermes_home / self._STUCK_LOOP_FILE
    if not path.exists():
        return 0
    try:
        counts = json.loads(path.read_text())
    except Exception:
        # Corrupt/unreadable counter file: treat as "nothing is stuck".
        return 0
    suspended = 0
    # Sessions at or above the threshold survived that many consecutive
    # restarts while still active — assume they are looping.
    stuck_keys = [k for k, v in counts.items() if v >= self._STUCK_LOOP_THRESHOLD]
    for session_key in stuck_keys:
        try:
            # NOTE(review): reaches into session_store private internals
            # (_entries, _save below) — assumes the store keeps a dict of
            # entries with a mutable `suspended` flag; confirm against the
            # SessionStore implementation.
            entry = self.session_store._entries.get(session_key)
            if entry and not entry.suspended:
                entry.suspended = True
                suspended += 1
                logger.warning(
                    "Auto-suspended stuck session %s (active across %d "
                    "consecutive restarts — likely a stuck loop)",
                    session_key[:30], counts[session_key],
                )
        except Exception:
            # Best-effort: one bad entry must not block the rest.
            pass
    if suspended:
        try:
            self.session_store._save()
        except Exception:
            pass
    # Clear the file — counters start fresh after suspension
    try:
        path.unlink(missing_ok=True)
    except Exception:
        pass
    return suspended
def _clear_restart_failure_count(self, session_key: str) -> None:
    """Drop the restart-failure counter for a session after a clean turn.

    Called after a successful agent turn to signal the loop is broken;
    best-effort — any I/O or parse error is swallowed.
    """
    import json
    path = _hermes_home / self._STUCK_LOOP_FILE
    if not path.exists():
        return
    try:
        counts = json.loads(path.read_text())
        if session_key not in counts:
            return
        del counts[session_key]
        # Rewrite the remaining counters, or remove the file entirely
        # once the last counter is gone.
        if counts:
            path.write_text(json.dumps(counts))
        else:
            path.unlink(missing_ok=True)
    except Exception:
        pass
async def _launch_detached_restart_command(self) -> None:
import shutil
import subprocess
@@ -1559,6 +1823,17 @@ class GatewayRunner:
except Exception as e:
logger.warning("Session suspension on startup failed: %s", e)
# Stuck-loop detection (#7536): if a session has been active across
# 3+ consecutive restarts, it's probably stuck in a loop (the same
# history keeps causing the agent to hang). Auto-suspend it so the
# user gets a clean slate on the next message.
try:
stuck = self._suspend_stuck_loop_sessions()
if stuck:
logger.warning("Auto-suspended %d stuck-loop session(s)", stuck)
except Exception as e:
logger.debug("Stuck-loop detection failed: %s", e)
connected_count = 0
enabled_platform_count = 0
startup_nonretryable_errors: list[str] = []
@@ -1945,6 +2220,13 @@ class GatewayRunner:
build_channel_directory(self.adapters)
except Exception:
pass
# Flush any cron deliveries that were queued during the disconnect
try:
await self._flush_pending_cron_deliveries(platform)
except Exception as flush_err:
logger.warning("Error flushing pending deliveries for %s: %s",
platform.value, flush_err)
else:
# Check if the failure is non-retryable
if adapter.has_fatal_error and not adapter.fatal_error_retryable:
@@ -2018,6 +2300,10 @@ class GatewayRunner:
self._running = False
self._draining = True
# Notify all chats with active agents BEFORE draining.
# Adapters are still connected here, so messages can be sent.
await self._notify_active_sessions_of_shutdown()
timeout = self._restart_drain_timeout
active_agents, timed_out = await self._drain_active_agents(timeout)
if timed_out:
@@ -2088,12 +2374,31 @@ class GatewayRunner:
# Write a clean-shutdown marker so the next startup knows this
# wasn't a crash. suspend_recently_active() only needs to run
# after unexpected exits — graceful shutdowns already drain
# active agents, so there's no stuck-session risk.
try:
(_hermes_home / ".clean_shutdown").touch()
except Exception:
pass
# after unexpected exits. However, if the drain timed out and
# agents were force-interrupted, their sessions may be in an
# incomplete state (trailing tool response, no final assistant
# message). Skip the marker in that case so the next startup
# suspends those sessions — giving users a clean slate instead
# of resuming a half-finished tool loop.
if not timed_out:
try:
(_hermes_home / ".clean_shutdown").touch()
except Exception:
pass
else:
logger.info(
"Skipping .clean_shutdown marker — drain timed out with "
"interrupted agents; next startup will suspend recently "
"active sessions."
)
# Track sessions that were active at shutdown for stuck-loop
# detection (#7536). On each restart, the counter increments
# for sessions that were running. If a session hits the
# threshold (3 consecutive restarts while active), the next
# startup auto-suspends it — breaking the loop.
if active_agents:
self._increment_restart_failure_counts(set(active_agents.keys()))
if self._restart_requested and self._restart_via_service:
self._exit_code = GATEWAY_SERVICE_RESTART_EXIT_CODE
@@ -3593,6 +3898,12 @@ class GatewayRunner:
_response_time, _api_calls, _resp_len,
)
# Successful turn — clear any stuck-loop counter for this session.
# This ensures the counter only accumulates across CONSECUTIVE
# restarts where the session was active (never completed).
if session_key:
self._clear_restart_failure_count(session_key)
# Surface error details when the agent failed silently (final_response=None)
if not response and agent_result.get("failed"):
error_detail = agent_result.get("error", "unknown error")
@@ -8376,6 +8687,21 @@ class GatewayRunner:
if _msn:
message = _msn + "\n\n" + message
# Auto-continue: if the loaded history ends with a tool result,
# the previous agent turn was interrupted mid-work (gateway
# restart, crash, SIGTERM). Prepend a system note so the model
# finishes processing the pending tool results before addressing
# the user's new message. (#4493)
if agent_history and agent_history[-1].get("role") == "tool":
message = (
"[System note: Your previous turn was interrupted before you could "
"process the last tool result(s). The conversation history contains "
"tool outputs you haven't responded to yet. Please finish processing "
"those results and summarize what was accomplished, then address the "
"user's new message below.]\n\n"
+ message
)
_approval_session_key = session_key or ""
_approval_session_token = set_current_session_key(_approval_session_key)
register_gateway_notify(_approval_session_key, _approval_notify_sync)
@@ -9019,7 +9345,7 @@ class GatewayRunner:
return response
def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, interval: int = 60):
def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, interval: int = 60, pending_delivery_callback=None):
"""
Background thread that ticks the cron scheduler at a regular interval.
@@ -9029,6 +9355,9 @@ def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, in
When ``adapters`` and ``loop`` are provided, passes them through to the
cron delivery path so live adapters can be used for E2EE rooms.
When ``pending_delivery_callback`` is provided, failed deliveries are
queued for retry when the target platform reconnects.
Also refreshes the channel directory every 5 minutes and prunes the
image/audio/document cache once per hour.
"""
@@ -9042,7 +9371,8 @@ def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, in
tick_count = 0
while not stop_event.is_set():
try:
cron_tick(verbose=False, adapters=adapters, loop=loop)
cron_tick(verbose=False, adapters=adapters, loop=loop,
pending_delivery_callback=pending_delivery_callback)
except Exception as e:
logger.debug("Cron tick error: %s", e)
@@ -9187,8 +9517,41 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
runner = GatewayRunner(config)
# Track whether a signal initiated the shutdown (vs. internal request).
# When an unexpected SIGTERM kills the gateway, we exit non-zero so
# systemd's Restart=on-failure revives the process. systemctl stop
# is safe: systemd tracks stop-requested state independently of exit
# code, so Restart= never fires for a deliberate stop.
_signal_initiated_shutdown = False
# Set up signal handlers
def shutdown_signal_handler():
nonlocal _signal_initiated_shutdown
_signal_initiated_shutdown = True
logger.info("Received SIGTERM/SIGINT — initiating shutdown")
# Diagnostic: log all hermes-related processes so we can identify
# what triggered the signal (hermes update, hermes gateway restart,
# a stale detached subprocess, etc.).
try:
import subprocess as _sp
_ps = _sp.run(
["ps", "aux"],
capture_output=True, text=True, timeout=3,
)
_hermes_procs = [
line for line in _ps.stdout.splitlines()
if ("hermes" in line.lower() or "gateway" in line.lower())
and str(os.getpid()) not in line.split()[1:2] # exclude self
]
if _hermes_procs:
logger.warning(
"Shutdown diagnostic — other hermes processes running:\n %s",
"\n ".join(_hermes_procs),
)
else:
logger.info("Shutdown diagnostic — no other hermes processes found")
except Exception:
pass
asyncio.create_task(runner.stop())
def restart_signal_handler():
@@ -9230,7 +9593,11 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
cron_thread = threading.Thread(
target=_start_cron_ticker,
args=(cron_stop,),
kwargs={"adapters": runner.adapters, "loop": asyncio.get_running_loop()},
kwargs={
"adapters": runner.adapters,
"loop": asyncio.get_running_loop(),
"pending_delivery_callback": runner.queue_failed_cron_delivery,
},
daemon=True,
name="cron-ticker",
)
@@ -9258,6 +9625,21 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
if runner.exit_code is not None:
raise SystemExit(runner.exit_code)
# When a signal (SIGTERM/SIGINT) caused the shutdown and it wasn't a
# planned restart (/restart, /update, SIGUSR1), exit non-zero so
# systemd's Restart=on-failure revives the process. This covers:
# - hermes update killing the gateway mid-work
# - External kill commands
# - WSL2/container runtime sending unexpected signals
# systemctl stop is safe: systemd tracks "stop requested" state
# independently of exit code, so Restart= never fires for it.
if _signal_initiated_shutdown and not runner._restart_requested:
logger.info(
"Exiting with code 1 (signal-initiated shutdown without restart "
"request) so systemd Restart=on-failure can revive the gateway."
)
return False # → sys.exit(1) in the caller
return True

View File

@@ -0,0 +1,187 @@
"""Tests for pending cron delivery queue — retry on reconnect."""
import asyncio
import threading
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from gateway.config import Platform
class TestPendingCronDeliveryQueue:
    """Verify that failed cron deliveries are queued and flushed on reconnect."""

    def _make_runner(self):
        """Create a minimal GatewayRunner for testing pending deliveries.

        object.__new__ skips __init__ (which wires up real adapters); only
        the attributes the queue/flush code touches are initialized. Methods
        resolve through the class as usual, so no explicit __get__ binding
        is needed.
        """
        from gateway.run import GatewayRunner
        runner = object.__new__(GatewayRunner)
        runner._pending_cron_deliveries = []
        runner._pending_deliveries_lock = threading.Lock()
        runner.adapters = {}
        return runner

    def _flush(self, runner, platform):
        """Run the async flush to completion on a fresh event loop.

        Uses asyncio.run() instead of the deprecated
        asyncio.get_event_loop().run_until_complete() pattern, which emits
        a DeprecationWarning on Python 3.12+ and fails on newer versions
        when no event loop is running in the thread.
        """
        asyncio.run(runner._flush_pending_cron_deliveries(platform))

    def test_queue_failed_delivery_adds_to_queue(self):
        runner = self._make_runner()
        assert len(runner._pending_cron_deliveries) == 0
        runner.queue_failed_cron_delivery(
            platform_name="telegram", chat_id="12345", thread_id=None,
            content="test output", job_id="job-1", job_name="Test Job",
        )
        assert len(runner._pending_cron_deliveries) == 1
        entry = runner._pending_cron_deliveries[0]
        assert entry["platform"] == "telegram"
        assert entry["chat_id"] == "12345"
        assert entry["content"] == "test output"

    def test_queue_preserves_thread_id(self):
        runner = self._make_runner()
        runner.queue_failed_cron_delivery(
            platform_name="telegram", chat_id="12345", thread_id="99",
            content="test", job_id="j1", job_name="Job",
        )
        assert runner._pending_cron_deliveries[0]["thread_id"] == "99"

    def test_flush_removes_matching_platform_entries(self):
        runner = self._make_runner()
        runner.queue_failed_cron_delivery("telegram", "111", None, "msg1", "j1", "Job1")
        runner.queue_failed_cron_delivery("discord", "222", None, "msg2", "j2", "Job2")
        runner.queue_failed_cron_delivery("telegram", "333", None, "msg3", "j3", "Job3")
        mock_adapter = AsyncMock()
        mock_adapter.send = AsyncMock(return_value=MagicMock(success=True))
        runner.adapters = {Platform.TELEGRAM: mock_adapter}
        self._flush(runner, Platform.TELEGRAM)
        assert len(runner._pending_cron_deliveries) == 1
        assert runner._pending_cron_deliveries[0]["platform"] == "discord"

    def test_flush_calls_adapter_send_for_each_entry(self):
        runner = self._make_runner()
        runner.queue_failed_cron_delivery("telegram", "111", None, "msg1", "j1", "Job1")
        runner.queue_failed_cron_delivery("telegram", "222", "42", "msg2", "j2", "Job2")
        mock_adapter = AsyncMock()
        mock_adapter.send = AsyncMock(return_value=MagicMock(success=True))
        runner.adapters = {Platform.TELEGRAM: mock_adapter}
        self._flush(runner, Platform.TELEGRAM)
        assert mock_adapter.send.call_count == 2

    def test_flush_requeues_if_adapter_missing(self):
        runner = self._make_runner()
        runner.queue_failed_cron_delivery("telegram", "111", None, "msg1", "j1", "Job1")
        runner.adapters = {}
        self._flush(runner, Platform.TELEGRAM)
        assert len(runner._pending_cron_deliveries) == 1

    def test_flush_skips_non_matching_platforms(self):
        runner = self._make_runner()
        runner.queue_failed_cron_delivery("discord", "222", None, "msg", "j1", "Job")
        runner.adapters = {Platform.TELEGRAM: AsyncMock()}
        self._flush(runner, Platform.TELEGRAM)
        assert len(runner._pending_cron_deliveries) == 1

    def test_flush_passes_thread_id_in_metadata(self):
        runner = self._make_runner()
        runner.queue_failed_cron_delivery("telegram", "111", "42", "msg", "j1", "Job")
        mock_adapter = AsyncMock()
        mock_adapter.send = AsyncMock(return_value=MagicMock(success=True))
        runner.adapters = {Platform.TELEGRAM: mock_adapter}
        self._flush(runner, Platform.TELEGRAM)
        call_kwargs = mock_adapter.send.call_args.kwargs
        assert call_kwargs["metadata"]["thread_id"] == "42"
class TestDeliverResultPendingCallback:
    """Verify _deliver_result calls pending_delivery_callback on failure."""

    @pytest.fixture
    def mock_gateway_config(self):
        """Create a mock gateway config with telegram platform enabled."""
        from gateway.config import Platform, GatewayConfig
        cfg = GatewayConfig()
        cfg.platforms = {Platform.TELEGRAM: MagicMock(enabled=True)}
        return cfg

    def _make_job(self):
        """A minimal cron job dict targeting telegram chat 12345."""
        return {
            "id": "job-1", "name": "Test Job",
            "deliver": "telegram:12345",
            "origin": {"platform": "telegram", "chat_id": "12345"},
        }

    def _base_patches(self, cfg):
        """Patchers shared by every test: fixed delivery target + config."""
        resolved = {"platform": "telegram", "chat_id": "12345", "thread_id": None}
        return (
            patch("cron.scheduler._resolve_delivery_target", return_value=resolved),
            patch("gateway.config.load_gateway_config", return_value=cfg),
        )

    def test_callback_on_exception(self, mock_gateway_config):
        from cron.scheduler import _deliver_result
        callback = MagicMock()
        target_patch, config_patch = self._base_patches(mock_gateway_config)
        send_patch = patch(
            "tools.send_message_tool._send_to_platform",
            side_effect=Exception("down"),
        )
        with target_patch, config_patch, send_patch:
            outcome = _deliver_result(
                self._make_job(), "test", pending_delivery_callback=callback
            )
        assert outcome is not None
        callback.assert_called_once()

    def test_callback_on_error_dict(self, mock_gateway_config):
        from cron.scheduler import _deliver_result
        callback = MagicMock()
        target_patch, config_patch = self._base_patches(mock_gateway_config)
        send_patch = patch(
            "tools.send_message_tool._send_to_platform",
            return_value={"error": "down"},
        )
        with target_patch, config_patch, send_patch:
            outcome = _deliver_result(
                self._make_job(), "test", pending_delivery_callback=callback
            )
        assert outcome is not None
        callback.assert_called_once()

    def test_no_callback_on_success(self, mock_gateway_config):
        from cron.scheduler import _deliver_result
        callback = MagicMock()
        target_patch, config_patch = self._base_patches(mock_gateway_config)
        send_patch = patch(
            "tools.send_message_tool._send_to_platform",
            return_value={"ok": True},
        )
        with target_patch, config_patch, send_patch:
            outcome = _deliver_result(
                self._make_job(), "test", pending_delivery_callback=callback
            )
        assert outcome is None
        callback.assert_not_called()

    def test_no_callback_no_crash(self, mock_gateway_config):
        from cron.scheduler import _deliver_result
        target_patch, config_patch = self._base_patches(mock_gateway_config)
        send_patch = patch(
            "tools.send_message_tool._send_to_platform",
            side_effect=Exception("down"),
        )
        with target_patch, config_patch, send_patch:
            outcome = _deliver_result(self._make_job(), "test")
        assert outcome is not None  # error, no crash