feat: auto-reconnect failed gateway platforms with exponential backoff (#2584)
When a messaging platform fails to connect at startup (e.g. a transient DNS failure) or disconnects at runtime with a retryable error, the gateway now queues it for background reconnection instead of giving up permanently.

- New _platform_reconnect_watcher background task runs alongside the existing session expiry watcher
- Exponential backoff: 30s, 60s, 120s, 240s, 300s cap
- Max 20 retry attempts before giving up on a platform
- Non-retryable errors (bad auth token, etc.) are not retried
- Runtime disconnections via _handle_adapter_fatal_error now queue retryable failures instead of triggering gateway shutdown
- On successful reconnect, the adapter is wired up and the channel directory is rebuilt automatically

Fixes the case where a DNS blip during gateway startup caused Telegram and Discord to be permanently unavailable until manual restart.
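For reference, the delay before retry attempt n works out to min(30 * 2**(n - 1), 300) seconds, which yields exactly the 30s, 60s, 120s, 240s, 300s schedule listed above. A minimal standalone sketch of that arithmetic (illustrative names only, not identifiers from gateway/run.py):

    # Sketch of the reconnect backoff schedule; mirrors the formula in the diff below.
    BASE_DELAY = 30    # seconds before the first retry
    BACKOFF_CAP = 300  # 5-minute ceiling between retries

    def reconnect_delay(attempt: int) -> int:
        """Delay in seconds before retry number `attempt` (1-based)."""
        return min(BASE_DELAY * (2 ** (attempt - 1)), BACKOFF_CAP)

    # 30, 60, 120, 240, then capped at 300 for attempts 5 through 20
    assert [reconnect_delay(n) for n in range(1, 7)] == [30, 60, 120, 240, 300, 300]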
gateway/run.py (160 changed lines)
@@ -363,6 +363,10 @@ class GatewayRunner:
         # Key: session_key, Value: {"command": str, "pattern_key": str, ...}
         self._pending_approvals: Dict[str, Dict[str, Any]] = {}
 
+        # Track platforms that failed to connect for background reconnection.
+        # Key: Platform enum, Value: {"config": platform_config, "attempts": int, "next_retry": float}
+        self._failed_platforms: Dict[Platform, Dict[str, Any]] = {}
+
         # Persistent Honcho managers keyed by gateway session key.
         # This preserves write_frequency="session" semantics across short-lived
         # per-message AIAgent instances.
@@ -639,7 +643,11 @@ class GatewayRunner:
         return resolve_turn_route(user_message, getattr(self, "_smart_model_routing", {}), primary)
 
     async def _handle_adapter_fatal_error(self, adapter: BasePlatformAdapter) -> None:
-        """React to a non-retryable adapter failure after startup."""
+        """React to an adapter failure after startup.
+
+        If the error is retryable (e.g. network blip, DNS failure), queue the
+        platform for background reconnection instead of giving up permanently.
+        """
         logger.error(
             "Fatal %s adapter error (%s): %s",
             adapter.platform.value,
@@ -655,7 +663,21 @@ class GatewayRunner:
         self.adapters.pop(adapter.platform, None)
         self.delivery_router.adapters = self.adapters
 
-        if not self.adapters:
+        # Queue retryable failures for background reconnection
+        if adapter.fatal_error_retryable:
+            platform_config = self.config.platforms.get(adapter.platform)
+            if platform_config and adapter.platform not in self._failed_platforms:
+                self._failed_platforms[adapter.platform] = {
+                    "config": platform_config,
+                    "attempts": 0,
+                    "next_retry": time.monotonic() + 30,
+                }
+                logger.info(
+                    "%s queued for background reconnection",
+                    adapter.platform.value,
+                )
+
+        if not self.adapters and not self._failed_platforms:
             self._exit_reason = adapter.fatal_error_message or "All messaging adapters disconnected"
             if adapter.fatal_error_retryable:
                 self._exit_with_failure = True
@@ -663,6 +685,11 @@ class GatewayRunner:
             else:
                 logger.error("No connected messaging platforms remain. Shutting down gateway cleanly.")
             await self.stop()
+        elif not self.adapters and self._failed_platforms:
+            logger.warning(
+                "No connected messaging platforms remain, but %d platform(s) queued for reconnection",
+                len(self._failed_platforms),
+            )
 
     def _request_clean_exit(self, reason: str) -> None:
         self._exit_cleanly = True
@@ -940,13 +967,32 @@ class GatewayRunner:
                         target.append(
                             f"{platform.value}: {adapter.fatal_error_message}"
                         )
+                        # Queue for reconnection if the error is retryable
+                        if adapter.fatal_error_retryable:
+                            self._failed_platforms[platform] = {
+                                "config": platform_config,
+                                "attempts": 1,
+                                "next_retry": time.monotonic() + 30,
+                            }
                     else:
                         startup_retryable_errors.append(
                             f"{platform.value}: failed to connect"
                         )
+                        # No fatal error info means likely a transient issue — queue for retry
+                        self._failed_platforms[platform] = {
+                            "config": platform_config,
+                            "attempts": 1,
+                            "next_retry": time.monotonic() + 30,
+                        }
             except Exception as e:
                 logger.error("✗ %s error: %s", platform.value, e)
                 startup_retryable_errors.append(f"{platform.value}: {e}")
+                # Unexpected exceptions are typically transient — queue for retry
+                self._failed_platforms[platform] = {
+                    "config": platform_config,
+                    "attempts": 1,
+                    "next_retry": time.monotonic() + 30,
+                }
 
         if connected_count == 0:
             if startup_nonretryable_errors:
@@ -1026,6 +1072,15 @@ class GatewayRunner:
         # Start background session expiry watcher for proactive memory flushing
         asyncio.create_task(self._session_expiry_watcher())
 
+        # Start background reconnection watcher for platforms that failed at startup
+        if self._failed_platforms:
+            logger.info(
+                "Starting reconnection watcher for %d failed platform(s): %s",
+                len(self._failed_platforms),
+                ", ".join(p.value for p in self._failed_platforms),
+            )
+        asyncio.create_task(self._platform_reconnect_watcher())
+
         logger.info("Press Ctrl+C to stop")
 
         return True
@@ -1068,6 +1123,107 @@ class GatewayRunner:
                 break
             await asyncio.sleep(1)
 
+    async def _platform_reconnect_watcher(self) -> None:
+        """Background task that periodically retries connecting failed platforms.
+
+        Uses exponential backoff: 30s → 60s → 120s → 240s → 300s (cap).
+        Stops retrying a platform after 20 failed attempts or if the error
+        is non-retryable (e.g. bad auth token).
+        """
+        _MAX_ATTEMPTS = 20
+        _BACKOFF_CAP = 300  # 5 minutes max between retries
+
+        await asyncio.sleep(10)  # initial delay — let startup finish
+        while self._running:
+            if not self._failed_platforms:
+                # Nothing to reconnect — sleep and check again
+                for _ in range(30):
+                    if not self._running:
+                        return
+                    await asyncio.sleep(1)
+                continue
+
+            now = time.monotonic()
+            for platform in list(self._failed_platforms.keys()):
+                if not self._running:
+                    return
+                info = self._failed_platforms[platform]
+                if now < info["next_retry"]:
+                    continue  # not time yet
+
+                if info["attempts"] >= _MAX_ATTEMPTS:
+                    logger.warning(
+                        "Giving up reconnecting %s after %d attempts",
+                        platform.value, info["attempts"],
+                    )
+                    del self._failed_platforms[platform]
+                    continue
+
+                platform_config = info["config"]
+                attempt = info["attempts"] + 1
+                logger.info(
+                    "Reconnecting %s (attempt %d/%d)...",
+                    platform.value, attempt, _MAX_ATTEMPTS,
+                )
+
+                try:
+                    adapter = self._create_adapter(platform, platform_config)
+                    if not adapter:
+                        logger.warning(
+                            "Reconnect %s: adapter creation returned None, removing from retry queue",
+                            platform.value,
+                        )
+                        del self._failed_platforms[platform]
+                        continue
+
+                    adapter.set_message_handler(self._handle_message)
+                    adapter.set_fatal_error_handler(self._handle_adapter_fatal_error)
+
+                    success = await adapter.connect()
+                    if success:
+                        self.adapters[platform] = adapter
+                        self._sync_voice_mode_state_to_adapter(adapter)
+                        self.delivery_router.adapters = self.adapters
+                        del self._failed_platforms[platform]
+                        logger.info("✓ %s reconnected successfully", platform.value)
+
+                        # Rebuild channel directory with the new adapter
+                        try:
+                            from gateway.channel_directory import build_channel_directory
+                            build_channel_directory(self.adapters)
+                        except Exception:
+                            pass
+                    else:
+                        # Check if the failure is non-retryable
+                        if adapter.has_fatal_error and not adapter.fatal_error_retryable:
+                            logger.warning(
+                                "Reconnect %s: non-retryable error (%s), removing from retry queue",
+                                platform.value, adapter.fatal_error_message,
+                            )
+                            del self._failed_platforms[platform]
+                        else:
+                            backoff = min(30 * (2 ** (attempt - 1)), _BACKOFF_CAP)
+                            info["attempts"] = attempt
+                            info["next_retry"] = time.monotonic() + backoff
+                            logger.info(
+                                "Reconnect %s failed, next retry in %ds",
+                                platform.value, backoff,
+                            )
+                except Exception as e:
+                    backoff = min(30 * (2 ** (attempt - 1)), _BACKOFF_CAP)
+                    info["attempts"] = attempt
+                    info["next_retry"] = time.monotonic() + backoff
+                    logger.warning(
+                        "Reconnect %s error: %s, next retry in %ds",
+                        platform.value, e, backoff,
+                    )
+
+            # Check every 10 seconds for platforms that need reconnection
+            for _ in range(10):
+                if not self._running:
+                    return
+                await asyncio.sleep(1)
+
     async def stop(self) -> None:
         """Stop the gateway and disconnect all adapters."""
         logger.info("Stopping gateway...")