From 7cf4bd06bfad5a51ea1e381fd8994defcb7632a0 Mon Sep 17 00:00:00 2001 From: jtuki Date: Sun, 5 Apr 2026 21:56:58 +0800 Subject: [PATCH] fix(gateway): fix Feishu reconnect message drops and shutdown hang This commit fixes two critical bugs in the Feishu adapter that affect message reliability and process lifecycle. **Bug Fix 1: Intermittent Message Drops** Root cause: Event handler was created once in __init__ and reused across reconnects, causing callbacks to capture stale loop references. When the adapter disconnected and reconnected, old callbacks continued firing with invalid loop references, resulting in dropped messages with warnings: "[Feishu] Dropping inbound message before adapter loop is ready" Fix: - Rebuild event handler on each connect (websocket/webhook) - Clear handler on disconnect - Ensure callbacks always capture current valid loop - Add defensive loop.is_closed() checks with getattr for test compatibility - Unify webhook dispatch path to use same loop checks as websocket mode **Bug Fix 2: Process Hangs on Ctrl+C / SIGTERM** Root cause: Feishu SDK's websocket client runs in a background thread with an infinite _select() loop that never exits naturally. The thread was never properly joined on disconnect, causing processes to hang indefinitely after Ctrl+C or gateway stop commands. Fix: - Store reference to thread-local event loop (_ws_thread_loop) - On disconnect, cancel all tasks in thread loop and stop it gracefully via call_soon_threadsafe() - Await thread future with 10s timeout - Clean up pending tasks in thread's finally block before closing loop - Add detailed debug logging for disconnect flow **Additional Improvements:** - Add regression tests for disconnect cleanup and webhook dispatch - Ensure all event callbacks check loop readiness before dispatching Tested on Linux with websocket mode. All Feishu tests pass. Co-Authored-By: Claude Opus 4.6 (1M context) --- gateway/platforms/feishu.py | 87 +++++++++++++++++++++++++++++++----- tests/gateway/test_feishu.py | 63 ++++++++++++++++++++++++-- 2 files changed, 134 insertions(+), 16 deletions(-) diff --git a/gateway/platforms/feishu.py b/gateway/platforms/feishu.py index bee8b01d8..de4d97e64 100644 --- a/gateway/platforms/feishu.py +++ b/gateway/platforms/feishu.py @@ -913,14 +913,33 @@ def _unique_lines(lines: List[str]) -> List[str]: return unique -def _run_official_feishu_ws_client(ws_client: Any) -> None: +def _run_official_feishu_ws_client(ws_client: Any, adapter: Any) -> None: """Run the official Lark WS client in its own thread-local event loop.""" import lark_oapi.ws.client as ws_client_module loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) ws_client_module.loop = loop - ws_client.start() + adapter._ws_thread_loop = loop + try: + ws_client.start() + except Exception: + pass + finally: + pending = [t for t in asyncio.all_tasks(loop) if not t.done()] + for task in pending: + task.cancel() + if pending: + loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True)) + try: + loop.stop() + except Exception: + pass + try: + loop.close() + except Exception: + pass + adapter._ws_thread_loop = None def check_feishu_requirements() -> bool: @@ -945,10 +964,11 @@ class FeishuAdapter(BasePlatformAdapter): self._client: Optional[Any] = None self._ws_client: Optional[Any] = None self._ws_future: Optional[asyncio.Future] = None + self._ws_thread_loop: Optional[asyncio.AbstractEventLoop] = None self._loop: Optional[asyncio.AbstractEventLoop] = None self._webhook_runner: Optional[Any] = None self._webhook_site: Optional[Any] = None - self._event_handler = self._build_event_handler() + self._event_handler: Optional[Any] = None self._seen_message_ids: Dict[str, float] = {} # message_id → seen_at (time.time()) self._seen_message_order: List[str] = [] self._dedup_state_path = get_hermes_home() / "feishu_seen_message_ids.json" @@ -1116,8 +1136,37 @@ class FeishuAdapter(BasePlatformAdapter): self._reset_batch_buffers() self._disable_websocket_auto_reconnect() await self._stop_webhook_server() + + ws_thread_loop = self._ws_thread_loop + if ws_thread_loop is not None and not ws_thread_loop.is_closed(): + logger.debug("[Feishu] Cancelling websocket thread tasks and stopping loop") + + def cancel_all_tasks() -> None: + tasks = [t for t in asyncio.all_tasks(ws_thread_loop) if not t.done()] + logger.debug("[Feishu] Found %d pending tasks in websocket thread", len(tasks)) + for task in tasks: + task.cancel() + ws_thread_loop.call_later(0.1, ws_thread_loop.stop) + + ws_thread_loop.call_soon_threadsafe(cancel_all_tasks) + + ws_future = self._ws_future + if ws_future is not None: + try: + logger.debug("[Feishu] Waiting for websocket thread to exit (timeout=10s)") + await asyncio.wait_for(asyncio.shield(ws_future), timeout=10.0) + logger.debug("[Feishu] Websocket thread exited cleanly") + except asyncio.TimeoutError: + logger.warning("[Feishu] Websocket thread did not exit within 10s - may be stuck") + except asyncio.CancelledError: + logger.debug("[Feishu] Websocket thread cancelled during disconnect") + except Exception as exc: + logger.debug("[Feishu] Websocket thread exited with error: %s", exc, exc_info=True) + self._ws_future = None + self._ws_thread_loop = None self._loop = None + self._event_handler = None self._persist_seen_message_ids() await self._release_app_lock() @@ -1476,12 +1525,13 @@ class FeishuAdapter(BasePlatformAdapter): def _on_message_event(self, data: Any) -> None: """Normalize Feishu inbound events into MessageEvent.""" - if self._loop is None: + loop = self._loop + if loop is None or bool(getattr(loop, "is_closed", lambda: False)()): logger.warning("[Feishu] Dropping inbound message before adapter loop is ready") return future = asyncio.run_coroutine_threadsafe( self._handle_message_event_data(data), - self._loop, + loop, ) future.add_done_callback(self._log_background_failure) @@ -1553,27 +1603,30 @@ class FeishuAdapter(BasePlatformAdapter): ) # Only process reactions from real users. Ignore app/bot-generated reactions # and Hermes' own ACK emoji to avoid feedback loops. + loop = self._loop if ( operator_type in {"bot", "app"} or emoji_type == _FEISHU_ACK_EMOJI or not message_id - or self._loop is None + or loop is None + or bool(getattr(loop, "is_closed", lambda: False)()) ): return future = asyncio.run_coroutine_threadsafe( self._handle_reaction_event(event_type, data), - self._loop, + loop, ) future.add_done_callback(self._log_background_failure) def _on_card_action_trigger(self, data: Any) -> Any: """Schedule Feishu card actions on the adapter loop and acknowledge immediately.""" - if self._loop is None: + loop = self._loop + if loop is None or bool(getattr(loop, "is_closed", lambda: False)()): logger.warning("[Feishu] Dropping card action before adapter loop is ready") else: future = asyncio.run_coroutine_threadsafe( self._handle_card_action_event(data), - self._loop, + loop, ) future.add_done_callback(self._log_background_failure) if P2CardActionTriggerResponse is None: @@ -2083,7 +2136,7 @@ class FeishuAdapter(BasePlatformAdapter): event_type = str((payload.get("header") or {}).get("event_type") or "") data = self._namespace_from_mapping(payload) if event_type == "im.message.receive_v1": - await self._handle_message_event_data(data) + self._on_message_event(data) elif event_type == "im.message.message_read_v1": self._on_message_read_event(data) elif event_type == "im.chat.member.bot.added_v1": @@ -2093,7 +2146,7 @@ class FeishuAdapter(BasePlatformAdapter): elif event_type in ("im.message.reaction.created_v1", "im.message.reaction.deleted_v1"): self._on_reaction_event(event_type, data) elif event_type == "card.action.trigger": - asyncio.ensure_future(self._handle_card_action_event(data)) + self._on_card_action_trigger(data) else: logger.debug("[Feishu] Ignoring webhook event type: %s", event_type or "unknown") return web.json_response({"code": 0, "msg": "ok"}) @@ -2965,6 +3018,12 @@ class FeishuAdapter(BasePlatformAdapter): raise RuntimeError("websockets not installed; websocket mode unavailable") domain = FEISHU_DOMAIN if self._domain_name != "lark" else LARK_DOMAIN self._client = self._build_lark_client(domain) + self._event_handler = self._build_event_handler() + if self._event_handler is None: + raise RuntimeError("failed to build Feishu event handler") + loop = self._loop + if loop is None or loop.is_closed(): + raise RuntimeError("adapter loop is not ready") await self._hydrate_bot_identity() self._ws_client = FeishuWSClient( app_id=self._app_id, @@ -2973,10 +3032,11 @@ class FeishuAdapter(BasePlatformAdapter): event_handler=self._event_handler, domain=domain, ) - self._ws_future = self._loop.run_in_executor( + self._ws_future = loop.run_in_executor( None, _run_official_feishu_ws_client, self._ws_client, + self, ) async def _connect_webhook(self) -> None: @@ -2984,6 +3044,9 @@ class FeishuAdapter(BasePlatformAdapter): raise RuntimeError("aiohttp not installed; webhook mode unavailable") domain = FEISHU_DOMAIN if self._domain_name != "lark" else LARK_DOMAIN self._client = self._build_lark_client(domain) + self._event_handler = self._build_event_handler() + if self._event_handler is None: + raise RuntimeError("failed to build Feishu event handler") await self._hydrate_bot_identity() app = web.Application() app.router.add_post(self._webhook_path, self._handle_webhook_request) diff --git a/tests/gateway/test_feishu.py b/tests/gateway/test_feishu.py index 5344cda52..41f92d60a 100644 --- a/tests/gateway/test_feishu.py +++ b/tests/gateway/test_feishu.py @@ -8,7 +8,7 @@ import time import unittest from pathlib import Path from types import SimpleNamespace -from unittest.mock import AsyncMock, patch +from unittest.mock import AsyncMock, Mock, patch try: import lark_oapi @@ -289,7 +289,7 @@ class TestFeishuAdapterMessaging(unittest.TestCase): patch("gateway.platforms.feishu.FEISHU_AVAILABLE", True), patch("gateway.platforms.feishu.FEISHU_WEBSOCKET_AVAILABLE", True), patch("gateway.platforms.feishu.lark", SimpleNamespace(LogLevel=SimpleNamespace(INFO="INFO", WARNING="WARNING"))), - patch("gateway.platforms.feishu.EventDispatcherHandler", object()), + patch("gateway.platforms.feishu.EventDispatcherHandler") as mock_handler_class, patch("gateway.platforms.feishu.FeishuWSClient", return_value=ws_client), patch("gateway.platforms.feishu._run_official_feishu_ws_client"), patch("gateway.platforms.feishu.acquire_scoped_lock", return_value=(True, None)) as acquire_lock, @@ -297,6 +297,15 @@ class TestFeishuAdapterMessaging(unittest.TestCase): patch.object(adapter, "_hydrate_bot_identity", new=AsyncMock()), patch.object(adapter, "_build_lark_client", return_value=SimpleNamespace()), ): + mock_builder = Mock() + mock_builder.register_p2_im_message_message_read_v1 = Mock(return_value=mock_builder) + mock_builder.register_p2_im_message_receive_v1 = Mock(return_value=mock_builder) + mock_builder.register_p2_im_message_reaction_created_v1 = Mock(return_value=mock_builder) + mock_builder.register_p2_im_message_reaction_deleted_v1 = Mock(return_value=mock_builder) + mock_builder.register_p2_card_action_trigger = Mock(return_value=mock_builder) + mock_builder.build = Mock(return_value=object()) + mock_handler_class.builder = Mock(return_value=mock_builder) + loop = asyncio.new_event_loop() future = loop.create_future() future.set_result(None) @@ -305,6 +314,9 @@ class TestFeishuAdapterMessaging(unittest.TestCase): def run_in_executor(self, *_args, **_kwargs): return future + def is_closed(self): + return False + try: with patch("gateway.platforms.feishu.asyncio.get_running_loop", return_value=_Loop()): connected = asyncio.run(adapter.connect()) @@ -313,6 +325,7 @@ class TestFeishuAdapterMessaging(unittest.TestCase): loop.close() self.assertTrue(connected) + self.assertIsNone(adapter._event_handler) acquire_lock.assert_called_once_with( "feishu-app-id", "cli_app", @@ -361,7 +374,7 @@ class TestFeishuAdapterMessaging(unittest.TestCase): patch("gateway.platforms.feishu.FEISHU_AVAILABLE", True), patch("gateway.platforms.feishu.FEISHU_WEBSOCKET_AVAILABLE", True), patch("gateway.platforms.feishu.lark", SimpleNamespace(LogLevel=SimpleNamespace(INFO="INFO", WARNING="WARNING"))), - patch("gateway.platforms.feishu.EventDispatcherHandler", object()), + patch("gateway.platforms.feishu.EventDispatcherHandler") as mock_handler_class, patch("gateway.platforms.feishu.FeishuWSClient", return_value=ws_client), patch("gateway.platforms.feishu.acquire_scoped_lock", return_value=(True, None)), patch("gateway.platforms.feishu.release_scoped_lock"), @@ -369,6 +382,15 @@ class TestFeishuAdapterMessaging(unittest.TestCase): patch("gateway.platforms.feishu.asyncio.sleep", side_effect=lambda delay: sleeps.append(delay)), patch.object(adapter, "_build_lark_client", return_value=SimpleNamespace()), ): + mock_builder = Mock() + mock_builder.register_p2_im_message_message_read_v1 = Mock(return_value=mock_builder) + mock_builder.register_p2_im_message_receive_v1 = Mock(return_value=mock_builder) + mock_builder.register_p2_im_message_reaction_created_v1 = Mock(return_value=mock_builder) + mock_builder.register_p2_im_message_reaction_deleted_v1 = Mock(return_value=mock_builder) + mock_builder.register_p2_card_action_trigger = Mock(return_value=mock_builder) + mock_builder.build = Mock(return_value=object()) + mock_handler_class.builder = Mock(return_value=mock_builder) + loop = asyncio.new_event_loop() future = loop.create_future() future.set_result(None) @@ -383,6 +405,9 @@ class TestFeishuAdapterMessaging(unittest.TestCase): raise OSError("temporary websocket failure") return future + def is_closed(self): + return False + fake_loop = _Loop() try: with patch("gateway.platforms.feishu.asyncio.get_running_loop", return_value=fake_loop): @@ -1196,7 +1221,12 @@ class TestAdapterBehavior(unittest.TestCase): from gateway.platforms.feishu import FeishuAdapter adapter = FeishuAdapter(PlatformConfig()) - adapter._loop = object() + + class _Loop: + def is_closed(self): + return False + + adapter._loop = _Loop() message = SimpleNamespace( message_id="om_text", @@ -1210,6 +1240,7 @@ class TestAdapterBehavior(unittest.TestCase): data = SimpleNamespace(event=SimpleNamespace(message=message, sender=sender)) future = SimpleNamespace(add_done_callback=lambda *_args, **_kwargs: None) + def _submit(coro, _loop): coro.close() return future @@ -1219,6 +1250,30 @@ class TestAdapterBehavior(unittest.TestCase): self.assertTrue(submit.called) + @patch.dict(os.environ, {}, clear=True) + def test_webhook_request_uses_same_message_dispatch_path(self): + from gateway.config import PlatformConfig + from gateway.platforms.feishu import FeishuAdapter + + adapter = FeishuAdapter(PlatformConfig()) + adapter._on_message_event = Mock() + + body = json.dumps({ + "header": {"event_type": "im.message.receive_v1"}, + "event": {"message": {"message_id": "om_test"}}, + }).encode("utf-8") + request = SimpleNamespace( + remote="127.0.0.1", + content_length=None, + headers={}, + read=AsyncMock(return_value=body), + ) + + response = asyncio.run(adapter._handle_webhook_request(request)) + + self.assertEqual(response.status, 200) + adapter._on_message_event.assert_called_once() + @patch.dict(os.environ, {}, clear=True) def test_process_inbound_message_uses_event_sender_identity_only(self): from gateway.config import PlatformConfig