Compare commits
3 Commits
fix/658
...
fix/744-ga
| Author | SHA1 | Date |
|---|---|---|
| | ed936198ff | |
| | 50e7301ee9 | |
| | 9c4abb4992 | |
@@ -197,7 +197,7 @@ def _send_media_via_adapter(adapter, chat_id: str, media_files: list, metadata:
|
||||
logger.warning("Job '%s': failed to send media %s: %s", job.get("id", "?"), media_path, e)
|
||||
|
||||
|
||||
def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Optional[str]:
|
||||
def _deliver_result(job: dict, content: str, adapters=None, loop=None, pending_delivery_callback=None) -> Optional[str]:
|
||||
"""
|
||||
Deliver job output to the configured target (origin chat, specific platform, etc.).
|
||||
|
||||
@@ -206,6 +206,10 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
|
||||
the standalone HTTP path cannot encrypt. Falls back to standalone send if
|
||||
the adapter path fails or is unavailable.
|
||||
|
||||
When ``pending_delivery_callback`` is provided and delivery fails due to
|
||||
the platform being unavailable, the delivery is queued for retry when the
|
||||
platform reconnects instead of being silently dropped.
|
||||
|
||||
Returns None on success, or an error string on failure.
|
||||
"""
|
||||
target = _resolve_delivery_target(job)
|
||||
@@ -354,11 +358,29 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
|
||||
except Exception as e:
|
||||
msg = f"delivery to {platform_name}:{chat_id} failed: {e}"
|
||||
logger.error("Job '%s': %s", job["id"], msg)
|
||||
# Queue for retry if callback provided
|
||||
if pending_delivery_callback:
|
||||
try:
|
||||
pending_delivery_callback(
|
||||
platform_name, chat_id, thread_id,
|
||||
delivery_content, job["id"], job.get("name", job["id"]),
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
return msg
|
||||
|
||||
if result and result.get("error"):
|
||||
msg = f"delivery error: {result['error']}"
|
||||
logger.error("Job '%s': %s", job["id"], msg)
|
||||
# Queue for retry if callback provided
|
||||
if pending_delivery_callback:
|
||||
try:
|
||||
pending_delivery_callback(
|
||||
platform_name, chat_id, thread_id,
|
||||
delivery_content, job["id"], job.get("name", job["id"]),
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
return msg
|
||||
|
||||
logger.info("Job '%s': delivered to %s:%s", job["id"], platform_name, chat_id)
|
||||
@@ -896,7 +918,7 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
||||
logger.debug("Job '%s': failed to close SQLite session store: %s", job_id, e)
|
||||
|
||||
|
||||
def tick(verbose: bool = True, adapters=None, loop=None) -> int:
|
||||
def tick(verbose: bool = True, adapters=None, loop=None, pending_delivery_callback=None) -> int:
|
||||
"""
|
||||
Check and run all due jobs.
|
||||
|
||||
@@ -907,6 +929,9 @@ def tick(verbose: bool = True, adapters=None, loop=None) -> int:
|
||||
verbose: Whether to print status messages
|
||||
adapters: Optional dict mapping Platform → live adapter (from gateway)
|
||||
loop: Optional asyncio event loop (from gateway) for live adapter sends
|
||||
pending_delivery_callback: Optional callback to queue failed deliveries
|
||||
for retry when a platform reconnects. Signature:
|
||||
(platform_name, chat_id, thread_id, content, job_id, job_name) -> None
|
||||
|
||||
Returns:
|
||||
Number of jobs executed (0 if another tick is already running)
|
||||
@@ -964,7 +989,11 @@ def tick(verbose: bool = True, adapters=None, loop=None) -> int:
|
||||
delivery_error = None
|
||||
if should_deliver:
|
||||
try:
|
||||
delivery_error = _deliver_result(job, deliver_content, adapters=adapters, loop=loop)
|
||||
delivery_error = _deliver_result(
|
||||
job, deliver_content,
|
||||
adapters=adapters, loop=loop,
|
||||
pending_delivery_callback=pending_delivery_callback,
|
||||
)
|
||||
except Exception as de:
|
||||
delivery_error = str(de)
|
||||
logger.error("Delivery failed for job %s: %s", job["id"], de)
|
||||
|
||||
400
gateway/run.py
400
gateway/run.py
@@ -594,6 +594,14 @@ class GatewayRunner:
|
||||
# Key: Platform enum, Value: {"config": platform_config, "attempts": int, "next_retry": float}
|
||||
self._failed_platforms: Dict[Platform, Dict[str, Any]] = {}
|
||||
|
||||
# Pending cron deliveries that failed during platform disconnect.
|
||||
# Each entry: {"platform": str, "chat_id": str, "thread_id": str|None,
|
||||
# "content": str, "job_id": str, "job_name": str, "timestamp": float}
|
||||
# Flushed when the target platform reconnects.
|
||||
import threading as _threading2
|
||||
self._pending_cron_deliveries: List[Dict[str, Any]] = []
|
||||
self._pending_deliveries_lock = _threading2.Lock()
|
||||
|
||||
# Track pending /update prompt responses per session.
|
||||
# Key: session_key, Value: True when a prompt is waiting for user input.
|
||||
self._update_prompt_pending: Dict[str, bool] = {}
|
||||
@@ -1021,6 +1029,103 @@ class GatewayRunner:
|
||||
self._exit_reason = reason
|
||||
self._shutdown_event.set()
|
||||
|
||||
def queue_failed_cron_delivery(
    self,
    platform_name: str,
    chat_id: str,
    thread_id: Optional[str],
    content: str,
    job_id: str,
    job_name: str,
) -> None:
    """Queue a failed cron delivery for retry once the platform reconnects.

    Invoked from cron/scheduler._deliver_result when a live-adapter delivery
    fails while the platform is in a known-disconnected state.  The queued
    entry is retried by _flush_pending_cron_deliveries after reconnect.
    """
    import time as _time

    pending_entry = {
        "platform": platform_name,
        "chat_id": chat_id,
        "thread_id": thread_id,
        "content": content,
        "job_id": job_id,
        "job_name": job_name,
        "timestamp": _time.time(),
    }
    # Capture the queue depth while still holding the lock so the logged
    # number is consistent with the append we just performed.
    with self._pending_deliveries_lock:
        self._pending_cron_deliveries.append(pending_entry)
        depth = len(self._pending_cron_deliveries)
    logger.info(
        "Queued failed cron delivery for %s:%s (job=%s, queue=%d)",
        platform_name, chat_id, job_id, depth,
    )
|
||||
|
||||
async def _flush_pending_cron_deliveries(self, platform: "Platform") -> None:
    """Retry queued cron deliveries for a platform that just reconnected.

    Called after a successful platform reconnect.  Each queued message is
    delivered via the now-available live adapter on a best-effort basis:
    a failure for one entry is logged and does not block the rest.
    """
    platform_name = platform.value

    # Atomically take ownership of this platform's entries; everything else
    # stays queued awaiting its own platform's reconnect.
    with self._pending_deliveries_lock:
        to_flush, kept = [], []
        for item in self._pending_cron_deliveries:
            (to_flush if item["platform"] == platform_name else kept).append(item)
        self._pending_cron_deliveries = kept

    if not to_flush:
        return

    logger.info(
        "Flushing %d pending cron deliveries for reconnected %s",
        len(to_flush), platform_name,
    )

    adapter = self.adapters.get(platform)
    if not adapter:
        logger.warning(
            "Cannot flush %d deliveries: %s adapter not in self.adapters after reconnect?",
            len(to_flush), platform_name,
        )
        # Put the entries back so a later reconnect can retry them.
        with self._pending_deliveries_lock:
            self._pending_cron_deliveries.extend(to_flush)
        return

    for item in to_flush:
        try:
            target_chat = item["chat_id"]
            body = item["content"]
            metadata = {}
            if item.get("thread_id"):
                metadata["thread_id"] = item["thread_id"]

            # Truncate if needed (mirror delivery.py logic)
            if len(body) > 4000:
                body = body[:3800] + "\n\n... [truncated, was queued during disconnect]"

            outcome = await adapter.send(target_chat, body, metadata=metadata or None)
            if outcome and not getattr(outcome, "success", True):
                logger.warning(
                    "Pending delivery flush failed for %s:%s (job=%s): %s",
                    platform_name, target_chat, item.get("job_id"),
                    getattr(outcome, "error", "unknown"),
                )
            else:
                logger.info(
                    "Flushed pending cron delivery to %s:%s (job=%s)",
                    platform_name, target_chat, item.get("job_id"),
                )
        except Exception as e:
            logger.warning(
                "Failed to flush pending delivery to %s:%s (job=%s): %s",
                platform_name, item.get("chat_id"), item.get("job_id"), e,
            )
|
||||
|
||||
def _running_agent_count(self) -> int:
    """Return the number of currently running agent sessions."""
    return len(self._running_agents)
|
||||
|
||||
@@ -1391,6 +1496,65 @@ class GatewayRunner:
|
||||
except Exception as e:
|
||||
logger.debug("Failed interrupting agent during shutdown: %s", e)
|
||||
|
||||
async def _notify_active_sessions_of_shutdown(self) -> None:
    """Notify every chat that currently has an active agent.

    Runs at the very start of stop(), while adapters are still connected,
    so the messages can actually be delivered.  Best-effort: a failed send
    is logged at debug level and never blocks the shutdown sequence.
    """
    running = self._snapshot_running_agents()
    if not running:
        return

    if self._restart_requested:
        action = "restarting"
        hint = (
            "Your current task will be interrupted. "
            "Send any message after restart to resume where it left off."
        )
    else:
        action = "shutting down"
        hint = "Your current task will be interrupted."
    msg = f"⚠️ Gateway {action} — {hint}"

    seen_chats: set = set()
    for session_key in running:
        # Session key layout: agent:main:{platform}:{chat_type}:{chat_id}[:{extra}...]
        fields = session_key.split(":")
        if len(fields) < 5:
            continue
        platform_str, chat_id = fields[2], fields[4]

        # One notification per chat, even when several sessions
        # (different users/threads) share the same chat.
        if (platform_str, chat_id) in seen_chats:
            continue

        try:
            adapter = self.adapters.get(Platform(platform_str))
            if not adapter:
                continue

            # Carry the thread_id (if any) so the message lands in the
            # correct forum topic / thread.
            thread_id = fields[5] if len(fields) > 5 else None
            metadata = {"thread_id": thread_id} if thread_id else None

            await adapter.send(chat_id, msg, metadata=metadata)
            seen_chats.add((platform_str, chat_id))
            logger.info(
                "Sent shutdown notification to %s:%s",
                platform_str, chat_id,
            )
        except Exception as e:
            logger.debug(
                "Failed to send shutdown notification to %s:%s: %s",
                platform_str, chat_id, e,
            )
|
||||
|
||||
def _finalize_shutdown_agents(self, active_agents: Dict[str, Any]) -> None:
|
||||
for agent in active_agents.values():
|
||||
try:
|
||||
@@ -1416,6 +1580,106 @@ class GatewayRunner:
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Stuck-loop detection knobs: a session that stays active across this many
# consecutive restarts is auto-suspended on the next startup.
_STUCK_LOOP_THRESHOLD = 3  # restarts while active before auto-suspend
_STUCK_LOOP_FILE = ".restart_failure_counts"  # JSON counter file under hermes home
|
||||
|
||||
def _increment_restart_failure_counts(self, active_session_keys: set) -> None:
    """Increment restart-failure counters for sessions active at shutdown.

    Counters persist in a JSON file so they survive across restarts.
    Sessions absent from active_session_keys are dropped from the file —
    they completed normally, so any suspected stuck loop is broken.
    """
    import json

    path = _hermes_home / self._STUCK_LOOP_FILE
    try:
        previous = json.loads(path.read_text()) if path.exists() else {}
    except Exception:
        previous = {}

    # Only sessions still active at this shutdown carry a counter forward;
    # each surviving counter is bumped by one.
    updated = {key: previous.get(key, 0) + 1 for key in active_session_keys}

    try:
        path.write_text(json.dumps(updated))
    except Exception:
        pass
|
||||
|
||||
def _suspend_stuck_loop_sessions(self) -> int:
    """Suspend sessions that stayed active across too many restarts.

    Returns the number of sessions suspended.  Runs on gateway startup
    AFTER suspend_recently_active() to catch the stuck-loop pattern:
    session loads → agent gets stuck → gateway restarts → repeat.
    """
    import json

    path = _hermes_home / self._STUCK_LOOP_FILE
    if not path.exists():
        return 0

    try:
        counts = json.loads(path.read_text())
    except Exception:
        return 0

    suspended_total = 0
    for key, hits in counts.items():
        if hits < self._STUCK_LOOP_THRESHOLD:
            continue
        try:
            record = self.session_store._entries.get(key)
            if record and not record.suspended:
                record.suspended = True
                suspended_total += 1
                logger.warning(
                    "Auto-suspended stuck session %s (active across %d "
                    "consecutive restarts — likely a stuck loop)",
                    key[:30], hits,
                )
        except Exception:
            pass

    if suspended_total:
        try:
            self.session_store._save()
        except Exception:
            pass

    # Reset the counter file — detection starts fresh after a suspension sweep.
    try:
        path.unlink(missing_ok=True)
    except Exception:
        pass

    return suspended_total
|
||||
|
||||
def _clear_restart_failure_count(self, session_key: str) -> None:
    """Drop the restart-failure counter for a session that completed OK.

    A successful agent turn proves the session is not stuck, so its
    consecutive-restart counter must reset.
    """
    import json

    path = _hermes_home / self._STUCK_LOOP_FILE
    if not path.exists():
        return
    try:
        counts = json.loads(path.read_text())
        if session_key not in counts:
            return
        del counts[session_key]
        if counts:
            path.write_text(json.dumps(counts))
        else:
            # Last counter gone — remove the file entirely.
            path.unlink(missing_ok=True)
    except Exception:
        # Best-effort: a corrupt or vanished file simply means nothing to clear.
        pass
|
||||
|
||||
async def _launch_detached_restart_command(self) -> None:
|
||||
import shutil
|
||||
import subprocess
|
||||
@@ -1559,6 +1823,17 @@ class GatewayRunner:
|
||||
except Exception as e:
|
||||
logger.warning("Session suspension on startup failed: %s", e)
|
||||
|
||||
# Stuck-loop detection (#7536): if a session has been active across
|
||||
# 3+ consecutive restarts, it's probably stuck in a loop (the same
|
||||
# history keeps causing the agent to hang). Auto-suspend it so the
|
||||
# user gets a clean slate on the next message.
|
||||
try:
|
||||
stuck = self._suspend_stuck_loop_sessions()
|
||||
if stuck:
|
||||
logger.warning("Auto-suspended %d stuck-loop session(s)", stuck)
|
||||
except Exception as e:
|
||||
logger.debug("Stuck-loop detection failed: %s", e)
|
||||
|
||||
connected_count = 0
|
||||
enabled_platform_count = 0
|
||||
startup_nonretryable_errors: list[str] = []
|
||||
@@ -1945,6 +2220,13 @@ class GatewayRunner:
|
||||
build_channel_directory(self.adapters)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Flush any cron deliveries that were queued during the disconnect
|
||||
try:
|
||||
await self._flush_pending_cron_deliveries(platform)
|
||||
except Exception as flush_err:
|
||||
logger.warning("Error flushing pending deliveries for %s: %s",
|
||||
platform.value, flush_err)
|
||||
else:
|
||||
# Check if the failure is non-retryable
|
||||
if adapter.has_fatal_error and not adapter.fatal_error_retryable:
|
||||
@@ -2018,6 +2300,10 @@ class GatewayRunner:
|
||||
self._running = False
|
||||
self._draining = True
|
||||
|
||||
# Notify all chats with active agents BEFORE draining.
|
||||
# Adapters are still connected here, so messages can be sent.
|
||||
await self._notify_active_sessions_of_shutdown()
|
||||
|
||||
timeout = self._restart_drain_timeout
|
||||
active_agents, timed_out = await self._drain_active_agents(timeout)
|
||||
if timed_out:
|
||||
@@ -2088,12 +2374,31 @@ class GatewayRunner:
|
||||
|
||||
# Write a clean-shutdown marker so the next startup knows this
|
||||
# wasn't a crash. suspend_recently_active() only needs to run
|
||||
# after unexpected exits — graceful shutdowns already drain
|
||||
# active agents, so there's no stuck-session risk.
|
||||
try:
|
||||
(_hermes_home / ".clean_shutdown").touch()
|
||||
except Exception:
|
||||
pass
|
||||
# after unexpected exits. However, if the drain timed out and
|
||||
# agents were force-interrupted, their sessions may be in an
|
||||
# incomplete state (trailing tool response, no final assistant
|
||||
# message). Skip the marker in that case so the next startup
|
||||
# suspends those sessions — giving users a clean slate instead
|
||||
# of resuming a half-finished tool loop.
|
||||
if not timed_out:
|
||||
try:
|
||||
(_hermes_home / ".clean_shutdown").touch()
|
||||
except Exception:
|
||||
pass
|
||||
else:
|
||||
logger.info(
|
||||
"Skipping .clean_shutdown marker — drain timed out with "
|
||||
"interrupted agents; next startup will suspend recently "
|
||||
"active sessions."
|
||||
)
|
||||
|
||||
# Track sessions that were active at shutdown for stuck-loop
|
||||
# detection (#7536). On each restart, the counter increments
|
||||
# for sessions that were running. If a session hits the
|
||||
# threshold (3 consecutive restarts while active), the next
|
||||
# startup auto-suspends it — breaking the loop.
|
||||
if active_agents:
|
||||
self._increment_restart_failure_counts(set(active_agents.keys()))
|
||||
|
||||
if self._restart_requested and self._restart_via_service:
|
||||
self._exit_code = GATEWAY_SERVICE_RESTART_EXIT_CODE
|
||||
@@ -3593,6 +3898,12 @@ class GatewayRunner:
|
||||
_response_time, _api_calls, _resp_len,
|
||||
)
|
||||
|
||||
# Successful turn — clear any stuck-loop counter for this session.
|
||||
# This ensures the counter only accumulates across CONSECUTIVE
|
||||
# restarts where the session was active (never completed).
|
||||
if session_key:
|
||||
self._clear_restart_failure_count(session_key)
|
||||
|
||||
# Surface error details when the agent failed silently (final_response=None)
|
||||
if not response and agent_result.get("failed"):
|
||||
error_detail = agent_result.get("error", "unknown error")
|
||||
@@ -8376,6 +8687,21 @@ class GatewayRunner:
|
||||
if _msn:
|
||||
message = _msn + "\n\n" + message
|
||||
|
||||
# Auto-continue: if the loaded history ends with a tool result,
|
||||
# the previous agent turn was interrupted mid-work (gateway
|
||||
# restart, crash, SIGTERM). Prepend a system note so the model
|
||||
# finishes processing the pending tool results before addressing
|
||||
# the user's new message. (#4493)
|
||||
if agent_history and agent_history[-1].get("role") == "tool":
|
||||
message = (
|
||||
"[System note: Your previous turn was interrupted before you could "
|
||||
"process the last tool result(s). The conversation history contains "
|
||||
"tool outputs you haven't responded to yet. Please finish processing "
|
||||
"those results and summarize what was accomplished, then address the "
|
||||
"user's new message below.]\n\n"
|
||||
+ message
|
||||
)
|
||||
|
||||
_approval_session_key = session_key or ""
|
||||
_approval_session_token = set_current_session_key(_approval_session_key)
|
||||
register_gateway_notify(_approval_session_key, _approval_notify_sync)
|
||||
@@ -9019,7 +9345,7 @@ class GatewayRunner:
|
||||
return response
|
||||
|
||||
|
||||
def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, interval: int = 60):
|
||||
def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, interval: int = 60, pending_delivery_callback=None):
|
||||
"""
|
||||
Background thread that ticks the cron scheduler at a regular interval.
|
||||
|
||||
@@ -9029,6 +9355,9 @@ def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, in
|
||||
When ``adapters`` and ``loop`` are provided, passes them through to the
|
||||
cron delivery path so live adapters can be used for E2EE rooms.
|
||||
|
||||
When ``pending_delivery_callback`` is provided, failed deliveries are
|
||||
queued for retry when the target platform reconnects.
|
||||
|
||||
Also refreshes the channel directory every 5 minutes and prunes the
|
||||
image/audio/document cache once per hour.
|
||||
"""
|
||||
@@ -9042,7 +9371,8 @@ def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, in
|
||||
tick_count = 0
|
||||
while not stop_event.is_set():
|
||||
try:
|
||||
cron_tick(verbose=False, adapters=adapters, loop=loop)
|
||||
cron_tick(verbose=False, adapters=adapters, loop=loop,
|
||||
pending_delivery_callback=pending_delivery_callback)
|
||||
except Exception as e:
|
||||
logger.debug("Cron tick error: %s", e)
|
||||
|
||||
@@ -9187,8 +9517,41 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
|
||||
|
||||
runner = GatewayRunner(config)
|
||||
|
||||
# Track whether a signal initiated the shutdown (vs. internal request).
|
||||
# When an unexpected SIGTERM kills the gateway, we exit non-zero so
|
||||
# systemd's Restart=on-failure revives the process. systemctl stop
|
||||
# is safe: systemd tracks stop-requested state independently of exit
|
||||
# code, so Restart= never fires for a deliberate stop.
|
||||
_signal_initiated_shutdown = False
|
||||
|
||||
# Set up signal handlers
|
||||
def shutdown_signal_handler():
|
||||
nonlocal _signal_initiated_shutdown
|
||||
_signal_initiated_shutdown = True
|
||||
logger.info("Received SIGTERM/SIGINT — initiating shutdown")
|
||||
# Diagnostic: log all hermes-related processes so we can identify
|
||||
# what triggered the signal (hermes update, hermes gateway restart,
|
||||
# a stale detached subprocess, etc.).
|
||||
try:
|
||||
import subprocess as _sp
|
||||
_ps = _sp.run(
|
||||
["ps", "aux"],
|
||||
capture_output=True, text=True, timeout=3,
|
||||
)
|
||||
_hermes_procs = [
|
||||
line for line in _ps.stdout.splitlines()
|
||||
if ("hermes" in line.lower() or "gateway" in line.lower())
|
||||
and str(os.getpid()) not in line.split()[1:2] # exclude self
|
||||
]
|
||||
if _hermes_procs:
|
||||
logger.warning(
|
||||
"Shutdown diagnostic — other hermes processes running:\n %s",
|
||||
"\n ".join(_hermes_procs),
|
||||
)
|
||||
else:
|
||||
logger.info("Shutdown diagnostic — no other hermes processes found")
|
||||
except Exception:
|
||||
pass
|
||||
asyncio.create_task(runner.stop())
|
||||
|
||||
def restart_signal_handler():
|
||||
@@ -9230,7 +9593,11 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
|
||||
cron_thread = threading.Thread(
|
||||
target=_start_cron_ticker,
|
||||
args=(cron_stop,),
|
||||
kwargs={"adapters": runner.adapters, "loop": asyncio.get_running_loop()},
|
||||
kwargs={
|
||||
"adapters": runner.adapters,
|
||||
"loop": asyncio.get_running_loop(),
|
||||
"pending_delivery_callback": runner.queue_failed_cron_delivery,
|
||||
},
|
||||
daemon=True,
|
||||
name="cron-ticker",
|
||||
)
|
||||
@@ -9258,6 +9625,21 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
|
||||
if runner.exit_code is not None:
|
||||
raise SystemExit(runner.exit_code)
|
||||
|
||||
# When a signal (SIGTERM/SIGINT) caused the shutdown and it wasn't a
|
||||
# planned restart (/restart, /update, SIGUSR1), exit non-zero so
|
||||
# systemd's Restart=on-failure revives the process. This covers:
|
||||
# - hermes update killing the gateway mid-work
|
||||
# - External kill commands
|
||||
# - WSL2/container runtime sending unexpected signals
|
||||
# systemctl stop is safe: systemd tracks "stop requested" state
|
||||
# independently of exit code, so Restart= never fires for it.
|
||||
if _signal_initiated_shutdown and not runner._restart_requested:
|
||||
logger.info(
|
||||
"Exiting with code 1 (signal-initiated shutdown without restart "
|
||||
"request) so systemd Restart=on-failure can revive the gateway."
|
||||
)
|
||||
return False # → sys.exit(1) in the caller
|
||||
|
||||
return True
|
||||
|
||||
|
||||
|
||||
187
tests/gateway/test_pending_cron_delivery.py
Normal file
187
tests/gateway/test_pending_cron_delivery.py
Normal file
@@ -0,0 +1,187 @@
|
||||
"""Tests for pending cron delivery queue — retry on reconnect."""
|
||||
import asyncio
|
||||
import threading
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from gateway.config import Platform
|
||||
|
||||
|
||||
class TestPendingCronDeliveryQueue:
    """Verify that failed cron deliveries are queued and flushed on reconnect.

    Note: coroutines are driven with asyncio.run() — asyncio.get_event_loop()
    is deprecated for implicit loop creation since Python 3.10 and is
    unreliable under pytest on 3.12+ when no loop is running.
    """

    def _make_runner(self):
        """Create a minimal GatewayRunner for testing pending deliveries."""
        from gateway.run import GatewayRunner

        runner = object.__new__(GatewayRunner)
        runner._pending_cron_deliveries = []
        runner._pending_deliveries_lock = threading.Lock()
        runner.adapters = {}
        runner.queue_failed_cron_delivery = GatewayRunner.queue_failed_cron_delivery.__get__(runner, GatewayRunner)
        runner._flush_pending_cron_deliveries = GatewayRunner._flush_pending_cron_deliveries.__get__(runner, GatewayRunner)
        return runner

    def test_queue_failed_delivery_adds_to_queue(self):
        runner = self._make_runner()
        assert len(runner._pending_cron_deliveries) == 0

        runner.queue_failed_cron_delivery(
            platform_name="telegram", chat_id="12345", thread_id=None,
            content="test output", job_id="job-1", job_name="Test Job",
        )
        assert len(runner._pending_cron_deliveries) == 1
        entry = runner._pending_cron_deliveries[0]
        assert entry["platform"] == "telegram"
        assert entry["chat_id"] == "12345"
        assert entry["content"] == "test output"

    def test_queue_preserves_thread_id(self):
        runner = self._make_runner()
        runner.queue_failed_cron_delivery(
            platform_name="telegram", chat_id="12345", thread_id="99",
            content="test", job_id="j1", job_name="Job",
        )
        assert runner._pending_cron_deliveries[0]["thread_id"] == "99"

    def test_flush_removes_matching_platform_entries(self):
        runner = self._make_runner()
        runner.queue_failed_cron_delivery("telegram", "111", None, "msg1", "j1", "Job1")
        runner.queue_failed_cron_delivery("discord", "222", None, "msg2", "j2", "Job2")
        runner.queue_failed_cron_delivery("telegram", "333", None, "msg3", "j3", "Job3")

        mock_adapter = AsyncMock()
        mock_adapter.send = AsyncMock(return_value=MagicMock(success=True))
        runner.adapters = {Platform.TELEGRAM: mock_adapter}

        asyncio.run(runner._flush_pending_cron_deliveries(Platform.TELEGRAM))
        assert len(runner._pending_cron_deliveries) == 1
        assert runner._pending_cron_deliveries[0]["platform"] == "discord"

    def test_flush_calls_adapter_send_for_each_entry(self):
        runner = self._make_runner()
        runner.queue_failed_cron_delivery("telegram", "111", None, "msg1", "j1", "Job1")
        runner.queue_failed_cron_delivery("telegram", "222", "42", "msg2", "j2", "Job2")

        mock_adapter = AsyncMock()
        mock_adapter.send = AsyncMock(return_value=MagicMock(success=True))
        runner.adapters = {Platform.TELEGRAM: mock_adapter}

        asyncio.run(runner._flush_pending_cron_deliveries(Platform.TELEGRAM))
        assert mock_adapter.send.call_count == 2

    def test_flush_requeues_if_adapter_missing(self):
        runner = self._make_runner()
        runner.queue_failed_cron_delivery("telegram", "111", None, "msg1", "j1", "Job1")
        runner.adapters = {}

        asyncio.run(runner._flush_pending_cron_deliveries(Platform.TELEGRAM))
        assert len(runner._pending_cron_deliveries) == 1

    def test_flush_skips_non_matching_platforms(self):
        runner = self._make_runner()
        runner.queue_failed_cron_delivery("discord", "222", None, "msg", "j1", "Job")
        runner.adapters = {Platform.TELEGRAM: AsyncMock()}

        asyncio.run(runner._flush_pending_cron_deliveries(Platform.TELEGRAM))
        assert len(runner._pending_cron_deliveries) == 1

    def test_flush_passes_thread_id_in_metadata(self):
        runner = self._make_runner()
        runner.queue_failed_cron_delivery("telegram", "111", "42", "msg", "j1", "Job")

        mock_adapter = AsyncMock()
        mock_adapter.send = AsyncMock(return_value=MagicMock(success=True))
        runner.adapters = {Platform.TELEGRAM: mock_adapter}

        asyncio.run(runner._flush_pending_cron_deliveries(Platform.TELEGRAM))
        call_kwargs = mock_adapter.send.call_args.kwargs
        assert call_kwargs["metadata"]["thread_id"] == "42"
|
||||
|
||||
|
||||
class TestDeliverResultPendingCallback:
    """Verify _deliver_result calls pending_delivery_callback on failure."""

    @pytest.fixture
    def mock_gateway_config(self):
        """Create a mock gateway config with telegram platform enabled."""
        from gateway.config import Platform, GatewayConfig
        cfg = GatewayConfig()
        cfg.platforms = {Platform.TELEGRAM: MagicMock(enabled=True)}
        return cfg

    def _make_job(self):
        return {
            "id": "job-1", "name": "Test Job",
            "deliver": "telegram:12345",
            "origin": {"platform": "telegram", "chat_id": "12345"},
        }

    def _run_delivery(self, config, send_kwargs, callback=None):
        """Invoke _deliver_result under the standard target/config/send patches.

        send_kwargs configures the _send_to_platform patch (return_value or
        side_effect).  When callback is None, _deliver_result is called
        without a pending_delivery_callback argument.
        """
        from cron.scheduler import _deliver_result

        target = {"platform": "telegram", "chat_id": "12345", "thread_id": None}
        with patch("cron.scheduler._resolve_delivery_target", return_value=target), \
             patch("gateway.config.load_gateway_config", return_value=config), \
             patch("tools.send_message_tool._send_to_platform", **send_kwargs):
            if callback is None:
                return _deliver_result(self._make_job(), "test")
            return _deliver_result(self._make_job(), "test", pending_delivery_callback=callback)

    def test_callback_on_exception(self, mock_gateway_config):
        callback = MagicMock()
        result = self._run_delivery(
            mock_gateway_config, {"side_effect": Exception("down")}, callback,
        )
        assert result is not None
        callback.assert_called_once()

    def test_callback_on_error_dict(self, mock_gateway_config):
        callback = MagicMock()
        result = self._run_delivery(
            mock_gateway_config, {"return_value": {"error": "down"}}, callback,
        )
        assert result is not None
        callback.assert_called_once()

    def test_no_callback_on_success(self, mock_gateway_config):
        callback = MagicMock()
        result = self._run_delivery(
            mock_gateway_config, {"return_value": {"ok": True}}, callback,
        )
        assert result is None
        callback.assert_not_called()

    def test_no_callback_no_crash(self, mock_gateway_config):
        result = self._run_delivery(
            mock_gateway_config, {"side_effect": Exception("down")},
        )
        assert result is not None  # error, no crash
|
||||
Reference in New Issue
Block a user