Compare commits
3 Commits
fix/format
...
fix/744-ga
| Author | SHA1 | Date | |
|---|---|---|---|
| ed936198ff | |||
| 50e7301ee9 | |||
| 9c4abb4992 |
@@ -197,7 +197,7 @@ def _send_media_via_adapter(adapter, chat_id: str, media_files: list, metadata:
|
|||||||
logger.warning("Job '%s': failed to send media %s: %s", job.get("id", "?"), media_path, e)
|
logger.warning("Job '%s': failed to send media %s: %s", job.get("id", "?"), media_path, e)
|
||||||
|
|
||||||
|
|
||||||
def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Optional[str]:
|
def _deliver_result(job: dict, content: str, adapters=None, loop=None, pending_delivery_callback=None) -> Optional[str]:
|
||||||
"""
|
"""
|
||||||
Deliver job output to the configured target (origin chat, specific platform, etc.).
|
Deliver job output to the configured target (origin chat, specific platform, etc.).
|
||||||
|
|
||||||
@@ -206,6 +206,10 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
|
|||||||
the standalone HTTP path cannot encrypt. Falls back to standalone send if
|
the standalone HTTP path cannot encrypt. Falls back to standalone send if
|
||||||
the adapter path fails or is unavailable.
|
the adapter path fails or is unavailable.
|
||||||
|
|
||||||
|
When ``pending_delivery_callback`` is provided and delivery fails due to
|
||||||
|
the platform being unavailable, the delivery is queued for retry when the
|
||||||
|
platform reconnects instead of being silently dropped.
|
||||||
|
|
||||||
Returns None on success, or an error string on failure.
|
Returns None on success, or an error string on failure.
|
||||||
"""
|
"""
|
||||||
target = _resolve_delivery_target(job)
|
target = _resolve_delivery_target(job)
|
||||||
@@ -354,11 +358,29 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
msg = f"delivery to {platform_name}:{chat_id} failed: {e}"
|
msg = f"delivery to {platform_name}:{chat_id} failed: {e}"
|
||||||
logger.error("Job '%s': %s", job["id"], msg)
|
logger.error("Job '%s': %s", job["id"], msg)
|
||||||
|
# Queue for retry if callback provided
|
||||||
|
if pending_delivery_callback:
|
||||||
|
try:
|
||||||
|
pending_delivery_callback(
|
||||||
|
platform_name, chat_id, thread_id,
|
||||||
|
delivery_content, job["id"], job.get("name", job["id"]),
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
return msg
|
return msg
|
||||||
|
|
||||||
if result and result.get("error"):
|
if result and result.get("error"):
|
||||||
msg = f"delivery error: {result['error']}"
|
msg = f"delivery error: {result['error']}"
|
||||||
logger.error("Job '%s': %s", job["id"], msg)
|
logger.error("Job '%s': %s", job["id"], msg)
|
||||||
|
# Queue for retry if callback provided
|
||||||
|
if pending_delivery_callback:
|
||||||
|
try:
|
||||||
|
pending_delivery_callback(
|
||||||
|
platform_name, chat_id, thread_id,
|
||||||
|
delivery_content, job["id"], job.get("name", job["id"]),
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
return msg
|
return msg
|
||||||
|
|
||||||
logger.info("Job '%s': delivered to %s:%s", job["id"], platform_name, chat_id)
|
logger.info("Job '%s': delivered to %s:%s", job["id"], platform_name, chat_id)
|
||||||
@@ -896,7 +918,7 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
|||||||
logger.debug("Job '%s': failed to close SQLite session store: %s", job_id, e)
|
logger.debug("Job '%s': failed to close SQLite session store: %s", job_id, e)
|
||||||
|
|
||||||
|
|
||||||
def tick(verbose: bool = True, adapters=None, loop=None) -> int:
|
def tick(verbose: bool = True, adapters=None, loop=None, pending_delivery_callback=None) -> int:
|
||||||
"""
|
"""
|
||||||
Check and run all due jobs.
|
Check and run all due jobs.
|
||||||
|
|
||||||
@@ -907,6 +929,9 @@ def tick(verbose: bool = True, adapters=None, loop=None) -> int:
|
|||||||
verbose: Whether to print status messages
|
verbose: Whether to print status messages
|
||||||
adapters: Optional dict mapping Platform → live adapter (from gateway)
|
adapters: Optional dict mapping Platform → live adapter (from gateway)
|
||||||
loop: Optional asyncio event loop (from gateway) for live adapter sends
|
loop: Optional asyncio event loop (from gateway) for live adapter sends
|
||||||
|
pending_delivery_callback: Optional callback to queue failed deliveries
|
||||||
|
for retry when a platform reconnects. Signature:
|
||||||
|
(platform_name, chat_id, thread_id, content, job_id, job_name) -> None
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Number of jobs executed (0 if another tick is already running)
|
Number of jobs executed (0 if another tick is already running)
|
||||||
@@ -964,7 +989,11 @@ def tick(verbose: bool = True, adapters=None, loop=None) -> int:
|
|||||||
delivery_error = None
|
delivery_error = None
|
||||||
if should_deliver:
|
if should_deliver:
|
||||||
try:
|
try:
|
||||||
delivery_error = _deliver_result(job, deliver_content, adapters=adapters, loop=loop)
|
delivery_error = _deliver_result(
|
||||||
|
job, deliver_content,
|
||||||
|
adapters=adapters, loop=loop,
|
||||||
|
pending_delivery_callback=pending_delivery_callback,
|
||||||
|
)
|
||||||
except Exception as de:
|
except Exception as de:
|
||||||
delivery_error = str(de)
|
delivery_error = str(de)
|
||||||
logger.error("Delivery failed for job %s: %s", job["id"], de)
|
logger.error("Delivery failed for job %s: %s", job["id"], de)
|
||||||
|
|||||||
392
gateway/run.py
392
gateway/run.py
@@ -594,6 +594,14 @@ class GatewayRunner:
|
|||||||
# Key: Platform enum, Value: {"config": platform_config, "attempts": int, "next_retry": float}
|
# Key: Platform enum, Value: {"config": platform_config, "attempts": int, "next_retry": float}
|
||||||
self._failed_platforms: Dict[Platform, Dict[str, Any]] = {}
|
self._failed_platforms: Dict[Platform, Dict[str, Any]] = {}
|
||||||
|
|
||||||
|
# Pending cron deliveries that failed during platform disconnect.
|
||||||
|
# Each entry: {"platform": str, "chat_id": str, "thread_id": str|None,
|
||||||
|
# "content": str, "job_id": str, "job_name": str, "timestamp": float}
|
||||||
|
# Flushed when the target platform reconnects.
|
||||||
|
import threading as _threading2
|
||||||
|
self._pending_cron_deliveries: List[Dict[str, Any]] = []
|
||||||
|
self._pending_deliveries_lock = _threading2.Lock()
|
||||||
|
|
||||||
# Track pending /update prompt responses per session.
|
# Track pending /update prompt responses per session.
|
||||||
# Key: session_key, Value: True when a prompt is waiting for user input.
|
# Key: session_key, Value: True when a prompt is waiting for user input.
|
||||||
self._update_prompt_pending: Dict[str, bool] = {}
|
self._update_prompt_pending: Dict[str, bool] = {}
|
||||||
@@ -1021,6 +1029,103 @@ class GatewayRunner:
|
|||||||
self._exit_reason = reason
|
self._exit_reason = reason
|
||||||
self._shutdown_event.set()
|
self._shutdown_event.set()
|
||||||
|
|
||||||
|
def queue_failed_cron_delivery(
|
||||||
|
self,
|
||||||
|
platform_name: str,
|
||||||
|
chat_id: str,
|
||||||
|
thread_id: Optional[str],
|
||||||
|
content: str,
|
||||||
|
job_id: str,
|
||||||
|
job_name: str,
|
||||||
|
) -> None:
|
||||||
|
"""Queue a failed cron delivery for retry when the platform reconnects.
|
||||||
|
|
||||||
|
Called by cron/scheduler._deliver_result when live adapter delivery fails
|
||||||
|
and the platform is in a known-disconnected state. The delivery will be
|
||||||
|
retried when _flush_pending_cron_deliveries is called after reconnect.
|
||||||
|
"""
|
||||||
|
import time as _time
|
||||||
|
entry = {
|
||||||
|
"platform": platform_name,
|
||||||
|
"chat_id": chat_id,
|
||||||
|
"thread_id": thread_id,
|
||||||
|
"content": content,
|
||||||
|
"job_id": job_id,
|
||||||
|
"job_name": job_name,
|
||||||
|
"timestamp": _time.time(),
|
||||||
|
}
|
||||||
|
with self._pending_deliveries_lock:
|
||||||
|
self._pending_cron_deliveries.append(entry)
|
||||||
|
queue_len = len(self._pending_cron_deliveries)
|
||||||
|
logger.info(
|
||||||
|
"Queued failed cron delivery for %s:%s (job=%s, queue=%d)",
|
||||||
|
platform_name, chat_id, job_id, queue_len,
|
||||||
|
)
|
||||||
|
|
||||||
|
async def _flush_pending_cron_deliveries(self, platform: "Platform") -> None:
|
||||||
|
"""Retry queued cron deliveries for a platform that just reconnected.
|
||||||
|
|
||||||
|
Called after a successful platform reconnect. Delivers each pending
|
||||||
|
message via the now-available live adapter, with a best-effort approach
|
||||||
|
(individual failures are logged but don't block other deliveries).
|
||||||
|
"""
|
||||||
|
|
||||||
|
platform_name = platform.value
|
||||||
|
with self._pending_deliveries_lock:
|
||||||
|
# Split into matching and non-matching
|
||||||
|
matching = [e for e in self._pending_cron_deliveries if e["platform"] == platform_name]
|
||||||
|
remaining = [e for e in self._pending_cron_deliveries if e["platform"] != platform_name]
|
||||||
|
self._pending_cron_deliveries = remaining
|
||||||
|
|
||||||
|
if not matching:
|
||||||
|
return
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"Flushing %d pending cron deliveries for reconnected %s",
|
||||||
|
len(matching), platform_name,
|
||||||
|
)
|
||||||
|
|
||||||
|
adapter = self.adapters.get(platform)
|
||||||
|
if not adapter:
|
||||||
|
logger.warning(
|
||||||
|
"Cannot flush %d deliveries: %s adapter not in self.adapters after reconnect?",
|
||||||
|
len(matching), platform_name,
|
||||||
|
)
|
||||||
|
# Re-queue them
|
||||||
|
with self._pending_deliveries_lock:
|
||||||
|
self._pending_cron_deliveries.extend(matching)
|
||||||
|
return
|
||||||
|
|
||||||
|
for entry in matching:
|
||||||
|
try:
|
||||||
|
chat_id = entry["chat_id"]
|
||||||
|
content = entry["content"]
|
||||||
|
metadata = {}
|
||||||
|
if entry.get("thread_id"):
|
||||||
|
metadata["thread_id"] = entry["thread_id"]
|
||||||
|
|
||||||
|
# Truncate if needed (mirror delivery.py logic)
|
||||||
|
if len(content) > 4000:
|
||||||
|
content = content[:3800] + "\n\n... [truncated, was queued during disconnect]"
|
||||||
|
|
||||||
|
result = await adapter.send(chat_id, content, metadata=metadata or None)
|
||||||
|
if result and not getattr(result, "success", True):
|
||||||
|
logger.warning(
|
||||||
|
"Pending delivery flush failed for %s:%s (job=%s): %s",
|
||||||
|
platform_name, chat_id, entry.get("job_id"),
|
||||||
|
getattr(result, "error", "unknown"),
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
logger.info(
|
||||||
|
"Flushed pending cron delivery to %s:%s (job=%s)",
|
||||||
|
platform_name, chat_id, entry.get("job_id"),
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(
|
||||||
|
"Failed to flush pending delivery to %s:%s (job=%s): %s",
|
||||||
|
platform_name, entry.get("chat_id"), entry.get("job_id"), e,
|
||||||
|
)
|
||||||
|
|
||||||
def _running_agent_count(self) -> int:
|
def _running_agent_count(self) -> int:
|
||||||
return len(self._running_agents)
|
return len(self._running_agents)
|
||||||
|
|
||||||
@@ -1391,6 +1496,65 @@ class GatewayRunner:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.debug("Failed interrupting agent during shutdown: %s", e)
|
logger.debug("Failed interrupting agent during shutdown: %s", e)
|
||||||
|
|
||||||
|
async def _notify_active_sessions_of_shutdown(self) -> None:
|
||||||
|
"""Send a notification to every chat with an active agent.
|
||||||
|
|
||||||
|
Called at the very start of stop() — adapters are still connected so
|
||||||
|
messages can be delivered. Best-effort: individual send failures are
|
||||||
|
logged and swallowed so they never block the shutdown sequence.
|
||||||
|
"""
|
||||||
|
active = self._snapshot_running_agents()
|
||||||
|
if not active:
|
||||||
|
return
|
||||||
|
|
||||||
|
action = "restarting" if self._restart_requested else "shutting down"
|
||||||
|
hint = (
|
||||||
|
"Your current task will be interrupted. "
|
||||||
|
"Send any message after restart to resume where it left off."
|
||||||
|
if self._restart_requested
|
||||||
|
else "Your current task will be interrupted."
|
||||||
|
)
|
||||||
|
msg = f"⚠️ Gateway {action} — {hint}"
|
||||||
|
|
||||||
|
notified: set = set()
|
||||||
|
for session_key in active:
|
||||||
|
# Parse platform + chat_id from the session key.
|
||||||
|
# Format: agent:main:{platform}:{chat_type}:{chat_id}[:{extra}...]
|
||||||
|
parts = session_key.split(":")
|
||||||
|
if len(parts) < 5:
|
||||||
|
continue
|
||||||
|
platform_str = parts[2]
|
||||||
|
chat_id = parts[4]
|
||||||
|
|
||||||
|
# Deduplicate: one notification per chat, even if multiple
|
||||||
|
# sessions (different users/threads) share the same chat.
|
||||||
|
dedup_key = (platform_str, chat_id)
|
||||||
|
if dedup_key in notified:
|
||||||
|
continue
|
||||||
|
|
||||||
|
try:
|
||||||
|
platform = Platform(platform_str)
|
||||||
|
adapter = self.adapters.get(platform)
|
||||||
|
if not adapter:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Include thread_id if present so the message lands in the
|
||||||
|
# correct forum topic / thread.
|
||||||
|
thread_id = parts[5] if len(parts) > 5 else None
|
||||||
|
metadata = {"thread_id": thread_id} if thread_id else None
|
||||||
|
|
||||||
|
await adapter.send(chat_id, msg, metadata=metadata)
|
||||||
|
notified.add(dedup_key)
|
||||||
|
logger.info(
|
||||||
|
"Sent shutdown notification to %s:%s",
|
||||||
|
platform_str, chat_id,
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(
|
||||||
|
"Failed to send shutdown notification to %s:%s: %s",
|
||||||
|
platform_str, chat_id, e,
|
||||||
|
)
|
||||||
|
|
||||||
def _finalize_shutdown_agents(self, active_agents: Dict[str, Any]) -> None:
|
def _finalize_shutdown_agents(self, active_agents: Dict[str, Any]) -> None:
|
||||||
for agent in active_agents.values():
|
for agent in active_agents.values():
|
||||||
try:
|
try:
|
||||||
@@ -1416,6 +1580,106 @@ class GatewayRunner:
|
|||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
_STUCK_LOOP_THRESHOLD = 3 # restarts while active before auto-suspend
|
||||||
|
_STUCK_LOOP_FILE = ".restart_failure_counts"
|
||||||
|
|
||||||
|
def _increment_restart_failure_counts(self, active_session_keys: set) -> None:
|
||||||
|
"""Increment restart-failure counters for sessions active at shutdown.
|
||||||
|
|
||||||
|
Persists to a JSON file so counters survive across restarts.
|
||||||
|
Sessions NOT in active_session_keys are removed (they completed
|
||||||
|
successfully, so the loop is broken).
|
||||||
|
"""
|
||||||
|
import json
|
||||||
|
|
||||||
|
path = _hermes_home / self._STUCK_LOOP_FILE
|
||||||
|
try:
|
||||||
|
counts = json.loads(path.read_text()) if path.exists() else {}
|
||||||
|
except Exception:
|
||||||
|
counts = {}
|
||||||
|
|
||||||
|
# Increment active sessions, remove inactive ones (loop broken)
|
||||||
|
new_counts = {}
|
||||||
|
for key in active_session_keys:
|
||||||
|
new_counts[key] = counts.get(key, 0) + 1
|
||||||
|
# Keep any entries that are still above 0 even if not active now
|
||||||
|
# (they might become active again next restart)
|
||||||
|
|
||||||
|
try:
|
||||||
|
path.write_text(json.dumps(new_counts))
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def _suspend_stuck_loop_sessions(self) -> int:
|
||||||
|
"""Suspend sessions that have been active across too many restarts.
|
||||||
|
|
||||||
|
Returns the number of sessions suspended. Called on gateway startup
|
||||||
|
AFTER suspend_recently_active() to catch the stuck-loop pattern:
|
||||||
|
session loads → agent gets stuck → gateway restarts → repeat.
|
||||||
|
"""
|
||||||
|
import json
|
||||||
|
|
||||||
|
path = _hermes_home / self._STUCK_LOOP_FILE
|
||||||
|
if not path.exists():
|
||||||
|
return 0
|
||||||
|
|
||||||
|
try:
|
||||||
|
counts = json.loads(path.read_text())
|
||||||
|
except Exception:
|
||||||
|
return 0
|
||||||
|
|
||||||
|
suspended = 0
|
||||||
|
stuck_keys = [k for k, v in counts.items() if v >= self._STUCK_LOOP_THRESHOLD]
|
||||||
|
|
||||||
|
for session_key in stuck_keys:
|
||||||
|
try:
|
||||||
|
entry = self.session_store._entries.get(session_key)
|
||||||
|
if entry and not entry.suspended:
|
||||||
|
entry.suspended = True
|
||||||
|
suspended += 1
|
||||||
|
logger.warning(
|
||||||
|
"Auto-suspended stuck session %s (active across %d "
|
||||||
|
"consecutive restarts — likely a stuck loop)",
|
||||||
|
session_key[:30], counts[session_key],
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
if suspended:
|
||||||
|
try:
|
||||||
|
self.session_store._save()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Clear the file — counters start fresh after suspension
|
||||||
|
try:
|
||||||
|
path.unlink(missing_ok=True)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
return suspended
|
||||||
|
|
||||||
|
def _clear_restart_failure_count(self, session_key: str) -> None:
|
||||||
|
"""Clear the restart-failure counter for a session that completed OK.
|
||||||
|
|
||||||
|
Called after a successful agent turn to signal the loop is broken.
|
||||||
|
"""
|
||||||
|
import json
|
||||||
|
|
||||||
|
path = _hermes_home / self._STUCK_LOOP_FILE
|
||||||
|
if not path.exists():
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
counts = json.loads(path.read_text())
|
||||||
|
if session_key in counts:
|
||||||
|
del counts[session_key]
|
||||||
|
if counts:
|
||||||
|
path.write_text(json.dumps(counts))
|
||||||
|
else:
|
||||||
|
path.unlink(missing_ok=True)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
async def _launch_detached_restart_command(self) -> None:
|
async def _launch_detached_restart_command(self) -> None:
|
||||||
import shutil
|
import shutil
|
||||||
import subprocess
|
import subprocess
|
||||||
@@ -1559,6 +1823,17 @@ class GatewayRunner:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning("Session suspension on startup failed: %s", e)
|
logger.warning("Session suspension on startup failed: %s", e)
|
||||||
|
|
||||||
|
# Stuck-loop detection (#7536): if a session has been active across
|
||||||
|
# 3+ consecutive restarts, it's probably stuck in a loop (the same
|
||||||
|
# history keeps causing the agent to hang). Auto-suspend it so the
|
||||||
|
# user gets a clean slate on the next message.
|
||||||
|
try:
|
||||||
|
stuck = self._suspend_stuck_loop_sessions()
|
||||||
|
if stuck:
|
||||||
|
logger.warning("Auto-suspended %d stuck-loop session(s)", stuck)
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug("Stuck-loop detection failed: %s", e)
|
||||||
|
|
||||||
connected_count = 0
|
connected_count = 0
|
||||||
enabled_platform_count = 0
|
enabled_platform_count = 0
|
||||||
startup_nonretryable_errors: list[str] = []
|
startup_nonretryable_errors: list[str] = []
|
||||||
@@ -1945,6 +2220,13 @@ class GatewayRunner:
|
|||||||
build_channel_directory(self.adapters)
|
build_channel_directory(self.adapters)
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
# Flush any cron deliveries that were queued during the disconnect
|
||||||
|
try:
|
||||||
|
await self._flush_pending_cron_deliveries(platform)
|
||||||
|
except Exception as flush_err:
|
||||||
|
logger.warning("Error flushing pending deliveries for %s: %s",
|
||||||
|
platform.value, flush_err)
|
||||||
else:
|
else:
|
||||||
# Check if the failure is non-retryable
|
# Check if the failure is non-retryable
|
||||||
if adapter.has_fatal_error and not adapter.fatal_error_retryable:
|
if adapter.has_fatal_error and not adapter.fatal_error_retryable:
|
||||||
@@ -2018,6 +2300,10 @@ class GatewayRunner:
|
|||||||
self._running = False
|
self._running = False
|
||||||
self._draining = True
|
self._draining = True
|
||||||
|
|
||||||
|
# Notify all chats with active agents BEFORE draining.
|
||||||
|
# Adapters are still connected here, so messages can be sent.
|
||||||
|
await self._notify_active_sessions_of_shutdown()
|
||||||
|
|
||||||
timeout = self._restart_drain_timeout
|
timeout = self._restart_drain_timeout
|
||||||
active_agents, timed_out = await self._drain_active_agents(timeout)
|
active_agents, timed_out = await self._drain_active_agents(timeout)
|
||||||
if timed_out:
|
if timed_out:
|
||||||
@@ -2088,12 +2374,31 @@ class GatewayRunner:
|
|||||||
|
|
||||||
# Write a clean-shutdown marker so the next startup knows this
|
# Write a clean-shutdown marker so the next startup knows this
|
||||||
# wasn't a crash. suspend_recently_active() only needs to run
|
# wasn't a crash. suspend_recently_active() only needs to run
|
||||||
# after unexpected exits — graceful shutdowns already drain
|
# after unexpected exits. However, if the drain timed out and
|
||||||
# active agents, so there's no stuck-session risk.
|
# agents were force-interrupted, their sessions may be in an
|
||||||
|
# incomplete state (trailing tool response, no final assistant
|
||||||
|
# message). Skip the marker in that case so the next startup
|
||||||
|
# suspends those sessions — giving users a clean slate instead
|
||||||
|
# of resuming a half-finished tool loop.
|
||||||
|
if not timed_out:
|
||||||
try:
|
try:
|
||||||
(_hermes_home / ".clean_shutdown").touch()
|
(_hermes_home / ".clean_shutdown").touch()
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
else:
|
||||||
|
logger.info(
|
||||||
|
"Skipping .clean_shutdown marker — drain timed out with "
|
||||||
|
"interrupted agents; next startup will suspend recently "
|
||||||
|
"active sessions."
|
||||||
|
)
|
||||||
|
|
||||||
|
# Track sessions that were active at shutdown for stuck-loop
|
||||||
|
# detection (#7536). On each restart, the counter increments
|
||||||
|
# for sessions that were running. If a session hits the
|
||||||
|
# threshold (3 consecutive restarts while active), the next
|
||||||
|
# startup auto-suspends it — breaking the loop.
|
||||||
|
if active_agents:
|
||||||
|
self._increment_restart_failure_counts(set(active_agents.keys()))
|
||||||
|
|
||||||
if self._restart_requested and self._restart_via_service:
|
if self._restart_requested and self._restart_via_service:
|
||||||
self._exit_code = GATEWAY_SERVICE_RESTART_EXIT_CODE
|
self._exit_code = GATEWAY_SERVICE_RESTART_EXIT_CODE
|
||||||
@@ -3593,6 +3898,12 @@ class GatewayRunner:
|
|||||||
_response_time, _api_calls, _resp_len,
|
_response_time, _api_calls, _resp_len,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Successful turn — clear any stuck-loop counter for this session.
|
||||||
|
# This ensures the counter only accumulates across CONSECUTIVE
|
||||||
|
# restarts where the session was active (never completed).
|
||||||
|
if session_key:
|
||||||
|
self._clear_restart_failure_count(session_key)
|
||||||
|
|
||||||
# Surface error details when the agent failed silently (final_response=None)
|
# Surface error details when the agent failed silently (final_response=None)
|
||||||
if not response and agent_result.get("failed"):
|
if not response and agent_result.get("failed"):
|
||||||
error_detail = agent_result.get("error", "unknown error")
|
error_detail = agent_result.get("error", "unknown error")
|
||||||
@@ -8376,6 +8687,21 @@ class GatewayRunner:
|
|||||||
if _msn:
|
if _msn:
|
||||||
message = _msn + "\n\n" + message
|
message = _msn + "\n\n" + message
|
||||||
|
|
||||||
|
# Auto-continue: if the loaded history ends with a tool result,
|
||||||
|
# the previous agent turn was interrupted mid-work (gateway
|
||||||
|
# restart, crash, SIGTERM). Prepend a system note so the model
|
||||||
|
# finishes processing the pending tool results before addressing
|
||||||
|
# the user's new message. (#4493)
|
||||||
|
if agent_history and agent_history[-1].get("role") == "tool":
|
||||||
|
message = (
|
||||||
|
"[System note: Your previous turn was interrupted before you could "
|
||||||
|
"process the last tool result(s). The conversation history contains "
|
||||||
|
"tool outputs you haven't responded to yet. Please finish processing "
|
||||||
|
"those results and summarize what was accomplished, then address the "
|
||||||
|
"user's new message below.]\n\n"
|
||||||
|
+ message
|
||||||
|
)
|
||||||
|
|
||||||
_approval_session_key = session_key or ""
|
_approval_session_key = session_key or ""
|
||||||
_approval_session_token = set_current_session_key(_approval_session_key)
|
_approval_session_token = set_current_session_key(_approval_session_key)
|
||||||
register_gateway_notify(_approval_session_key, _approval_notify_sync)
|
register_gateway_notify(_approval_session_key, _approval_notify_sync)
|
||||||
@@ -9019,7 +9345,7 @@ class GatewayRunner:
|
|||||||
return response
|
return response
|
||||||
|
|
||||||
|
|
||||||
def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, interval: int = 60):
|
def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, interval: int = 60, pending_delivery_callback=None):
|
||||||
"""
|
"""
|
||||||
Background thread that ticks the cron scheduler at a regular interval.
|
Background thread that ticks the cron scheduler at a regular interval.
|
||||||
|
|
||||||
@@ -9029,6 +9355,9 @@ def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, in
|
|||||||
When ``adapters`` and ``loop`` are provided, passes them through to the
|
When ``adapters`` and ``loop`` are provided, passes them through to the
|
||||||
cron delivery path so live adapters can be used for E2EE rooms.
|
cron delivery path so live adapters can be used for E2EE rooms.
|
||||||
|
|
||||||
|
When ``pending_delivery_callback`` is provided, failed deliveries are
|
||||||
|
queued for retry when the target platform reconnects.
|
||||||
|
|
||||||
Also refreshes the channel directory every 5 minutes and prunes the
|
Also refreshes the channel directory every 5 minutes and prunes the
|
||||||
image/audio/document cache once per hour.
|
image/audio/document cache once per hour.
|
||||||
"""
|
"""
|
||||||
@@ -9042,7 +9371,8 @@ def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, in
|
|||||||
tick_count = 0
|
tick_count = 0
|
||||||
while not stop_event.is_set():
|
while not stop_event.is_set():
|
||||||
try:
|
try:
|
||||||
cron_tick(verbose=False, adapters=adapters, loop=loop)
|
cron_tick(verbose=False, adapters=adapters, loop=loop,
|
||||||
|
pending_delivery_callback=pending_delivery_callback)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.debug("Cron tick error: %s", e)
|
logger.debug("Cron tick error: %s", e)
|
||||||
|
|
||||||
@@ -9187,8 +9517,41 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
|
|||||||
|
|
||||||
runner = GatewayRunner(config)
|
runner = GatewayRunner(config)
|
||||||
|
|
||||||
|
# Track whether a signal initiated the shutdown (vs. internal request).
|
||||||
|
# When an unexpected SIGTERM kills the gateway, we exit non-zero so
|
||||||
|
# systemd's Restart=on-failure revives the process. systemctl stop
|
||||||
|
# is safe: systemd tracks stop-requested state independently of exit
|
||||||
|
# code, so Restart= never fires for a deliberate stop.
|
||||||
|
_signal_initiated_shutdown = False
|
||||||
|
|
||||||
# Set up signal handlers
|
# Set up signal handlers
|
||||||
def shutdown_signal_handler():
|
def shutdown_signal_handler():
|
||||||
|
nonlocal _signal_initiated_shutdown
|
||||||
|
_signal_initiated_shutdown = True
|
||||||
|
logger.info("Received SIGTERM/SIGINT — initiating shutdown")
|
||||||
|
# Diagnostic: log all hermes-related processes so we can identify
|
||||||
|
# what triggered the signal (hermes update, hermes gateway restart,
|
||||||
|
# a stale detached subprocess, etc.).
|
||||||
|
try:
|
||||||
|
import subprocess as _sp
|
||||||
|
_ps = _sp.run(
|
||||||
|
["ps", "aux"],
|
||||||
|
capture_output=True, text=True, timeout=3,
|
||||||
|
)
|
||||||
|
_hermes_procs = [
|
||||||
|
line for line in _ps.stdout.splitlines()
|
||||||
|
if ("hermes" in line.lower() or "gateway" in line.lower())
|
||||||
|
and str(os.getpid()) not in line.split()[1:2] # exclude self
|
||||||
|
]
|
||||||
|
if _hermes_procs:
|
||||||
|
logger.warning(
|
||||||
|
"Shutdown diagnostic — other hermes processes running:\n %s",
|
||||||
|
"\n ".join(_hermes_procs),
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
logger.info("Shutdown diagnostic — no other hermes processes found")
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
asyncio.create_task(runner.stop())
|
asyncio.create_task(runner.stop())
|
||||||
|
|
||||||
def restart_signal_handler():
|
def restart_signal_handler():
|
||||||
@@ -9230,7 +9593,11 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
|
|||||||
cron_thread = threading.Thread(
|
cron_thread = threading.Thread(
|
||||||
target=_start_cron_ticker,
|
target=_start_cron_ticker,
|
||||||
args=(cron_stop,),
|
args=(cron_stop,),
|
||||||
kwargs={"adapters": runner.adapters, "loop": asyncio.get_running_loop()},
|
kwargs={
|
||||||
|
"adapters": runner.adapters,
|
||||||
|
"loop": asyncio.get_running_loop(),
|
||||||
|
"pending_delivery_callback": runner.queue_failed_cron_delivery,
|
||||||
|
},
|
||||||
daemon=True,
|
daemon=True,
|
||||||
name="cron-ticker",
|
name="cron-ticker",
|
||||||
)
|
)
|
||||||
@@ -9258,6 +9625,21 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
|
|||||||
if runner.exit_code is not None:
|
if runner.exit_code is not None:
|
||||||
raise SystemExit(runner.exit_code)
|
raise SystemExit(runner.exit_code)
|
||||||
|
|
||||||
|
# When a signal (SIGTERM/SIGINT) caused the shutdown and it wasn't a
|
||||||
|
# planned restart (/restart, /update, SIGUSR1), exit non-zero so
|
||||||
|
# systemd's Restart=on-failure revives the process. This covers:
|
||||||
|
# - hermes update killing the gateway mid-work
|
||||||
|
# - External kill commands
|
||||||
|
# - WSL2/container runtime sending unexpected signals
|
||||||
|
# systemctl stop is safe: systemd tracks "stop requested" state
|
||||||
|
# independently of exit code, so Restart= never fires for it.
|
||||||
|
if _signal_initiated_shutdown and not runner._restart_requested:
|
||||||
|
logger.info(
|
||||||
|
"Exiting with code 1 (signal-initiated shutdown without restart "
|
||||||
|
"request) so systemd Restart=on-failure can revive the gateway."
|
||||||
|
)
|
||||||
|
return False # → sys.exit(1) in the caller
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
187
tests/gateway/test_pending_cron_delivery.py
Normal file
187
tests/gateway/test_pending_cron_delivery.py
Normal file
@@ -0,0 +1,187 @@
|
|||||||
|
"""Tests for pending cron delivery queue — retry on reconnect."""
|
||||||
|
import asyncio
|
||||||
|
import threading
|
||||||
|
from unittest.mock import AsyncMock, MagicMock, patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from gateway.config import Platform
|
||||||
|
|
||||||
|
|
||||||
|
class TestPendingCronDeliveryQueue:
    """Verify that failed cron deliveries are queued and flushed on reconnect."""

    @staticmethod
    def _run(coro):
        """Run *coro* to completion on a fresh event loop.

        Uses ``asyncio.run()`` instead of the deprecated
        ``asyncio.get_event_loop().run_until_complete()`` pattern, which
        warns on Python 3.10+ and breaks on newer interpreters when no
        event loop is running in the main thread.
        """
        return asyncio.run(coro)

    def _make_runner(self):
        """Create a minimal GatewayRunner for testing pending deliveries.

        ``object.__new__`` bypasses ``__init__`` (and the real startup I/O
        it performs); only the attributes the queue/flush methods read are
        set. Class-level methods are reachable through normal attribute
        lookup on the bare instance, so no explicit rebinding is needed.
        """
        from gateway.run import GatewayRunner

        runner = object.__new__(GatewayRunner)
        runner._pending_cron_deliveries = []
        runner._pending_deliveries_lock = threading.Lock()
        runner.adapters = {}
        return runner

    def test_queue_failed_delivery_adds_to_queue(self):
        runner = self._make_runner()
        assert len(runner._pending_cron_deliveries) == 0

        runner.queue_failed_cron_delivery(
            platform_name="telegram", chat_id="12345", thread_id=None,
            content="test output", job_id="job-1", job_name="Test Job",
        )
        assert len(runner._pending_cron_deliveries) == 1
        entry = runner._pending_cron_deliveries[0]
        assert entry["platform"] == "telegram"
        assert entry["chat_id"] == "12345"
        assert entry["content"] == "test output"

    def test_queue_preserves_thread_id(self):
        runner = self._make_runner()
        runner.queue_failed_cron_delivery(
            platform_name="telegram", chat_id="12345", thread_id="99",
            content="test", job_id="j1", job_name="Job",
        )
        assert runner._pending_cron_deliveries[0]["thread_id"] == "99"

    def test_flush_removes_matching_platform_entries(self):
        runner = self._make_runner()
        runner.queue_failed_cron_delivery("telegram", "111", None, "msg1", "j1", "Job1")
        runner.queue_failed_cron_delivery("discord", "222", None, "msg2", "j2", "Job2")
        runner.queue_failed_cron_delivery("telegram", "333", None, "msg3", "j3", "Job3")

        mock_adapter = AsyncMock()
        mock_adapter.send = AsyncMock(return_value=MagicMock(success=True))
        runner.adapters = {Platform.TELEGRAM: mock_adapter}

        self._run(runner._flush_pending_cron_deliveries(Platform.TELEGRAM))
        # Only the non-telegram entry should survive the flush.
        assert len(runner._pending_cron_deliveries) == 1
        assert runner._pending_cron_deliveries[0]["platform"] == "discord"

    def test_flush_calls_adapter_send_for_each_entry(self):
        runner = self._make_runner()
        runner.queue_failed_cron_delivery("telegram", "111", None, "msg1", "j1", "Job1")
        runner.queue_failed_cron_delivery("telegram", "222", "42", "msg2", "j2", "Job2")

        mock_adapter = AsyncMock()
        mock_adapter.send = AsyncMock(return_value=MagicMock(success=True))
        runner.adapters = {Platform.TELEGRAM: mock_adapter}

        self._run(runner._flush_pending_cron_deliveries(Platform.TELEGRAM))
        assert mock_adapter.send.call_count == 2

    def test_flush_requeues_if_adapter_missing(self):
        runner = self._make_runner()
        runner.queue_failed_cron_delivery("telegram", "111", None, "msg1", "j1", "Job1")
        runner.adapters = {}

        self._run(runner._flush_pending_cron_deliveries(Platform.TELEGRAM))
        # No adapter available -> the entry must stay queued, not be dropped.
        assert len(runner._pending_cron_deliveries) == 1

    def test_flush_skips_non_matching_platforms(self):
        runner = self._make_runner()
        runner.queue_failed_cron_delivery("discord", "222", None, "msg", "j1", "Job")
        runner.adapters = {Platform.TELEGRAM: AsyncMock()}

        self._run(runner._flush_pending_cron_deliveries(Platform.TELEGRAM))
        assert len(runner._pending_cron_deliveries) == 1

    def test_flush_passes_thread_id_in_metadata(self):
        runner = self._make_runner()
        runner.queue_failed_cron_delivery("telegram", "111", "42", "msg", "j1", "Job")

        mock_adapter = AsyncMock()
        mock_adapter.send = AsyncMock(return_value=MagicMock(success=True))
        runner.adapters = {Platform.TELEGRAM: mock_adapter}

        self._run(runner._flush_pending_cron_deliveries(Platform.TELEGRAM))
        call_kwargs = mock_adapter.send.call_args.kwargs
        assert call_kwargs["metadata"]["thread_id"] == "42"
||||||
|
|
||||||
|
class TestDeliverResultPendingCallback:
    """Verify _deliver_result calls pending_delivery_callback on failure."""

    @pytest.fixture
    def mock_gateway_config(self):
        """Create a mock gateway config with telegram platform enabled."""
        from gateway.config import GatewayConfig

        cfg = GatewayConfig()
        cfg.platforms = {Platform.TELEGRAM: MagicMock(enabled=True)}
        return cfg

    def _make_job(self):
        """Build a minimal cron job dict targeting a telegram chat."""
        return {
            "id": "job-1", "name": "Test Job",
            "deliver": "telegram:12345",
            "origin": {"platform": "telegram", "chat_id": "12345"},
        }

    def _deliver(self, gateway_config, send_behavior, callback=None):
        """Run ``_deliver_result`` with the delivery pipeline patched.

        ``send_behavior`` is forwarded as keyword arguments to the
        ``_send_to_platform`` patch (e.g. ``{"side_effect": ...}`` or
        ``{"return_value": ...}``), so each test states only how the
        platform send behaves instead of repeating the patch scaffolding.
        """
        from cron.scheduler import _deliver_result

        with patch("cron.scheduler._resolve_delivery_target", return_value={
            "platform": "telegram", "chat_id": "12345", "thread_id": None
        }), \
             patch("gateway.config.load_gateway_config", return_value=gateway_config), \
             patch("tools.send_message_tool._send_to_platform", **send_behavior):
            return _deliver_result(self._make_job(), "test", pending_delivery_callback=callback)

    def test_callback_on_exception(self, mock_gateway_config):
        callback = MagicMock()
        result = self._deliver(mock_gateway_config, {"side_effect": Exception("down")}, callback)
        assert result is not None
        callback.assert_called_once()

    def test_callback_on_error_dict(self, mock_gateway_config):
        callback = MagicMock()
        result = self._deliver(mock_gateway_config, {"return_value": {"error": "down"}}, callback)
        assert result is not None
        callback.assert_called_once()

    def test_no_callback_on_success(self, mock_gateway_config):
        callback = MagicMock()
        result = self._deliver(mock_gateway_config, {"return_value": {"ok": True}}, callback)
        assert result is None
        callback.assert_not_called()

    def test_no_callback_no_crash(self, mock_gateway_config):
        # Callback defaults to None; delivery failure must still return an
        # error string rather than raising.
        result = self._deliver(mock_gateway_config, {"side_effect": Exception("down")})
        assert result is not None  # error, no crash
|
||||||
Reference in New Issue
Block a user