fix(gateway): fix Feishu reconnect message drops and shutdown hang
This commit fixes two critical bugs in the Feishu adapter that affect message reliability and process lifecycle. **Bug Fix 1: Intermittent Message Drops** Root cause: Event handler was created once in __init__ and reused across reconnects, causing callbacks to capture stale loop references. When the adapter disconnected and reconnected, old callbacks continued firing with invalid loop references, resulting in dropped messages with warnings: "[Feishu] Dropping inbound message before adapter loop is ready" Fix: - Rebuild event handler on each connect (websocket/webhook) - Clear handler on disconnect - Ensure callbacks always capture current valid loop - Add defensive loop.is_closed() checks with getattr for test compatibility - Unify webhook dispatch path to use same loop checks as websocket mode **Bug Fix 2: Process Hangs on Ctrl+C / SIGTERM** Root cause: Feishu SDK's websocket client runs in a background thread with an infinite _select() loop that never exits naturally. The thread was never properly joined on disconnect, causing processes to hang indefinitely after Ctrl+C or gateway stop commands. Fix: - Store reference to thread-local event loop (_ws_thread_loop) - On disconnect, cancel all tasks in thread loop and stop it gracefully via call_soon_threadsafe() - Await thread future with 10s timeout - Clean up pending tasks in thread's finally block before closing loop - Add detailed debug logging for disconnect flow **Additional Improvements:** - Add regression tests for disconnect cleanup and webhook dispatch - Ensure all event callbacks check loop readiness before dispatching Tested on Linux with websocket mode. All Feishu tests pass. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -913,14 +913,33 @@ def _unique_lines(lines: List[str]) -> List[str]:
|
||||
return unique
|
||||
|
||||
|
||||
def _run_official_feishu_ws_client(ws_client: Any) -> None:
|
||||
def _run_official_feishu_ws_client(ws_client: Any, adapter: Any) -> None:
|
||||
"""Run the official Lark WS client in its own thread-local event loop."""
|
||||
import lark_oapi.ws.client as ws_client_module
|
||||
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
ws_client_module.loop = loop
|
||||
ws_client.start()
|
||||
adapter._ws_thread_loop = loop
|
||||
try:
|
||||
ws_client.start()
|
||||
except Exception:
|
||||
pass
|
||||
finally:
|
||||
pending = [t for t in asyncio.all_tasks(loop) if not t.done()]
|
||||
for task in pending:
|
||||
task.cancel()
|
||||
if pending:
|
||||
loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
|
||||
try:
|
||||
loop.stop()
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
loop.close()
|
||||
except Exception:
|
||||
pass
|
||||
adapter._ws_thread_loop = None
|
||||
|
||||
|
||||
def check_feishu_requirements() -> bool:
|
||||
@@ -945,10 +964,11 @@ class FeishuAdapter(BasePlatformAdapter):
|
||||
self._client: Optional[Any] = None
|
||||
self._ws_client: Optional[Any] = None
|
||||
self._ws_future: Optional[asyncio.Future] = None
|
||||
self._ws_thread_loop: Optional[asyncio.AbstractEventLoop] = None
|
||||
self._loop: Optional[asyncio.AbstractEventLoop] = None
|
||||
self._webhook_runner: Optional[Any] = None
|
||||
self._webhook_site: Optional[Any] = None
|
||||
self._event_handler = self._build_event_handler()
|
||||
self._event_handler: Optional[Any] = None
|
||||
self._seen_message_ids: Dict[str, float] = {} # message_id → seen_at (time.time())
|
||||
self._seen_message_order: List[str] = []
|
||||
self._dedup_state_path = get_hermes_home() / "feishu_seen_message_ids.json"
|
||||
@@ -1116,8 +1136,37 @@ class FeishuAdapter(BasePlatformAdapter):
|
||||
self._reset_batch_buffers()
|
||||
self._disable_websocket_auto_reconnect()
|
||||
await self._stop_webhook_server()
|
||||
|
||||
ws_thread_loop = self._ws_thread_loop
|
||||
if ws_thread_loop is not None and not ws_thread_loop.is_closed():
|
||||
logger.debug("[Feishu] Cancelling websocket thread tasks and stopping loop")
|
||||
|
||||
def cancel_all_tasks() -> None:
|
||||
tasks = [t for t in asyncio.all_tasks(ws_thread_loop) if not t.done()]
|
||||
logger.debug("[Feishu] Found %d pending tasks in websocket thread", len(tasks))
|
||||
for task in tasks:
|
||||
task.cancel()
|
||||
ws_thread_loop.call_later(0.1, ws_thread_loop.stop)
|
||||
|
||||
ws_thread_loop.call_soon_threadsafe(cancel_all_tasks)
|
||||
|
||||
ws_future = self._ws_future
|
||||
if ws_future is not None:
|
||||
try:
|
||||
logger.debug("[Feishu] Waiting for websocket thread to exit (timeout=10s)")
|
||||
await asyncio.wait_for(asyncio.shield(ws_future), timeout=10.0)
|
||||
logger.debug("[Feishu] Websocket thread exited cleanly")
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning("[Feishu] Websocket thread did not exit within 10s - may be stuck")
|
||||
except asyncio.CancelledError:
|
||||
logger.debug("[Feishu] Websocket thread cancelled during disconnect")
|
||||
except Exception as exc:
|
||||
logger.debug("[Feishu] Websocket thread exited with error: %s", exc, exc_info=True)
|
||||
|
||||
self._ws_future = None
|
||||
self._ws_thread_loop = None
|
||||
self._loop = None
|
||||
self._event_handler = None
|
||||
self._persist_seen_message_ids()
|
||||
await self._release_app_lock()
|
||||
|
||||
@@ -1476,12 +1525,13 @@ class FeishuAdapter(BasePlatformAdapter):
|
||||
|
||||
def _on_message_event(self, data: Any) -> None:
|
||||
"""Normalize Feishu inbound events into MessageEvent."""
|
||||
if self._loop is None:
|
||||
loop = self._loop
|
||||
if loop is None or bool(getattr(loop, "is_closed", lambda: False)()):
|
||||
logger.warning("[Feishu] Dropping inbound message before adapter loop is ready")
|
||||
return
|
||||
future = asyncio.run_coroutine_threadsafe(
|
||||
self._handle_message_event_data(data),
|
||||
self._loop,
|
||||
loop,
|
||||
)
|
||||
future.add_done_callback(self._log_background_failure)
|
||||
|
||||
@@ -1553,27 +1603,30 @@ class FeishuAdapter(BasePlatformAdapter):
|
||||
)
|
||||
# Only process reactions from real users. Ignore app/bot-generated reactions
|
||||
# and Hermes' own ACK emoji to avoid feedback loops.
|
||||
loop = self._loop
|
||||
if (
|
||||
operator_type in {"bot", "app"}
|
||||
or emoji_type == _FEISHU_ACK_EMOJI
|
||||
or not message_id
|
||||
or self._loop is None
|
||||
or loop is None
|
||||
or bool(getattr(loop, "is_closed", lambda: False)())
|
||||
):
|
||||
return
|
||||
future = asyncio.run_coroutine_threadsafe(
|
||||
self._handle_reaction_event(event_type, data),
|
||||
self._loop,
|
||||
loop,
|
||||
)
|
||||
future.add_done_callback(self._log_background_failure)
|
||||
|
||||
def _on_card_action_trigger(self, data: Any) -> Any:
|
||||
"""Schedule Feishu card actions on the adapter loop and acknowledge immediately."""
|
||||
if self._loop is None:
|
||||
loop = self._loop
|
||||
if loop is None or bool(getattr(loop, "is_closed", lambda: False)()):
|
||||
logger.warning("[Feishu] Dropping card action before adapter loop is ready")
|
||||
else:
|
||||
future = asyncio.run_coroutine_threadsafe(
|
||||
self._handle_card_action_event(data),
|
||||
self._loop,
|
||||
loop,
|
||||
)
|
||||
future.add_done_callback(self._log_background_failure)
|
||||
if P2CardActionTriggerResponse is None:
|
||||
@@ -2083,7 +2136,7 @@ class FeishuAdapter(BasePlatformAdapter):
|
||||
event_type = str((payload.get("header") or {}).get("event_type") or "")
|
||||
data = self._namespace_from_mapping(payload)
|
||||
if event_type == "im.message.receive_v1":
|
||||
await self._handle_message_event_data(data)
|
||||
self._on_message_event(data)
|
||||
elif event_type == "im.message.message_read_v1":
|
||||
self._on_message_read_event(data)
|
||||
elif event_type == "im.chat.member.bot.added_v1":
|
||||
@@ -2093,7 +2146,7 @@ class FeishuAdapter(BasePlatformAdapter):
|
||||
elif event_type in ("im.message.reaction.created_v1", "im.message.reaction.deleted_v1"):
|
||||
self._on_reaction_event(event_type, data)
|
||||
elif event_type == "card.action.trigger":
|
||||
asyncio.ensure_future(self._handle_card_action_event(data))
|
||||
self._on_card_action_trigger(data)
|
||||
else:
|
||||
logger.debug("[Feishu] Ignoring webhook event type: %s", event_type or "unknown")
|
||||
return web.json_response({"code": 0, "msg": "ok"})
|
||||
@@ -2965,6 +3018,12 @@ class FeishuAdapter(BasePlatformAdapter):
|
||||
raise RuntimeError("websockets not installed; websocket mode unavailable")
|
||||
domain = FEISHU_DOMAIN if self._domain_name != "lark" else LARK_DOMAIN
|
||||
self._client = self._build_lark_client(domain)
|
||||
self._event_handler = self._build_event_handler()
|
||||
if self._event_handler is None:
|
||||
raise RuntimeError("failed to build Feishu event handler")
|
||||
loop = self._loop
|
||||
if loop is None or loop.is_closed():
|
||||
raise RuntimeError("adapter loop is not ready")
|
||||
await self._hydrate_bot_identity()
|
||||
self._ws_client = FeishuWSClient(
|
||||
app_id=self._app_id,
|
||||
@@ -2973,10 +3032,11 @@ class FeishuAdapter(BasePlatformAdapter):
|
||||
event_handler=self._event_handler,
|
||||
domain=domain,
|
||||
)
|
||||
self._ws_future = self._loop.run_in_executor(
|
||||
self._ws_future = loop.run_in_executor(
|
||||
None,
|
||||
_run_official_feishu_ws_client,
|
||||
self._ws_client,
|
||||
self,
|
||||
)
|
||||
|
||||
async def _connect_webhook(self) -> None:
|
||||
@@ -2984,6 +3044,9 @@ class FeishuAdapter(BasePlatformAdapter):
|
||||
raise RuntimeError("aiohttp not installed; webhook mode unavailable")
|
||||
domain = FEISHU_DOMAIN if self._domain_name != "lark" else LARK_DOMAIN
|
||||
self._client = self._build_lark_client(domain)
|
||||
self._event_handler = self._build_event_handler()
|
||||
if self._event_handler is None:
|
||||
raise RuntimeError("failed to build Feishu event handler")
|
||||
await self._hydrate_bot_identity()
|
||||
app = web.Application()
|
||||
app.router.add_post(self._webhook_path, self._handle_webhook_request)
|
||||
|
||||
@@ -8,7 +8,7 @@ import time
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import AsyncMock, patch
|
||||
from unittest.mock import AsyncMock, Mock, patch
|
||||
|
||||
try:
|
||||
import lark_oapi
|
||||
@@ -289,7 +289,7 @@ class TestFeishuAdapterMessaging(unittest.TestCase):
|
||||
patch("gateway.platforms.feishu.FEISHU_AVAILABLE", True),
|
||||
patch("gateway.platforms.feishu.FEISHU_WEBSOCKET_AVAILABLE", True),
|
||||
patch("gateway.platforms.feishu.lark", SimpleNamespace(LogLevel=SimpleNamespace(INFO="INFO", WARNING="WARNING"))),
|
||||
patch("gateway.platforms.feishu.EventDispatcherHandler", object()),
|
||||
patch("gateway.platforms.feishu.EventDispatcherHandler") as mock_handler_class,
|
||||
patch("gateway.platforms.feishu.FeishuWSClient", return_value=ws_client),
|
||||
patch("gateway.platforms.feishu._run_official_feishu_ws_client"),
|
||||
patch("gateway.platforms.feishu.acquire_scoped_lock", return_value=(True, None)) as acquire_lock,
|
||||
@@ -297,6 +297,15 @@ class TestFeishuAdapterMessaging(unittest.TestCase):
|
||||
patch.object(adapter, "_hydrate_bot_identity", new=AsyncMock()),
|
||||
patch.object(adapter, "_build_lark_client", return_value=SimpleNamespace()),
|
||||
):
|
||||
mock_builder = Mock()
|
||||
mock_builder.register_p2_im_message_message_read_v1 = Mock(return_value=mock_builder)
|
||||
mock_builder.register_p2_im_message_receive_v1 = Mock(return_value=mock_builder)
|
||||
mock_builder.register_p2_im_message_reaction_created_v1 = Mock(return_value=mock_builder)
|
||||
mock_builder.register_p2_im_message_reaction_deleted_v1 = Mock(return_value=mock_builder)
|
||||
mock_builder.register_p2_card_action_trigger = Mock(return_value=mock_builder)
|
||||
mock_builder.build = Mock(return_value=object())
|
||||
mock_handler_class.builder = Mock(return_value=mock_builder)
|
||||
|
||||
loop = asyncio.new_event_loop()
|
||||
future = loop.create_future()
|
||||
future.set_result(None)
|
||||
@@ -305,6 +314,9 @@ class TestFeishuAdapterMessaging(unittest.TestCase):
|
||||
def run_in_executor(self, *_args, **_kwargs):
|
||||
return future
|
||||
|
||||
def is_closed(self):
|
||||
return False
|
||||
|
||||
try:
|
||||
with patch("gateway.platforms.feishu.asyncio.get_running_loop", return_value=_Loop()):
|
||||
connected = asyncio.run(adapter.connect())
|
||||
@@ -313,6 +325,7 @@ class TestFeishuAdapterMessaging(unittest.TestCase):
|
||||
loop.close()
|
||||
|
||||
self.assertTrue(connected)
|
||||
self.assertIsNone(adapter._event_handler)
|
||||
acquire_lock.assert_called_once_with(
|
||||
"feishu-app-id",
|
||||
"cli_app",
|
||||
@@ -361,7 +374,7 @@ class TestFeishuAdapterMessaging(unittest.TestCase):
|
||||
patch("gateway.platforms.feishu.FEISHU_AVAILABLE", True),
|
||||
patch("gateway.platforms.feishu.FEISHU_WEBSOCKET_AVAILABLE", True),
|
||||
patch("gateway.platforms.feishu.lark", SimpleNamespace(LogLevel=SimpleNamespace(INFO="INFO", WARNING="WARNING"))),
|
||||
patch("gateway.platforms.feishu.EventDispatcherHandler", object()),
|
||||
patch("gateway.platforms.feishu.EventDispatcherHandler") as mock_handler_class,
|
||||
patch("gateway.platforms.feishu.FeishuWSClient", return_value=ws_client),
|
||||
patch("gateway.platforms.feishu.acquire_scoped_lock", return_value=(True, None)),
|
||||
patch("gateway.platforms.feishu.release_scoped_lock"),
|
||||
@@ -369,6 +382,15 @@ class TestFeishuAdapterMessaging(unittest.TestCase):
|
||||
patch("gateway.platforms.feishu.asyncio.sleep", side_effect=lambda delay: sleeps.append(delay)),
|
||||
patch.object(adapter, "_build_lark_client", return_value=SimpleNamespace()),
|
||||
):
|
||||
mock_builder = Mock()
|
||||
mock_builder.register_p2_im_message_message_read_v1 = Mock(return_value=mock_builder)
|
||||
mock_builder.register_p2_im_message_receive_v1 = Mock(return_value=mock_builder)
|
||||
mock_builder.register_p2_im_message_reaction_created_v1 = Mock(return_value=mock_builder)
|
||||
mock_builder.register_p2_im_message_reaction_deleted_v1 = Mock(return_value=mock_builder)
|
||||
mock_builder.register_p2_card_action_trigger = Mock(return_value=mock_builder)
|
||||
mock_builder.build = Mock(return_value=object())
|
||||
mock_handler_class.builder = Mock(return_value=mock_builder)
|
||||
|
||||
loop = asyncio.new_event_loop()
|
||||
future = loop.create_future()
|
||||
future.set_result(None)
|
||||
@@ -383,6 +405,9 @@ class TestFeishuAdapterMessaging(unittest.TestCase):
|
||||
raise OSError("temporary websocket failure")
|
||||
return future
|
||||
|
||||
def is_closed(self):
|
||||
return False
|
||||
|
||||
fake_loop = _Loop()
|
||||
try:
|
||||
with patch("gateway.platforms.feishu.asyncio.get_running_loop", return_value=fake_loop):
|
||||
@@ -1196,7 +1221,12 @@ class TestAdapterBehavior(unittest.TestCase):
|
||||
from gateway.platforms.feishu import FeishuAdapter
|
||||
|
||||
adapter = FeishuAdapter(PlatformConfig())
|
||||
adapter._loop = object()
|
||||
|
||||
class _Loop:
|
||||
def is_closed(self):
|
||||
return False
|
||||
|
||||
adapter._loop = _Loop()
|
||||
|
||||
message = SimpleNamespace(
|
||||
message_id="om_text",
|
||||
@@ -1210,6 +1240,7 @@ class TestAdapterBehavior(unittest.TestCase):
|
||||
data = SimpleNamespace(event=SimpleNamespace(message=message, sender=sender))
|
||||
|
||||
future = SimpleNamespace(add_done_callback=lambda *_args, **_kwargs: None)
|
||||
|
||||
def _submit(coro, _loop):
|
||||
coro.close()
|
||||
return future
|
||||
@@ -1219,6 +1250,30 @@ class TestAdapterBehavior(unittest.TestCase):
|
||||
|
||||
self.assertTrue(submit.called)
|
||||
|
||||
@patch.dict(os.environ, {}, clear=True)
|
||||
def test_webhook_request_uses_same_message_dispatch_path(self):
|
||||
from gateway.config import PlatformConfig
|
||||
from gateway.platforms.feishu import FeishuAdapter
|
||||
|
||||
adapter = FeishuAdapter(PlatformConfig())
|
||||
adapter._on_message_event = Mock()
|
||||
|
||||
body = json.dumps({
|
||||
"header": {"event_type": "im.message.receive_v1"},
|
||||
"event": {"message": {"message_id": "om_test"}},
|
||||
}).encode("utf-8")
|
||||
request = SimpleNamespace(
|
||||
remote="127.0.0.1",
|
||||
content_length=None,
|
||||
headers={},
|
||||
read=AsyncMock(return_value=body),
|
||||
)
|
||||
|
||||
response = asyncio.run(adapter._handle_webhook_request(request))
|
||||
|
||||
self.assertEqual(response.status, 200)
|
||||
adapter._on_message_event.assert_called_once()
|
||||
|
||||
@patch.dict(os.environ, {}, clear=True)
|
||||
def test_process_inbound_message_uses_event_sender_identity_only(self):
|
||||
from gateway.config import PlatformConfig
|
||||
|
||||
Reference in New Issue
Block a user