diff --git a/gateway/platforms/feishu.py b/gateway/platforms/feishu.py index bee8b01d8..de4d97e64 100644 --- a/gateway/platforms/feishu.py +++ b/gateway/platforms/feishu.py @@ -913,14 +913,33 @@ def _unique_lines(lines: List[str]) -> List[str]: return unique -def _run_official_feishu_ws_client(ws_client: Any) -> None: +def _run_official_feishu_ws_client(ws_client: Any, adapter: Any) -> None: """Run the official Lark WS client in its own thread-local event loop.""" import lark_oapi.ws.client as ws_client_module loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) ws_client_module.loop = loop - ws_client.start() + adapter._ws_thread_loop = loop + try: + ws_client.start() + except Exception: + pass + finally: + pending = [t for t in asyncio.all_tasks(loop) if not t.done()] + for task in pending: + task.cancel() + if pending: + loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True)) + try: + loop.stop() + except Exception: + pass + try: + loop.close() + except Exception: + pass + adapter._ws_thread_loop = None def check_feishu_requirements() -> bool: @@ -945,10 +964,11 @@ class FeishuAdapter(BasePlatformAdapter): self._client: Optional[Any] = None self._ws_client: Optional[Any] = None self._ws_future: Optional[asyncio.Future] = None + self._ws_thread_loop: Optional[asyncio.AbstractEventLoop] = None self._loop: Optional[asyncio.AbstractEventLoop] = None self._webhook_runner: Optional[Any] = None self._webhook_site: Optional[Any] = None - self._event_handler = self._build_event_handler() + self._event_handler: Optional[Any] = None self._seen_message_ids: Dict[str, float] = {} # message_id → seen_at (time.time()) self._seen_message_order: List[str] = [] self._dedup_state_path = get_hermes_home() / "feishu_seen_message_ids.json" @@ -1116,8 +1136,37 @@ class FeishuAdapter(BasePlatformAdapter): self._reset_batch_buffers() self._disable_websocket_auto_reconnect() await self._stop_webhook_server() + + ws_thread_loop = self._ws_thread_loop + if ws_thread_loop is not None and not ws_thread_loop.is_closed(): + logger.debug("[Feishu] Cancelling websocket thread tasks and stopping loop") + + def cancel_all_tasks() -> None: + tasks = [t for t in asyncio.all_tasks(ws_thread_loop) if not t.done()] + logger.debug("[Feishu] Found %d pending tasks in websocket thread", len(tasks)) + for task in tasks: + task.cancel() + ws_thread_loop.call_later(0.1, ws_thread_loop.stop) + + ws_thread_loop.call_soon_threadsafe(cancel_all_tasks) + + ws_future = self._ws_future + if ws_future is not None: + try: + logger.debug("[Feishu] Waiting for websocket thread to exit (timeout=10s)") + await asyncio.wait_for(asyncio.shield(ws_future), timeout=10.0) + logger.debug("[Feishu] Websocket thread exited cleanly") + except asyncio.TimeoutError: + logger.warning("[Feishu] Websocket thread did not exit within 10s - may be stuck") + except asyncio.CancelledError: + logger.debug("[Feishu] Websocket thread cancelled during disconnect") + except Exception as exc: + logger.debug("[Feishu] Websocket thread exited with error: %s", exc, exc_info=True) + self._ws_future = None + self._ws_thread_loop = None self._loop = None + self._event_handler = None self._persist_seen_message_ids() await self._release_app_lock() @@ -1476,12 +1525,13 @@ class FeishuAdapter(BasePlatformAdapter): def _on_message_event(self, data: Any) -> None: """Normalize Feishu inbound events into MessageEvent.""" - if self._loop is None: + loop = self._loop + if loop is None or bool(getattr(loop, "is_closed", lambda: False)()): logger.warning("[Feishu] Dropping inbound message before adapter loop is ready") return future = asyncio.run_coroutine_threadsafe( self._handle_message_event_data(data), - self._loop, + loop, ) future.add_done_callback(self._log_background_failure) @@ -1553,27 +1603,30 @@ class FeishuAdapter(BasePlatformAdapter): ) # Only process reactions from real users. Ignore app/bot-generated reactions # and Hermes' own ACK emoji to avoid feedback loops. + loop = self._loop if ( operator_type in {"bot", "app"} or emoji_type == _FEISHU_ACK_EMOJI or not message_id - or self._loop is None + or loop is None + or bool(getattr(loop, "is_closed", lambda: False)()) ): return future = asyncio.run_coroutine_threadsafe( self._handle_reaction_event(event_type, data), - self._loop, + loop, ) future.add_done_callback(self._log_background_failure) def _on_card_action_trigger(self, data: Any) -> Any: """Schedule Feishu card actions on the adapter loop and acknowledge immediately.""" - if self._loop is None: + loop = self._loop + if loop is None or bool(getattr(loop, "is_closed", lambda: False)()): logger.warning("[Feishu] Dropping card action before adapter loop is ready") else: future = asyncio.run_coroutine_threadsafe( self._handle_card_action_event(data), - self._loop, + loop, ) future.add_done_callback(self._log_background_failure) if P2CardActionTriggerResponse is None: @@ -2083,7 +2136,7 @@ class FeishuAdapter(BasePlatformAdapter): event_type = str((payload.get("header") or {}).get("event_type") or "") data = self._namespace_from_mapping(payload) if event_type == "im.message.receive_v1": - await self._handle_message_event_data(data) + self._on_message_event(data) elif event_type == "im.message.message_read_v1": self._on_message_read_event(data) elif event_type == "im.chat.member.bot.added_v1": @@ -2093,7 +2146,7 @@ class FeishuAdapter(BasePlatformAdapter): elif event_type in ("im.message.reaction.created_v1", "im.message.reaction.deleted_v1"): self._on_reaction_event(event_type, data) elif event_type == "card.action.trigger": - asyncio.ensure_future(self._handle_card_action_event(data)) + self._on_card_action_trigger(data) else: logger.debug("[Feishu] Ignoring webhook event type: %s", event_type or "unknown") return web.json_response({"code": 0, "msg": "ok"}) @@ -2965,6 +3018,12 @@ class FeishuAdapter(BasePlatformAdapter): raise RuntimeError("websockets not installed; websocket mode unavailable") domain = FEISHU_DOMAIN if self._domain_name != "lark" else LARK_DOMAIN self._client = self._build_lark_client(domain) + self._event_handler = self._build_event_handler() + if self._event_handler is None: + raise RuntimeError("failed to build Feishu event handler") + loop = self._loop + if loop is None or loop.is_closed(): + raise RuntimeError("adapter loop is not ready") await self._hydrate_bot_identity() self._ws_client = FeishuWSClient( app_id=self._app_id, @@ -2973,10 +3032,11 @@ class FeishuAdapter(BasePlatformAdapter): event_handler=self._event_handler, domain=domain, ) - self._ws_future = self._loop.run_in_executor( + self._ws_future = loop.run_in_executor( None, _run_official_feishu_ws_client, self._ws_client, + self, ) async def _connect_webhook(self) -> None: @@ -2984,6 +3044,9 @@ class FeishuAdapter(BasePlatformAdapter): raise RuntimeError("aiohttp not installed; webhook mode unavailable") domain = FEISHU_DOMAIN if self._domain_name != "lark" else LARK_DOMAIN self._client = self._build_lark_client(domain) + self._event_handler = self._build_event_handler() + if self._event_handler is None: + raise RuntimeError("failed to build Feishu event handler") await self._hydrate_bot_identity() app = web.Application() app.router.add_post(self._webhook_path, self._handle_webhook_request) diff --git a/tests/gateway/test_feishu.py b/tests/gateway/test_feishu.py index 5344cda52..41f92d60a 100644 --- a/tests/gateway/test_feishu.py +++ b/tests/gateway/test_feishu.py @@ -8,7 +8,7 @@ import time import unittest from pathlib import Path from types import SimpleNamespace -from unittest.mock import AsyncMock, patch +from unittest.mock import AsyncMock, Mock, patch try: import lark_oapi @@ -289,7 +289,7 @@ class TestFeishuAdapterMessaging(unittest.TestCase): patch("gateway.platforms.feishu.FEISHU_AVAILABLE", True), patch("gateway.platforms.feishu.FEISHU_WEBSOCKET_AVAILABLE", True), patch("gateway.platforms.feishu.lark", SimpleNamespace(LogLevel=SimpleNamespace(INFO="INFO", WARNING="WARNING"))), - patch("gateway.platforms.feishu.EventDispatcherHandler", object()), + patch("gateway.platforms.feishu.EventDispatcherHandler") as mock_handler_class, patch("gateway.platforms.feishu.FeishuWSClient", return_value=ws_client), patch("gateway.platforms.feishu._run_official_feishu_ws_client"), patch("gateway.platforms.feishu.acquire_scoped_lock", return_value=(True, None)) as acquire_lock, @@ -297,6 +297,15 @@ class TestFeishuAdapterMessaging(unittest.TestCase): patch.object(adapter, "_hydrate_bot_identity", new=AsyncMock()), patch.object(adapter, "_build_lark_client", return_value=SimpleNamespace()), ): + mock_builder = Mock() + mock_builder.register_p2_im_message_message_read_v1 = Mock(return_value=mock_builder) + mock_builder.register_p2_im_message_receive_v1 = Mock(return_value=mock_builder) + mock_builder.register_p2_im_message_reaction_created_v1 = Mock(return_value=mock_builder) + mock_builder.register_p2_im_message_reaction_deleted_v1 = Mock(return_value=mock_builder) + mock_builder.register_p2_card_action_trigger = Mock(return_value=mock_builder) + mock_builder.build = Mock(return_value=object()) + mock_handler_class.builder = Mock(return_value=mock_builder) + loop = asyncio.new_event_loop() future = loop.create_future() future.set_result(None) @@ -305,6 +314,9 @@ class TestFeishuAdapterMessaging(unittest.TestCase): def run_in_executor(self, *_args, **_kwargs): return future + def is_closed(self): + return False + try: with patch("gateway.platforms.feishu.asyncio.get_running_loop", return_value=_Loop()): connected = asyncio.run(adapter.connect()) @@ -313,6 +325,7 @@ class TestFeishuAdapterMessaging(unittest.TestCase): loop.close() self.assertTrue(connected) + self.assertIsNone(adapter._event_handler) acquire_lock.assert_called_once_with( "feishu-app-id", "cli_app", @@ -361,7 +374,7 @@ class TestFeishuAdapterMessaging(unittest.TestCase): patch("gateway.platforms.feishu.FEISHU_AVAILABLE", True), patch("gateway.platforms.feishu.FEISHU_WEBSOCKET_AVAILABLE", True), patch("gateway.platforms.feishu.lark", SimpleNamespace(LogLevel=SimpleNamespace(INFO="INFO", WARNING="WARNING"))), - patch("gateway.platforms.feishu.EventDispatcherHandler", object()), + patch("gateway.platforms.feishu.EventDispatcherHandler") as mock_handler_class, patch("gateway.platforms.feishu.FeishuWSClient", return_value=ws_client), patch("gateway.platforms.feishu.acquire_scoped_lock", return_value=(True, None)), patch("gateway.platforms.feishu.release_scoped_lock"), @@ -369,6 +382,15 @@ class TestFeishuAdapterMessaging(unittest.TestCase): patch("gateway.platforms.feishu.asyncio.sleep", side_effect=lambda delay: sleeps.append(delay)), patch.object(adapter, "_build_lark_client", return_value=SimpleNamespace()), ): + mock_builder = Mock() + mock_builder.register_p2_im_message_message_read_v1 = Mock(return_value=mock_builder) + mock_builder.register_p2_im_message_receive_v1 = Mock(return_value=mock_builder) + mock_builder.register_p2_im_message_reaction_created_v1 = Mock(return_value=mock_builder) + mock_builder.register_p2_im_message_reaction_deleted_v1 = Mock(return_value=mock_builder) + mock_builder.register_p2_card_action_trigger = Mock(return_value=mock_builder) + mock_builder.build = Mock(return_value=object()) + mock_handler_class.builder = Mock(return_value=mock_builder) + loop = asyncio.new_event_loop() future = loop.create_future() future.set_result(None) @@ -383,6 +405,9 @@ class TestFeishuAdapterMessaging(unittest.TestCase): raise OSError("temporary websocket failure") return future + def is_closed(self): + return False + fake_loop = _Loop() try: with patch("gateway.platforms.feishu.asyncio.get_running_loop", return_value=fake_loop): @@ -1196,7 +1221,12 @@ class TestAdapterBehavior(unittest.TestCase): from gateway.platforms.feishu import FeishuAdapter adapter = FeishuAdapter(PlatformConfig()) - adapter._loop = object() + + class _Loop: + def is_closed(self): + return False + + adapter._loop = _Loop() message = SimpleNamespace( message_id="om_text", @@ -1210,6 +1240,7 @@ class TestAdapterBehavior(unittest.TestCase): data = SimpleNamespace(event=SimpleNamespace(message=message, sender=sender)) future = SimpleNamespace(add_done_callback=lambda *_args, **_kwargs: None) + def _submit(coro, _loop): coro.close() return future @@ -1219,6 +1250,30 @@ class TestAdapterBehavior(unittest.TestCase): self.assertTrue(submit.called) + @patch.dict(os.environ, {}, clear=True) + def test_webhook_request_uses_same_message_dispatch_path(self): + from gateway.config import PlatformConfig + from gateway.platforms.feishu import FeishuAdapter + + adapter = FeishuAdapter(PlatformConfig()) + adapter._on_message_event = Mock() + + body = json.dumps({ + "header": {"event_type": "im.message.receive_v1"}, + "event": {"message": {"message_id": "om_test"}}, + }).encode("utf-8") + request = SimpleNamespace( + remote="127.0.0.1", + content_length=None, + headers={}, + read=AsyncMock(return_value=body), + ) + + response = asyncio.run(adapter._handle_webhook_request(request)) + + self.assertEqual(response.status, 200) + adapter._on_message_event.assert_called_once() + @patch.dict(os.environ, {}, clear=True) def test_process_inbound_message_uses_event_sender_identity_only(self): from gateway.config import PlatformConfig