diff --git a/gateway/platforms/whatsapp.py b/gateway/platforms/whatsapp.py index 20bda5da5..6697800e5 100644 --- a/gateway/platforms/whatsapp.py +++ b/gateway/platforms/whatsapp.py @@ -196,7 +196,7 @@ class WhatsAppAdapter(BasePlatformAdapter): bridge_status = data.get("status", "unknown") if bridge_status == "connected": print(f"[{self.name}] Using existing bridge (status: {bridge_status})") - self._running = True + self._mark_connected() self._bridge_process = None # Not managed by us asyncio.create_task(self._poll_messages()) return True @@ -306,7 +306,7 @@ class WhatsAppAdapter(BasePlatformAdapter): # Start message polling task asyncio.create_task(self._poll_messages()) - self._running = True + self._mark_connected() print(f"[{self.name}] Bridge started on port {self._bridge_port}") return True @@ -324,6 +324,23 @@ class WhatsAppAdapter(BasePlatformAdapter): pass self._bridge_log_fh = None + async def _check_managed_bridge_exit(self) -> Optional[str]: + """Return a fatal error message if the managed bridge child exited.""" + if self._bridge_process is None: + return None + + returncode = self._bridge_process.poll() + if returncode is None: + return None + + message = f"WhatsApp bridge process exited unexpectedly (code {returncode})." + if not self.has_fatal_error: + logger.error("[%s] %s", self.name, message) + self._set_fatal_error("whatsapp_bridge_exited", message, retryable=True) + self._close_bridge_log() + await self._notify_fatal_error() + return self.fatal_error_message or message + async def disconnect(self) -> None: """Stop the WhatsApp bridge and clean up any orphaned processes.""" if self._bridge_process: @@ -352,7 +369,7 @@ class WhatsAppAdapter(BasePlatformAdapter): # Bridge was not started by us, don't kill it print(f"[{self.name}] Disconnecting (external bridge left running)") - self._running = False + self._mark_disconnected() self._bridge_process = None self._close_bridge_log() print(f"[{self.name}] Disconnected") @@ -367,6 +384,9 @@ class WhatsAppAdapter(BasePlatformAdapter): """Send a message via the WhatsApp bridge.""" if not self._running: return SendResult(success=False, error="Not connected") + bridge_exit = await self._check_managed_bridge_exit() + if bridge_exit: + return SendResult(success=False, error=bridge_exit) try: import aiohttp @@ -412,6 +432,9 @@ class WhatsAppAdapter(BasePlatformAdapter): """Edit a previously sent message via the WhatsApp bridge.""" if not self._running: return SendResult(success=False, error="Not connected") + bridge_exit = await self._check_managed_bridge_exit() + if bridge_exit: + return SendResult(success=False, error=bridge_exit) try: import aiohttp async with aiohttp.ClientSession() as session: @@ -443,6 +466,9 @@ class WhatsAppAdapter(BasePlatformAdapter): """Send any media file via bridge /send-media endpoint.""" if not self._running: return SendResult(success=False, error="Not connected") + bridge_exit = await self._check_managed_bridge_exit() + if bridge_exit: + return SendResult(success=False, error=bridge_exit) try: import aiohttp @@ -531,6 +557,8 @@ class WhatsAppAdapter(BasePlatformAdapter): """Send typing indicator via bridge.""" if not self._running: return + if await self._check_managed_bridge_exit(): + return try: import aiohttp @@ -548,6 +576,8 @@ class WhatsAppAdapter(BasePlatformAdapter): """Get information about a WhatsApp chat.""" if not self._running: return {"name": "Unknown", "type": "dm"} + if await self._check_managed_bridge_exit(): + return {"name": chat_id, "type": "dm"} try: import aiohttp @@ -578,6 +608,10 @@ class WhatsAppAdapter(BasePlatformAdapter): return while self._running: + bridge_exit = await self._check_managed_bridge_exit() + if bridge_exit: + print(f"[{self.name}] {bridge_exit}") + break try: async with aiohttp.ClientSession() as session: async with session.get( @@ -593,6 +627,10 @@ class WhatsAppAdapter(BasePlatformAdapter): except asyncio.CancelledError: break except Exception as e: + bridge_exit = await self._check_managed_bridge_exit() + if bridge_exit: + print(f"[{self.name}] {bridge_exit}") + break print(f"[{self.name}] Poll error: {e}") await asyncio.sleep(5) @@ -674,4 +712,3 @@ class WhatsAppAdapter(BasePlatformAdapter): except Exception as e: print(f"[{self.name}] Error building event: {e}") return None - diff --git a/gateway/run.py b/gateway/run.py index 43b7c079a..8c34935c1 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -336,6 +336,7 @@ class GatewayRunner: self._running = False self._shutdown_event = asyncio.Event() self._exit_cleanly = False + self._exit_with_failure = False self._exit_reason: Optional[str] = None # Track running agents per session for interrupt support @@ -591,6 +592,10 @@ class GatewayRunner: def should_exit_cleanly(self) -> bool: return self._exit_cleanly + @property + def should_exit_with_failure(self) -> bool: + return self._exit_with_failure + @property def exit_reason(self) -> Optional[str]: return self._exit_reason @@ -643,7 +648,11 @@ class GatewayRunner: if not self.adapters: self._exit_reason = adapter.fatal_error_message or "All messaging adapters disconnected" - logger.error("No connected messaging platforms remain. Shutting down gateway cleanly.") + if adapter.fatal_error_retryable: + self._exit_with_failure = True + logger.error("No connected messaging platforms remain. Shutting down gateway for service restart.") + else: + logger.error("No connected messaging platforms remain. Shutting down gateway cleanly.") await self.stop() def _request_clean_exit(self, reason: str) -> None: @@ -5266,6 +5275,11 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool = # Wait for shutdown await runner.wait_for_shutdown() + + if runner.should_exit_with_failure: + if runner.exit_reason: + logger.error("Gateway exiting with failure: %s", runner.exit_reason) + return False # Stop cron ticker cleanly cron_stop.set() diff --git a/tests/gateway/test_runner_fatal_adapter.py b/tests/gateway/test_runner_fatal_adapter.py index aa414d72f..2badb87c4 100644 --- a/tests/gateway/test_runner_fatal_adapter.py +++ b/tests/gateway/test_runner_fatal_adapter.py @@ -1,3 +1,5 @@ +from unittest.mock import AsyncMock + import pytest from gateway.config import GatewayConfig, Platform, PlatformConfig @@ -27,6 +29,23 @@ class _FatalAdapter(BasePlatformAdapter): return {"id": chat_id} +class _RuntimeRetryableAdapter(BasePlatformAdapter): + def __init__(self): + super().__init__(PlatformConfig(enabled=True, token="token"), Platform.WHATSAPP) + + async def connect(self) -> bool: + return True + + async def disconnect(self) -> None: + self._mark_disconnected() + + async def send(self, chat_id, content, reply_to=None, metadata=None): + raise NotImplementedError + + async def get_chat_info(self, chat_id): + return {"id": chat_id} + + @pytest.mark.asyncio async def test_runner_requests_clean_exit_for_nonretryable_startup_conflict(monkeypatch, tmp_path): config = GatewayConfig( @@ -44,3 +63,31 @@ async def test_runner_requests_clean_exit_for_nonretryable_startup_conflict(monk assert ok is True assert runner.should_exit_cleanly is True assert "already using this Telegram bot token" in runner.exit_reason + + +@pytest.mark.asyncio +async def test_runner_requests_failure_exit_for_retryable_runtime_fatal(monkeypatch, tmp_path): + config = GatewayConfig( + platforms={ + Platform.WHATSAPP: PlatformConfig(enabled=True, token="token") + }, + sessions_dir=tmp_path / "sessions", + ) + runner = GatewayRunner(config) + adapter = _RuntimeRetryableAdapter() + adapter._set_fatal_error( + "whatsapp_bridge_exited", + "WhatsApp bridge process exited unexpectedly (code 1).", + retryable=True, + ) + + runner.adapters = {Platform.WHATSAPP: adapter} + runner.delivery_router.adapters = runner.adapters + runner.stop = AsyncMock() + + await runner._handle_adapter_fatal_error(adapter) + + assert runner.should_exit_cleanly is False + assert runner.should_exit_with_failure is True + assert "exited unexpectedly" in runner.exit_reason + runner.stop.assert_awaited_once() diff --git a/tests/gateway/test_whatsapp_connect.py b/tests/gateway/test_whatsapp_connect.py index 37a1f9509..7a2126bb8 100644 --- a/tests/gateway/test_whatsapp_connect.py +++ b/tests/gateway/test_whatsapp_connect.py @@ -53,6 +53,15 @@ def _make_adapter(): adapter._bridge_process = None adapter._reply_prefix = None adapter._running = False + adapter._message_handler = None + adapter._fatal_error_code = None + adapter._fatal_error_message = None + adapter._fatal_error_retryable = True + adapter._fatal_error_handler = None + adapter._active_sessions = {} + adapter._pending_messages = {} + adapter._background_tasks = set() + adapter._auto_tts_disabled_chats = set() adapter._message_queue = asyncio.Queue() return adapter @@ -200,6 +209,54 @@ class TestFileHandleClosedOnError: mock_fh.close.assert_called_once() assert adapter._bridge_log_fh is None + +class TestBridgeRuntimeFailure: + """Verify runtime bridge death is surfaced as a fatal adapter error.""" + + @pytest.mark.asyncio + async def test_send_marks_retryable_fatal_when_managed_bridge_exits(self): + adapter = _make_adapter() + fatal_handler = AsyncMock() + adapter.set_fatal_error_handler(fatal_handler) + adapter._running = True + mock_fh = MagicMock() + adapter._bridge_log_fh = mock_fh + + mock_proc = MagicMock() + mock_proc.poll.return_value = 7 + adapter._bridge_process = mock_proc + + result = await adapter.send("chat-123", "hello") + + assert result.success is False + assert "exited unexpectedly" in result.error + assert adapter.fatal_error_code == "whatsapp_bridge_exited" + assert adapter.fatal_error_retryable is True + fatal_handler.assert_awaited_once() + mock_fh.close.assert_called_once() + assert adapter._bridge_log_fh is None + + @pytest.mark.asyncio + async def test_poll_messages_marks_retryable_fatal_when_managed_bridge_exits(self): + adapter = _make_adapter() + fatal_handler = AsyncMock() + adapter.set_fatal_error_handler(fatal_handler) + adapter._running = True + mock_fh = MagicMock() + adapter._bridge_log_fh = mock_fh + + mock_proc = MagicMock() + mock_proc.poll.return_value = 23 + adapter._bridge_process = mock_proc + + await adapter._poll_messages() + + assert adapter.fatal_error_code == "whatsapp_bridge_exited" + assert adapter.fatal_error_retryable is True + fatal_handler.assert_awaited_once() + mock_fh.close.assert_called_once() + assert adapter._bridge_log_fh is None + @pytest.mark.asyncio async def test_closed_when_http_not_ready(self): """Health endpoint never returns 200 within 15 attempts."""