fix(gateway): restart on whatsapp bridge child exit (#2334)
Co-authored-by: Frederico Ribeiro <fr@tecompanytea.com>
This commit is contained in:
@@ -196,7 +196,7 @@ class WhatsAppAdapter(BasePlatformAdapter):
|
||||
bridge_status = data.get("status", "unknown")
|
||||
if bridge_status == "connected":
|
||||
print(f"[{self.name}] Using existing bridge (status: {bridge_status})")
|
||||
self._running = True
|
||||
self._mark_connected()
|
||||
self._bridge_process = None # Not managed by us
|
||||
asyncio.create_task(self._poll_messages())
|
||||
return True
|
||||
@@ -306,7 +306,7 @@ class WhatsAppAdapter(BasePlatformAdapter):
|
||||
# Start message polling task
|
||||
asyncio.create_task(self._poll_messages())
|
||||
|
||||
self._running = True
|
||||
self._mark_connected()
|
||||
print(f"[{self.name}] Bridge started on port {self._bridge_port}")
|
||||
return True
|
||||
|
||||
@@ -324,6 +324,23 @@ class WhatsAppAdapter(BasePlatformAdapter):
|
||||
pass
|
||||
self._bridge_log_fh = None
|
||||
|
||||
async def _check_managed_bridge_exit(self) -> Optional[str]:
|
||||
"""Return a fatal error message if the managed bridge child exited."""
|
||||
if self._bridge_process is None:
|
||||
return None
|
||||
|
||||
returncode = self._bridge_process.poll()
|
||||
if returncode is None:
|
||||
return None
|
||||
|
||||
message = f"WhatsApp bridge process exited unexpectedly (code {returncode})."
|
||||
if not self.has_fatal_error:
|
||||
logger.error("[%s] %s", self.name, message)
|
||||
self._set_fatal_error("whatsapp_bridge_exited", message, retryable=True)
|
||||
self._close_bridge_log()
|
||||
await self._notify_fatal_error()
|
||||
return self.fatal_error_message or message
|
||||
|
||||
async def disconnect(self) -> None:
|
||||
"""Stop the WhatsApp bridge and clean up any orphaned processes."""
|
||||
if self._bridge_process:
|
||||
@@ -352,7 +369,7 @@ class WhatsAppAdapter(BasePlatformAdapter):
|
||||
# Bridge was not started by us, don't kill it
|
||||
print(f"[{self.name}] Disconnecting (external bridge left running)")
|
||||
|
||||
self._running = False
|
||||
self._mark_disconnected()
|
||||
self._bridge_process = None
|
||||
self._close_bridge_log()
|
||||
print(f"[{self.name}] Disconnected")
|
||||
@@ -367,6 +384,9 @@ class WhatsAppAdapter(BasePlatformAdapter):
|
||||
"""Send a message via the WhatsApp bridge."""
|
||||
if not self._running:
|
||||
return SendResult(success=False, error="Not connected")
|
||||
bridge_exit = await self._check_managed_bridge_exit()
|
||||
if bridge_exit:
|
||||
return SendResult(success=False, error=bridge_exit)
|
||||
|
||||
try:
|
||||
import aiohttp
|
||||
@@ -412,6 +432,9 @@ class WhatsAppAdapter(BasePlatformAdapter):
|
||||
"""Edit a previously sent message via the WhatsApp bridge."""
|
||||
if not self._running:
|
||||
return SendResult(success=False, error="Not connected")
|
||||
bridge_exit = await self._check_managed_bridge_exit()
|
||||
if bridge_exit:
|
||||
return SendResult(success=False, error=bridge_exit)
|
||||
try:
|
||||
import aiohttp
|
||||
async with aiohttp.ClientSession() as session:
|
||||
@@ -443,6 +466,9 @@ class WhatsAppAdapter(BasePlatformAdapter):
|
||||
"""Send any media file via bridge /send-media endpoint."""
|
||||
if not self._running:
|
||||
return SendResult(success=False, error="Not connected")
|
||||
bridge_exit = await self._check_managed_bridge_exit()
|
||||
if bridge_exit:
|
||||
return SendResult(success=False, error=bridge_exit)
|
||||
try:
|
||||
import aiohttp
|
||||
|
||||
@@ -531,6 +557,8 @@ class WhatsAppAdapter(BasePlatformAdapter):
|
||||
"""Send typing indicator via bridge."""
|
||||
if not self._running:
|
||||
return
|
||||
if await self._check_managed_bridge_exit():
|
||||
return
|
||||
|
||||
try:
|
||||
import aiohttp
|
||||
@@ -548,6 +576,8 @@ class WhatsAppAdapter(BasePlatformAdapter):
|
||||
"""Get information about a WhatsApp chat."""
|
||||
if not self._running:
|
||||
return {"name": "Unknown", "type": "dm"}
|
||||
if await self._check_managed_bridge_exit():
|
||||
return {"name": chat_id, "type": "dm"}
|
||||
|
||||
try:
|
||||
import aiohttp
|
||||
@@ -578,6 +608,10 @@ class WhatsAppAdapter(BasePlatformAdapter):
|
||||
return
|
||||
|
||||
while self._running:
|
||||
bridge_exit = await self._check_managed_bridge_exit()
|
||||
if bridge_exit:
|
||||
print(f"[{self.name}] {bridge_exit}")
|
||||
break
|
||||
try:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(
|
||||
@@ -593,6 +627,10 @@ class WhatsAppAdapter(BasePlatformAdapter):
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except Exception as e:
|
||||
bridge_exit = await self._check_managed_bridge_exit()
|
||||
if bridge_exit:
|
||||
print(f"[{self.name}] {bridge_exit}")
|
||||
break
|
||||
print(f"[{self.name}] Poll error: {e}")
|
||||
await asyncio.sleep(5)
|
||||
|
||||
@@ -674,4 +712,3 @@ class WhatsAppAdapter(BasePlatformAdapter):
|
||||
except Exception as e:
|
||||
print(f"[{self.name}] Error building event: {e}")
|
||||
return None
|
||||
|
||||
|
||||
@@ -336,6 +336,7 @@ class GatewayRunner:
|
||||
self._running = False
|
||||
self._shutdown_event = asyncio.Event()
|
||||
self._exit_cleanly = False
|
||||
self._exit_with_failure = False
|
||||
self._exit_reason: Optional[str] = None
|
||||
|
||||
# Track running agents per session for interrupt support
|
||||
@@ -591,6 +592,10 @@ class GatewayRunner:
|
||||
def should_exit_cleanly(self) -> bool:
|
||||
return self._exit_cleanly
|
||||
|
||||
@property
|
||||
def should_exit_with_failure(self) -> bool:
|
||||
return self._exit_with_failure
|
||||
|
||||
@property
|
||||
def exit_reason(self) -> Optional[str]:
|
||||
return self._exit_reason
|
||||
@@ -643,7 +648,11 @@ class GatewayRunner:
|
||||
|
||||
if not self.adapters:
|
||||
self._exit_reason = adapter.fatal_error_message or "All messaging adapters disconnected"
|
||||
logger.error("No connected messaging platforms remain. Shutting down gateway cleanly.")
|
||||
if adapter.fatal_error_retryable:
|
||||
self._exit_with_failure = True
|
||||
logger.error("No connected messaging platforms remain. Shutting down gateway for service restart.")
|
||||
else:
|
||||
logger.error("No connected messaging platforms remain. Shutting down gateway cleanly.")
|
||||
await self.stop()
|
||||
|
||||
def _request_clean_exit(self, reason: str) -> None:
|
||||
@@ -5266,6 +5275,11 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
|
||||
|
||||
# Wait for shutdown
|
||||
await runner.wait_for_shutdown()
|
||||
|
||||
if runner.should_exit_with_failure:
|
||||
if runner.exit_reason:
|
||||
logger.error("Gateway exiting with failure: %s", runner.exit_reason)
|
||||
return False
|
||||
|
||||
# Stop cron ticker cleanly
|
||||
cron_stop.set()
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
from unittest.mock import AsyncMock
|
||||
|
||||
import pytest
|
||||
|
||||
from gateway.config import GatewayConfig, Platform, PlatformConfig
|
||||
@@ -27,6 +29,23 @@ class _FatalAdapter(BasePlatformAdapter):
|
||||
return {"id": chat_id}
|
||||
|
||||
|
||||
class _RuntimeRetryableAdapter(BasePlatformAdapter):
|
||||
def __init__(self):
|
||||
super().__init__(PlatformConfig(enabled=True, token="token"), Platform.WHATSAPP)
|
||||
|
||||
async def connect(self) -> bool:
|
||||
return True
|
||||
|
||||
async def disconnect(self) -> None:
|
||||
self._mark_disconnected()
|
||||
|
||||
async def send(self, chat_id, content, reply_to=None, metadata=None):
|
||||
raise NotImplementedError
|
||||
|
||||
async def get_chat_info(self, chat_id):
|
||||
return {"id": chat_id}
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_runner_requests_clean_exit_for_nonretryable_startup_conflict(monkeypatch, tmp_path):
|
||||
config = GatewayConfig(
|
||||
@@ -44,3 +63,31 @@ async def test_runner_requests_clean_exit_for_nonretryable_startup_conflict(monk
|
||||
assert ok is True
|
||||
assert runner.should_exit_cleanly is True
|
||||
assert "already using this Telegram bot token" in runner.exit_reason
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_runner_requests_failure_exit_for_retryable_runtime_fatal(monkeypatch, tmp_path):
|
||||
config = GatewayConfig(
|
||||
platforms={
|
||||
Platform.WHATSAPP: PlatformConfig(enabled=True, token="token")
|
||||
},
|
||||
sessions_dir=tmp_path / "sessions",
|
||||
)
|
||||
runner = GatewayRunner(config)
|
||||
adapter = _RuntimeRetryableAdapter()
|
||||
adapter._set_fatal_error(
|
||||
"whatsapp_bridge_exited",
|
||||
"WhatsApp bridge process exited unexpectedly (code 1).",
|
||||
retryable=True,
|
||||
)
|
||||
|
||||
runner.adapters = {Platform.WHATSAPP: adapter}
|
||||
runner.delivery_router.adapters = runner.adapters
|
||||
runner.stop = AsyncMock()
|
||||
|
||||
await runner._handle_adapter_fatal_error(adapter)
|
||||
|
||||
assert runner.should_exit_cleanly is False
|
||||
assert runner.should_exit_with_failure is True
|
||||
assert "exited unexpectedly" in runner.exit_reason
|
||||
runner.stop.assert_awaited_once()
|
||||
|
||||
@@ -53,6 +53,15 @@ def _make_adapter():
|
||||
adapter._bridge_process = None
|
||||
adapter._reply_prefix = None
|
||||
adapter._running = False
|
||||
adapter._message_handler = None
|
||||
adapter._fatal_error_code = None
|
||||
adapter._fatal_error_message = None
|
||||
adapter._fatal_error_retryable = True
|
||||
adapter._fatal_error_handler = None
|
||||
adapter._active_sessions = {}
|
||||
adapter._pending_messages = {}
|
||||
adapter._background_tasks = set()
|
||||
adapter._auto_tts_disabled_chats = set()
|
||||
adapter._message_queue = asyncio.Queue()
|
||||
return adapter
|
||||
|
||||
@@ -200,6 +209,54 @@ class TestFileHandleClosedOnError:
|
||||
mock_fh.close.assert_called_once()
|
||||
assert adapter._bridge_log_fh is None
|
||||
|
||||
|
||||
class TestBridgeRuntimeFailure:
|
||||
"""Verify runtime bridge death is surfaced as a fatal adapter error."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_marks_retryable_fatal_when_managed_bridge_exits(self):
|
||||
adapter = _make_adapter()
|
||||
fatal_handler = AsyncMock()
|
||||
adapter.set_fatal_error_handler(fatal_handler)
|
||||
adapter._running = True
|
||||
mock_fh = MagicMock()
|
||||
adapter._bridge_log_fh = mock_fh
|
||||
|
||||
mock_proc = MagicMock()
|
||||
mock_proc.poll.return_value = 7
|
||||
adapter._bridge_process = mock_proc
|
||||
|
||||
result = await adapter.send("chat-123", "hello")
|
||||
|
||||
assert result.success is False
|
||||
assert "exited unexpectedly" in result.error
|
||||
assert adapter.fatal_error_code == "whatsapp_bridge_exited"
|
||||
assert adapter.fatal_error_retryable is True
|
||||
fatal_handler.assert_awaited_once()
|
||||
mock_fh.close.assert_called_once()
|
||||
assert adapter._bridge_log_fh is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_poll_messages_marks_retryable_fatal_when_managed_bridge_exits(self):
|
||||
adapter = _make_adapter()
|
||||
fatal_handler = AsyncMock()
|
||||
adapter.set_fatal_error_handler(fatal_handler)
|
||||
adapter._running = True
|
||||
mock_fh = MagicMock()
|
||||
adapter._bridge_log_fh = mock_fh
|
||||
|
||||
mock_proc = MagicMock()
|
||||
mock_proc.poll.return_value = 23
|
||||
adapter._bridge_process = mock_proc
|
||||
|
||||
await adapter._poll_messages()
|
||||
|
||||
assert adapter.fatal_error_code == "whatsapp_bridge_exited"
|
||||
assert adapter.fatal_error_retryable is True
|
||||
fatal_handler.assert_awaited_once()
|
||||
mock_fh.close.assert_called_once()
|
||||
assert adapter._bridge_log_fh is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_closed_when_http_not_ready(self):
|
||||
"""Health endpoint never returns 200 within 15 attempts."""
|
||||
|
||||
Reference in New Issue
Block a user