Files
hermes-agent/tests/gateway/test_feishu.py
Teknium ca4907dfbc feat(gateway): add Feishu/Lark platform support (#3817)
Adds Feishu (ByteDance's enterprise messaging platform) as a gateway
platform adapter with full feature parity: WebSocket + webhook transports,
message batching, dedup, rate limiting, rich post/card content parsing,
media handling (images/audio/files/video), group @mention gating,
reaction routing, and interactive card button support.

Cherry-picked from PR #1793 by penwyp with:
- Moved to current main (PR was 458 commits behind)
- Fixed _send_with_retry shadowing BasePlatformAdapter method (renamed to
  _feishu_send_with_retry to avoid signature mismatch crash)
- Fixed import structure: aiohttp/websockets imported independently of
  lark_oapi so they remain available when SDK is missing
- Fixed get_hermes_home import (hermes_constants, not hermes_cli.config)
- Added skip decorators for tests requiring lark_oapi SDK
- All 16 integration points added surgically to current main

New dependency: lark-oapi>=1.5.3,<2 (optional, pip install hermes-agent[feishu])

Fixes #1788

Co-authored-by: penwyp <penwyp@users.noreply.github.com>
2026-03-29 18:17:42 -07:00

2581 lines
100 KiB
Python

"""Tests for the Feishu gateway integration."""
import asyncio
import json
import os
import tempfile
import time
import unittest
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import AsyncMock, patch
try:
import lark_oapi
_HAS_LARK_OAPI = True
except ImportError:
_HAS_LARK_OAPI = False
class TestPlatformEnum(unittest.TestCase):
def test_feishu_in_platform_enum(self):
from gateway.config import Platform
self.assertEqual(Platform.FEISHU.value, "feishu")
class TestConfigEnvOverrides(unittest.TestCase):
@patch.dict(os.environ, {
"FEISHU_APP_ID": "cli_xxx",
"FEISHU_APP_SECRET": "secret_xxx",
"FEISHU_CONNECTION_MODE": "websocket",
"FEISHU_DOMAIN": "feishu",
}, clear=False)
def test_feishu_config_loaded_from_env(self):
from gateway.config import GatewayConfig, Platform, _apply_env_overrides
config = GatewayConfig()
_apply_env_overrides(config)
self.assertIn(Platform.FEISHU, config.platforms)
self.assertTrue(config.platforms[Platform.FEISHU].enabled)
self.assertEqual(config.platforms[Platform.FEISHU].extra["app_id"], "cli_xxx")
self.assertEqual(config.platforms[Platform.FEISHU].extra["connection_mode"], "websocket")
@patch.dict(os.environ, {
"FEISHU_APP_ID": "cli_xxx",
"FEISHU_APP_SECRET": "secret_xxx",
"FEISHU_HOME_CHANNEL": "oc_xxx",
}, clear=False)
def test_feishu_home_channel_loaded(self):
from gateway.config import GatewayConfig, Platform, _apply_env_overrides
config = GatewayConfig()
_apply_env_overrides(config)
home = config.platforms[Platform.FEISHU].home_channel
self.assertIsNotNone(home)
self.assertEqual(home.chat_id, "oc_xxx")
@patch.dict(os.environ, {
"FEISHU_APP_ID": "cli_xxx",
"FEISHU_APP_SECRET": "secret_xxx",
}, clear=False)
def test_feishu_in_connected_platforms(self):
from gateway.config import GatewayConfig, Platform, _apply_env_overrides
config = GatewayConfig()
_apply_env_overrides(config)
self.assertIn(Platform.FEISHU, config.get_connected_platforms())
class TestGatewayIntegration(unittest.TestCase):
def test_feishu_in_adapter_factory(self):
source = Path("gateway/run.py").read_text(encoding="utf-8")
self.assertIn("Platform.FEISHU", source)
self.assertIn("FeishuAdapter", source)
def test_feishu_in_authorization_maps(self):
source = Path("gateway/run.py").read_text(encoding="utf-8")
self.assertIn("FEISHU_ALLOWED_USERS", source)
self.assertIn("FEISHU_ALLOW_ALL_USERS", source)
def test_feishu_toolset_exists(self):
from toolsets import TOOLSETS
self.assertIn("hermes-feishu", TOOLSETS)
self.assertIn("hermes-feishu", TOOLSETS["hermes-gateway"]["includes"])
class TestFeishuPostParsing(unittest.TestCase):
def test_parse_post_content_extracts_text_mentions_and_media_refs(self):
from gateway.platforms.feishu import parse_feishu_post_content
result = parse_feishu_post_content(
json.dumps(
{
"en_us": {
"title": "Rich message",
"content": [
[{"tag": "img", "image_key": "img_1", "alt": "diagram"}],
[{"tag": "at", "user_name": "Alice", "open_id": "ou_alice"}],
[{"tag": "media", "file_key": "file_1", "file_name": "spec.pdf"}],
],
}
}
)
)
self.assertEqual(result.text_content, "Rich message\n[Image: diagram]\n@Alice\n[Attachment: spec.pdf]")
self.assertEqual(result.image_keys, ["img_1"])
self.assertEqual(result.mentioned_ids, ["ou_alice"])
self.assertEqual(len(result.media_refs), 1)
self.assertEqual(result.media_refs[0].file_key, "file_1")
self.assertEqual(result.media_refs[0].file_name, "spec.pdf")
self.assertEqual(result.media_refs[0].resource_type, "file")
def test_parse_post_content_uses_fallback_when_invalid(self):
from gateway.platforms.feishu import FALLBACK_POST_TEXT, parse_feishu_post_content
result = parse_feishu_post_content("not-json")
self.assertEqual(result.text_content, FALLBACK_POST_TEXT)
self.assertEqual(result.image_keys, [])
self.assertEqual(result.media_refs, [])
self.assertEqual(result.mentioned_ids, [])
def test_parse_post_content_preserves_rich_text_semantics(self):
from gateway.platforms.feishu import parse_feishu_post_content
result = parse_feishu_post_content(
json.dumps(
{
"en_us": {
"title": "Plan *v2*",
"content": [
[
{"tag": "text", "text": "Bold", "style": {"bold": True}},
{"tag": "text", "text": " "},
{"tag": "text", "text": "Italic", "style": {"italic": True}},
{"tag": "text", "text": " "},
{"tag": "text", "text": "Code", "style": {"code": True}},
],
[{"tag": "text", "text": "line1"}, {"tag": "br"}, {"tag": "text", "text": "line2"}],
[{"tag": "hr"}],
[{"tag": "code_block", "language": "python", "text": "print('hi')"}],
],
}
}
)
)
self.assertEqual(
result.text_content,
"Plan *v2*\n**Bold** *Italic* `Code`\nline1\nline2\n---\n```python\nprint('hi')\n```",
)
class TestFeishuMessageNormalization(unittest.TestCase):
def test_normalize_merge_forward_preserves_summary_lines(self):
from gateway.platforms.feishu import normalize_feishu_message
normalized = normalize_feishu_message(
message_type="merge_forward",
raw_content=json.dumps(
{
"title": "Sprint recap",
"messages": [
{"sender_name": "Alice", "text": "Please review PR-128"},
{
"sender_name": "Bob",
"message_type": "post",
"content": {
"en_us": {
"content": [[{"tag": "text", "text": "Ship it"}]],
}
},
},
],
}
),
)
self.assertEqual(normalized.relation_kind, "merge_forward")
self.assertEqual(
normalized.text_content,
"Sprint recap\n- Alice: Please review PR-128\n- Bob: Ship it",
)
def test_normalize_share_chat_exposes_summary_and_metadata(self):
from gateway.platforms.feishu import normalize_feishu_message
normalized = normalize_feishu_message(
message_type="share_chat",
raw_content=json.dumps(
{
"chat_id": "oc_chat_shared",
"chat_name": "Backend Guild",
}
),
)
self.assertEqual(normalized.relation_kind, "share_chat")
self.assertEqual(normalized.text_content, "Shared chat: Backend Guild\nChat ID: oc_chat_shared")
self.assertEqual(normalized.metadata["chat_id"], "oc_chat_shared")
self.assertEqual(normalized.metadata["chat_name"], "Backend Guild")
def test_normalize_interactive_card_preserves_title_body_and_actions(self):
from gateway.platforms.feishu import normalize_feishu_message
normalized = normalize_feishu_message(
message_type="interactive",
raw_content=json.dumps(
{
"card": {
"header": {"title": {"tag": "plain_text", "content": "Build Failed"}},
"elements": [
{"tag": "div", "text": {"tag": "lark_md", "content": "Service: payments-api"}},
{"tag": "div", "text": {"tag": "plain_text", "content": "Branch: main"}},
{
"tag": "action",
"actions": [
{"tag": "button", "text": {"tag": "plain_text", "content": "View Logs"}},
{"tag": "button", "text": {"tag": "plain_text", "content": "Retry"}},
],
},
],
}
}
),
)
self.assertEqual(normalized.relation_kind, "interactive")
self.assertEqual(
normalized.text_content,
"Build Failed\nService: payments-api\nBranch: main\nView Logs\nRetry\nActions: View Logs, Retry",
)
class TestFeishuAdapterMessaging(unittest.TestCase):
@patch.dict(os.environ, {
"FEISHU_APP_ID": "cli_app",
"FEISHU_APP_SECRET": "secret_app",
"FEISHU_CONNECTION_MODE": "webhook",
"FEISHU_WEBHOOK_HOST": "127.0.0.1",
"FEISHU_WEBHOOK_PORT": "9001",
"FEISHU_WEBHOOK_PATH": "/hook",
}, clear=True)
def test_connect_webhook_mode_starts_local_server(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
runner = AsyncMock()
site = AsyncMock()
web_module = SimpleNamespace(
Application=lambda: SimpleNamespace(router=SimpleNamespace(add_post=lambda *_args, **_kwargs: None)),
AppRunner=lambda _app: runner,
TCPSite=lambda _runner, host, port: SimpleNamespace(start=site.start, host=host, port=port),
)
with (
patch("gateway.platforms.feishu.FEISHU_AVAILABLE", True),
patch("gateway.platforms.feishu.FEISHU_WEBHOOK_AVAILABLE", True),
patch("gateway.platforms.feishu.acquire_scoped_lock", return_value=(True, None)),
patch("gateway.platforms.feishu.release_scoped_lock"),
patch.object(adapter, "_hydrate_bot_identity", new=AsyncMock()),
patch.object(adapter, "_build_lark_client", return_value=SimpleNamespace()),
patch("gateway.platforms.feishu.web", web_module),
):
connected = asyncio.run(adapter.connect())
self.assertTrue(connected)
runner.setup.assert_awaited_once()
site.start.assert_awaited_once()
@patch.dict(os.environ, {
"FEISHU_APP_ID": "cli_app",
"FEISHU_APP_SECRET": "secret_app",
}, clear=True)
def test_connect_acquires_scoped_lock_and_disconnect_releases_it(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
ws_client = object()
with (
patch("gateway.platforms.feishu.FEISHU_AVAILABLE", True),
patch("gateway.platforms.feishu.FEISHU_WEBSOCKET_AVAILABLE", True),
patch("gateway.platforms.feishu.lark", SimpleNamespace(LogLevel=SimpleNamespace(INFO="INFO", WARNING="WARNING"))),
patch("gateway.platforms.feishu.EventDispatcherHandler", object()),
patch("gateway.platforms.feishu.FeishuWSClient", return_value=ws_client),
patch("gateway.platforms.feishu._run_official_feishu_ws_client"),
patch("gateway.platforms.feishu.acquire_scoped_lock", return_value=(True, None)) as acquire_lock,
patch("gateway.platforms.feishu.release_scoped_lock") as release_lock,
patch.object(adapter, "_hydrate_bot_identity", new=AsyncMock()),
patch.object(adapter, "_build_lark_client", return_value=SimpleNamespace()),
):
loop = asyncio.new_event_loop()
future = loop.create_future()
future.set_result(None)
class _Loop:
def run_in_executor(self, *_args, **_kwargs):
return future
try:
with patch("gateway.platforms.feishu.asyncio.get_running_loop", return_value=_Loop()):
connected = asyncio.run(adapter.connect())
asyncio.run(adapter.disconnect())
finally:
loop.close()
self.assertTrue(connected)
acquire_lock.assert_called_once_with(
"feishu-app-id",
"cli_app",
metadata={"platform": "feishu"},
)
release_lock.assert_called_once_with("feishu-app-id", "cli_app")
@patch.dict(os.environ, {
"FEISHU_APP_ID": "cli_app",
"FEISHU_APP_SECRET": "secret_app",
}, clear=True)
def test_connect_rejects_existing_app_lock(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
with (
patch("gateway.platforms.feishu.FEISHU_AVAILABLE", True),
patch("gateway.platforms.feishu.FEISHU_WEBSOCKET_AVAILABLE", True),
patch(
"gateway.platforms.feishu.acquire_scoped_lock",
return_value=(False, {"pid": 4321}),
),
):
connected = asyncio.run(adapter.connect())
self.assertFalse(connected)
self.assertEqual(adapter.fatal_error_code, "feishu_app_lock")
self.assertFalse(adapter.fatal_error_retryable)
self.assertIn("PID 4321", adapter.fatal_error_message)
@patch.dict(os.environ, {
"FEISHU_APP_ID": "cli_app",
"FEISHU_APP_SECRET": "secret_app",
}, clear=True)
def test_connect_retries_transient_startup_failure(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
ws_client = object()
sleeps = []
with (
patch("gateway.platforms.feishu.FEISHU_AVAILABLE", True),
patch("gateway.platforms.feishu.FEISHU_WEBSOCKET_AVAILABLE", True),
patch("gateway.platforms.feishu.lark", SimpleNamespace(LogLevel=SimpleNamespace(INFO="INFO", WARNING="WARNING"))),
patch("gateway.platforms.feishu.EventDispatcherHandler", object()),
patch("gateway.platforms.feishu.FeishuWSClient", return_value=ws_client),
patch("gateway.platforms.feishu.acquire_scoped_lock", return_value=(True, None)),
patch("gateway.platforms.feishu.release_scoped_lock"),
patch.object(adapter, "_hydrate_bot_identity", new=AsyncMock()),
patch("gateway.platforms.feishu.asyncio.sleep", side_effect=lambda delay: sleeps.append(delay)),
patch.object(adapter, "_build_lark_client", return_value=SimpleNamespace()),
):
loop = asyncio.new_event_loop()
future = loop.create_future()
future.set_result(None)
class _Loop:
def __init__(self):
self.calls = 0
def run_in_executor(self, *_args, **_kwargs):
self.calls += 1
if self.calls == 1:
raise OSError("temporary websocket failure")
return future
fake_loop = _Loop()
try:
with patch("gateway.platforms.feishu.asyncio.get_running_loop", return_value=fake_loop):
connected = asyncio.run(adapter.connect())
finally:
loop.close()
self.assertTrue(connected)
self.assertEqual(sleeps, [1])
self.assertEqual(fake_loop.calls, 2)
@patch.dict(os.environ, {}, clear=True)
def test_edit_message_updates_existing_feishu_message(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
captured = {}
class _MessageAPI:
def update(self, request):
captured["request"] = request
return SimpleNamespace(success=lambda: True)
adapter._client = SimpleNamespace(
im=SimpleNamespace(
v1=SimpleNamespace(
message=_MessageAPI(),
)
)
)
async def _direct(func, *args, **kwargs):
return func(*args, **kwargs)
with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct):
result = asyncio.run(
adapter.edit_message(
chat_id="oc_chat",
message_id="om_progress",
content="📖 read_file: \"/tmp/image.png\"",
)
)
self.assertTrue(result.success)
self.assertEqual(result.message_id, "om_progress")
self.assertEqual(captured["request"].message_id, "om_progress")
self.assertEqual(captured["request"].request_body.msg_type, "text")
self.assertEqual(
captured["request"].request_body.content,
json.dumps({"text": "📖 read_file: \"/tmp/image.png\""}, ensure_ascii=False),
)
@patch.dict(os.environ, {}, clear=True)
def test_edit_message_falls_back_to_text_when_post_update_is_rejected(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
captured = {"calls": []}
class _MessageAPI:
def update(self, request):
captured["calls"].append(request)
if len(captured["calls"]) == 1:
return SimpleNamespace(success=lambda: False, code=230001, msg="content format of the post type is incorrect")
return SimpleNamespace(success=lambda: True)
adapter._client = SimpleNamespace(
im=SimpleNamespace(
v1=SimpleNamespace(
message=_MessageAPI(),
)
)
)
async def _direct(func, *args, **kwargs):
return func(*args, **kwargs)
with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct):
result = asyncio.run(
adapter.edit_message(
chat_id="oc_chat",
message_id="om_progress",
content="可以用 **粗体** 和 *斜体*。",
)
)
self.assertTrue(result.success)
self.assertEqual(captured["calls"][0].request_body.msg_type, "post")
self.assertEqual(captured["calls"][1].request_body.msg_type, "text")
self.assertEqual(
captured["calls"][1].request_body.content,
json.dumps({"text": "可以用 粗体 和 斜体。"}, ensure_ascii=False),
)
@patch.dict(os.environ, {}, clear=True)
def test_get_chat_info_uses_real_feishu_chat_api(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
class _ChatAPI:
def get(self, request):
self.request = request
return SimpleNamespace(
success=lambda: True,
data=SimpleNamespace(name="Hermes Group", chat_type="group"),
)
chat_api = _ChatAPI()
adapter._client = SimpleNamespace(
im=SimpleNamespace(
v1=SimpleNamespace(
chat=chat_api,
)
)
)
async def _direct(func, *args, **kwargs):
return func(*args, **kwargs)
with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct):
info = asyncio.run(adapter.get_chat_info("oc_chat"))
self.assertEqual(chat_api.request.chat_id, "oc_chat")
self.assertEqual(info["chat_id"], "oc_chat")
self.assertEqual(info["name"], "Hermes Group")
self.assertEqual(info["type"], "group")
class TestAdapterModule(unittest.TestCase):
def test_adapter_requirement_helper_exists(self):
source = Path("gateway/platforms/feishu.py").read_text(encoding="utf-8")
self.assertIn("def check_feishu_requirements()", source)
self.assertIn("FEISHU_AVAILABLE", source)
def test_adapter_declares_websocket_scope(self):
source = Path("gateway/platforms/feishu.py").read_text(encoding="utf-8")
self.assertIn("Supported modes: websocket, webhook", source)
self.assertIn("FEISHU_CONNECTION_MODE", source)
def test_adapter_registers_message_read_noop_handler(self):
source = Path("gateway/platforms/feishu.py").read_text(encoding="utf-8")
self.assertIn("register_p2_im_message_message_read_v1", source)
self.assertIn("def _on_message_read_event", source)
def test_adapter_registers_reaction_and_card_handlers_for_websocket(self):
source = Path("gateway/platforms/feishu.py").read_text(encoding="utf-8")
self.assertIn("register_p2_im_message_reaction_created_v1", source)
self.assertIn("register_p2_im_message_reaction_deleted_v1", source)
self.assertIn("register_p2_card_action_trigger", source)
class TestAdapterBehavior(unittest.TestCase):
@patch.dict(os.environ, {}, clear=True)
def test_build_event_handler_registers_reaction_and_card_processors(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
calls = []
class _Builder:
def register_p2_im_message_message_read_v1(self, _handler):
calls.append("message_read")
return self
def register_p2_im_message_receive_v1(self, _handler):
calls.append("message_receive")
return self
def register_p2_im_message_reaction_created_v1(self, _handler):
calls.append("reaction_created")
return self
def register_p2_im_message_reaction_deleted_v1(self, _handler):
calls.append("reaction_deleted")
return self
def register_p2_card_action_trigger(self, _handler):
calls.append("card_action")
return self
def build(self):
calls.append("build")
return "handler"
class _Dispatcher:
@staticmethod
def builder(_encrypt_key, _verification_token):
calls.append("builder")
return _Builder()
with patch("gateway.platforms.feishu.EventDispatcherHandler", _Dispatcher):
handler = adapter._build_event_handler()
self.assertEqual(handler, "handler")
self.assertEqual(
calls,
[
"builder",
"message_read",
"message_receive",
"reaction_created",
"reaction_deleted",
"card_action",
"build",
],
)
@patch.dict(os.environ, {}, clear=True)
@unittest.skipUnless(_HAS_LARK_OAPI, "lark-oapi not installed")
def test_add_ack_reaction_uses_ok_emoji(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
captured = {}
class _ReactionAPI:
def create(self, request):
captured["request"] = request
return SimpleNamespace(
success=lambda: True,
data=SimpleNamespace(reaction_id="r_typing"),
)
adapter._client = SimpleNamespace(
im=SimpleNamespace(v1=SimpleNamespace(message_reaction=_ReactionAPI()))
)
async def _direct(func, *args, **kwargs):
return func(*args, **kwargs)
with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct):
reaction_id = asyncio.run(adapter._add_ack_reaction("om_msg"))
self.assertEqual(reaction_id, "r_typing")
self.assertEqual(captured["request"].request_body.reaction_type["emoji_type"], "OK")
@patch.dict(os.environ, {}, clear=True)
def test_add_ack_reaction_logs_warning_on_failure(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
class _ReactionAPI:
def create(self, request):
raise RuntimeError("boom")
adapter._client = SimpleNamespace(
im=SimpleNamespace(v1=SimpleNamespace(message_reaction=_ReactionAPI()))
)
async def _direct(func, *args, **kwargs):
return func(*args, **kwargs)
with (
patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct),
self.assertLogs("gateway.platforms.feishu", level="WARNING") as logs,
):
reaction_id = asyncio.run(adapter._add_ack_reaction("om_msg"))
self.assertIsNone(reaction_id)
self.assertTrue(
any("Failed to add ack reaction to om_msg" in entry for entry in logs.output),
logs.output,
)
@patch.dict(os.environ, {}, clear=True)
def test_ack_reaction_events_are_ignored_to_avoid_feedback_loops(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
adapter._loop = object()
event = SimpleNamespace(
message_id="om_msg",
operator_type="user",
reaction_type=SimpleNamespace(emoji_type="OK"),
)
data = SimpleNamespace(event=event)
with patch("gateway.platforms.feishu.asyncio.run_coroutine_threadsafe") as run_threadsafe:
adapter._on_reaction_event("im.message.reaction.created_v1", data)
run_threadsafe.assert_not_called()
@patch.dict(os.environ, {}, clear=True)
def test_normalize_inbound_text_strips_feishu_mentions(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
cleaned = adapter._normalize_inbound_text("hi @_user_1 there @_user_2")
self.assertEqual(cleaned, "hi there")
@patch.dict(os.environ, {"FEISHU_GROUP_POLICY": "open"}, clear=True)
def test_group_message_requires_mentions_even_when_policy_open(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
message = SimpleNamespace(mentions=[])
sender_id = SimpleNamespace(open_id="ou_any", user_id=None)
self.assertFalse(adapter._should_accept_group_message(message, sender_id))
message_with_mention = SimpleNamespace(mentions=[SimpleNamespace(key="@_user_1")])
self.assertFalse(adapter._should_accept_group_message(message_with_mention, sender_id))
@patch.dict(os.environ, {"FEISHU_GROUP_POLICY": "open"}, clear=True)
def test_group_message_with_other_user_mention_is_rejected_when_bot_identity_unknown(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
sender_id = SimpleNamespace(open_id="ou_any", user_id=None)
other_mention = SimpleNamespace(
name="Other User",
id=SimpleNamespace(open_id="ou_other", user_id="u_other"),
)
self.assertFalse(adapter._should_accept_group_message(SimpleNamespace(mentions=[other_mention]), sender_id))
@patch.dict(
os.environ,
{
"FEISHU_GROUP_POLICY": "allowlist",
"FEISHU_ALLOWED_USERS": "ou_allowed",
"FEISHU_BOT_NAME": "Hermes Bot",
},
clear=True,
)
def test_group_message_allowlist_and_mention_both_required(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
mentioned = SimpleNamespace(
mentions=[
SimpleNamespace(
name="Hermes Bot",
id=SimpleNamespace(open_id="ou_other", user_id="u_other"),
)
]
)
self.assertTrue(
adapter._should_accept_group_message(
mentioned,
SimpleNamespace(open_id="ou_allowed", user_id=None),
)
)
self.assertFalse(
adapter._should_accept_group_message(
mentioned,
SimpleNamespace(open_id="ou_blocked", user_id=None),
)
)
@patch.dict(
os.environ,
{
"FEISHU_GROUP_POLICY": "open",
"FEISHU_BOT_OPEN_ID": "ou_bot",
},
clear=True,
)
def test_group_message_matches_bot_open_id_when_configured(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
sender_id = SimpleNamespace(open_id="ou_any", user_id=None)
bot_mention = SimpleNamespace(
name="Hermes",
id=SimpleNamespace(open_id="ou_bot", user_id="u_bot"),
)
other_mention = SimpleNamespace(
name="Other",
id=SimpleNamespace(open_id="ou_other", user_id="u_other"),
)
self.assertTrue(adapter._should_accept_group_message(SimpleNamespace(mentions=[bot_mention]), sender_id))
self.assertFalse(adapter._should_accept_group_message(SimpleNamespace(mentions=[other_mention]), sender_id))
@patch.dict(
os.environ,
{
"FEISHU_GROUP_POLICY": "open",
"FEISHU_BOT_NAME": "Hermes Bot",
},
clear=True,
)
def test_group_message_matches_bot_name_when_only_name_available(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
sender_id = SimpleNamespace(open_id="ou_any", user_id=None)
named_mention = SimpleNamespace(
name="Hermes Bot",
id=SimpleNamespace(open_id="ou_other", user_id="u_other"),
)
different_mention = SimpleNamespace(
name="Another Bot",
id=SimpleNamespace(open_id="ou_other", user_id="u_other"),
)
self.assertTrue(adapter._should_accept_group_message(SimpleNamespace(mentions=[named_mention]), sender_id))
self.assertFalse(adapter._should_accept_group_message(SimpleNamespace(mentions=[different_mention]), sender_id))
@patch.dict(
os.environ,
{
"FEISHU_GROUP_POLICY": "open",
"FEISHU_BOT_OPEN_ID": "ou_bot",
},
clear=True,
)
def test_group_post_message_uses_parsed_mentions_when_sdk_mentions_missing(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
sender_id = SimpleNamespace(open_id="ou_any", user_id=None)
message = SimpleNamespace(
message_type="post",
mentions=[],
content='{"en_us":{"content":[[{"tag":"at","user_name":"Hermes","open_id":"ou_bot"}]]}}',
)
self.assertTrue(adapter._should_accept_group_message(message, sender_id))
@patch.dict(os.environ, {}, clear=True)
def test_extract_post_message_as_text(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
message = SimpleNamespace(
message_type="post",
content='{"zh_cn":{"title":"Title","content":[[{"tag":"text","text":"hello "}],[{"tag":"a","text":"doc","href":"https://example.com"}]]}}',
message_id="om_post",
)
text, msg_type, media_urls, media_types = asyncio.run(adapter._extract_message_content(message))
self.assertEqual(text, "Title\nhello\n[doc](https://example.com)")
self.assertEqual(msg_type.value, "text")
self.assertEqual(media_urls, [])
self.assertEqual(media_types, [])
@patch.dict(os.environ, {}, clear=True)
def test_extract_post_message_uses_first_available_language_block(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
message = SimpleNamespace(
message_type="post",
content='{"fr_fr":{"title":"Subject","content":[[{"tag":"text","text":"bonjour"}]]}}',
message_id="om_post_fr",
)
text, msg_type, media_urls, media_types = asyncio.run(adapter._extract_message_content(message))
self.assertEqual(text, "Subject\nbonjour")
self.assertEqual(msg_type.value, "text")
self.assertEqual(media_urls, [])
self.assertEqual(media_types, [])
@patch.dict(os.environ, {}, clear=True)
def test_extract_post_message_with_rich_elements_does_not_drop_content(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
message = SimpleNamespace(
message_type="post",
content=(
'{"en_us":{"title":"Rich message","content":['
'[{"tag":"img","alt":"diagram"}],'
'[{"tag":"at","user_name":"Alice"},{"tag":"text","text":" please check the attachment"}],'
'[{"tag":"media","file_name":"spec.pdf"}],'
'[{"tag":"emotion","emoji_type":"smile"}]'
']}}'
),
message_id="om_post_rich",
)
text, msg_type, media_urls, media_types = asyncio.run(adapter._extract_message_content(message))
self.assertEqual(text, "Rich message\n[Image: diagram]\n@Alice please check the attachment\n[Attachment: spec.pdf]\n:smile:")
self.assertEqual(msg_type.value, "text")
self.assertEqual(media_urls, [])
self.assertEqual(media_types, [])
@patch.dict(os.environ, {}, clear=True)
def test_extract_post_message_downloads_embedded_resources(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
adapter._download_feishu_image = AsyncMock(return_value=("/tmp/feishu-image.png", "image/png"))
adapter._download_feishu_message_resource = AsyncMock(return_value=("/tmp/spec.pdf", "application/pdf"))
message = SimpleNamespace(
message_type="post",
content=(
'{"en_us":{"title":"Rich message","content":['
'[{"tag":"img","image_key":"img_123","alt":"diagram"}],'
'[{"tag":"media","file_key":"file_123","file_name":"spec.pdf"}]'
']}}'
),
message_id="om_post_media",
)
text, msg_type, media_urls, media_types = asyncio.run(adapter._extract_message_content(message))
self.assertEqual(text, "Rich message\n[Image: diagram]\n[Attachment: spec.pdf]")
self.assertEqual(msg_type.value, "text")
self.assertEqual(media_urls, ["/tmp/feishu-image.png", "/tmp/spec.pdf"])
self.assertEqual(media_types, ["image/png", "application/pdf"])
adapter._download_feishu_image.assert_awaited_once_with(
message_id="om_post_media",
image_key="img_123",
)
adapter._download_feishu_message_resource.assert_awaited_once_with(
message_id="om_post_media",
file_key="file_123",
resource_type="file",
fallback_filename="spec.pdf",
)
@patch.dict(os.environ, {}, clear=True)
def test_extract_merge_forward_message_as_text_summary(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
message = SimpleNamespace(
message_type="merge_forward",
content=json.dumps(
{
"title": "Forwarded updates",
"messages": [
{"sender_name": "Alice", "text": "Investigating the incident"},
{"sender_name": "Bob", "text": "ETA 10 minutes"},
],
}
),
message_id="om_merge_forward",
)
text, msg_type, media_urls, media_types = asyncio.run(adapter._extract_message_content(message))
self.assertEqual(
text,
"Forwarded updates\n- Alice: Investigating the incident\n- Bob: ETA 10 minutes",
)
self.assertEqual(msg_type.value, "text")
self.assertEqual(media_urls, [])
self.assertEqual(media_types, [])
@patch.dict(os.environ, {}, clear=True)
def test_extract_share_chat_message_as_text_summary(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
message = SimpleNamespace(
message_type="share_chat",
content='{"chat_id":"oc_shared","chat_name":"Platform Ops"}',
message_id="om_share_chat",
)
text, msg_type, media_urls, media_types = asyncio.run(adapter._extract_message_content(message))
self.assertEqual(text, "Shared chat: Platform Ops\nChat ID: oc_shared")
self.assertEqual(msg_type.value, "text")
self.assertEqual(media_urls, [])
self.assertEqual(media_types, [])
@patch.dict(os.environ, {}, clear=True)
def test_extract_interactive_message_as_text_summary(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
message = SimpleNamespace(
message_type="interactive",
content=json.dumps(
{
"card": {
"header": {"title": {"tag": "plain_text", "content": "Approval Request"}},
"elements": [
{"tag": "div", "text": {"tag": "plain_text", "content": "Requester: Alice"}},
{
"tag": "action",
"actions": [
{"tag": "button", "text": {"tag": "plain_text", "content": "Approve"}},
],
},
],
}
}
),
message_id="om_interactive",
)
text, msg_type, media_urls, media_types = asyncio.run(adapter._extract_message_content(message))
self.assertEqual(text, "Approval Request\nRequester: Alice\nApprove\nActions: Approve")
self.assertEqual(msg_type.value, "text")
self.assertEqual(media_urls, [])
self.assertEqual(media_types, [])
@patch.dict(os.environ, {}, clear=True)
def test_extract_image_message_downloads_and_caches(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
adapter._download_feishu_image = AsyncMock(return_value=("/tmp/feishu-image.png", "image/png"))
message = SimpleNamespace(
message_type="image",
content='{"image_key":"img_123"}',
message_id="om_image",
)
text, msg_type, media_urls, media_types = asyncio.run(adapter._extract_message_content(message))
self.assertEqual(text, "")
self.assertEqual(msg_type.value, "photo")
self.assertEqual(media_urls, ["/tmp/feishu-image.png"])
self.assertEqual(media_types, ["image/png"])
adapter._download_feishu_image.assert_awaited_once_with(
message_id="om_image",
image_key="img_123",
)
@patch.dict(os.environ, {}, clear=True)
def test_extract_audio_message_downloads_and_caches(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
adapter._download_feishu_message_resource = AsyncMock(
return_value=("/tmp/feishu-audio.ogg", "audio/ogg")
)
message = SimpleNamespace(
message_type="audio",
content='{"file_key":"file_audio","file_name":"voice.ogg"}',
message_id="om_audio",
)
text, msg_type, media_urls, media_types = asyncio.run(adapter._extract_message_content(message))
self.assertEqual(text, "")
self.assertEqual(msg_type.value, "audio")
self.assertEqual(media_urls, ["/tmp/feishu-audio.ogg"])
self.assertEqual(media_types, ["audio/ogg"])
@patch.dict(os.environ, {}, clear=True)
def test_extract_file_message_downloads_and_caches(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
adapter._download_feishu_message_resource = AsyncMock(
return_value=("/tmp/doc_123_report.pdf", "application/pdf")
)
message = SimpleNamespace(
message_type="file",
content='{"file_key":"file_doc","file_name":"report.pdf"}',
message_id="om_file",
)
text, msg_type, media_urls, media_types = asyncio.run(adapter._extract_message_content(message))
self.assertEqual(text, "")
self.assertEqual(msg_type.value, "document")
self.assertEqual(media_urls, ["/tmp/doc_123_report.pdf"])
self.assertEqual(media_types, ["application/pdf"])
@patch.dict(os.environ, {}, clear=True)
def test_extract_media_message_with_image_mime_becomes_photo(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
adapter._download_feishu_message_resource = AsyncMock(
return_value=("/tmp/feishu-media.jpg", "image/jpeg")
)
message = SimpleNamespace(
message_type="media",
content='{"file_key":"file_media","file_name":"photo.jpg"}',
message_id="om_media",
)
text, msg_type, media_urls, media_types = asyncio.run(adapter._extract_message_content(message))
self.assertEqual(text, "")
self.assertEqual(msg_type.value, "photo")
self.assertEqual(media_urls, ["/tmp/feishu-media.jpg"])
self.assertEqual(media_types, ["image/jpeg"])
@patch.dict(os.environ, {}, clear=True)
def test_extract_media_message_with_video_mime_becomes_video(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
adapter._download_feishu_message_resource = AsyncMock(
return_value=("/tmp/feishu-video.mp4", "video/mp4")
)
message = SimpleNamespace(
message_type="media",
content='{"file_key":"file_video","file_name":"clip.mp4"}',
message_id="om_video",
)
text, msg_type, media_urls, media_types = asyncio.run(adapter._extract_message_content(message))
self.assertEqual(text, "")
self.assertEqual(msg_type.value, "video")
self.assertEqual(media_urls, ["/tmp/feishu-video.mp4"])
self.assertEqual(media_types, ["video/mp4"])
@patch.dict(os.environ, {}, clear=True)
def test_extract_text_from_raw_content_uses_relation_message_fallbacks(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
shared = adapter._extract_text_from_raw_content(
msg_type="share_chat",
raw_content='{"chat_id":"oc_shared","chat_name":"Platform Ops"}',
)
attachment = adapter._extract_text_from_raw_content(
msg_type="file",
raw_content='{"file_key":"file_1","file_name":"report.pdf"}',
)
self.assertEqual(shared, "Shared chat: Platform Ops\nChat ID: oc_shared")
self.assertEqual(attachment, "[Attachment: report.pdf]")
@patch.dict(os.environ, {}, clear=True)
def test_extract_text_message_starting_with_slash_becomes_command(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
adapter._dispatch_inbound_event = AsyncMock()
adapter.get_chat_info = AsyncMock(
return_value={"chat_id": "oc_chat", "name": "Feishu DM", "type": "dm"}
)
adapter._resolve_sender_profile = AsyncMock(
return_value={"user_id": "ou_user", "user_name": "张三", "user_id_alt": None}
)
message = SimpleNamespace(
chat_id="oc_chat",
thread_id=None,
parent_id=None,
upper_message_id=None,
message_type="text",
content='{"text":"/help test"}',
message_id="om_command",
)
asyncio.run(
adapter._process_inbound_message(
data=SimpleNamespace(event=SimpleNamespace(message=message)),
message=message,
sender_id=SimpleNamespace(open_id="ou_user", user_id=None, union_id=None),
chat_type="p2p",
message_id="om_command",
)
)
event = adapter._dispatch_inbound_event.await_args.args[0]
self.assertEqual(event.message_type.value, "command")
self.assertEqual(event.text, "/help test")
@patch.dict(os.environ, {}, clear=True)
def test_extract_text_file_injects_content(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as tmp:
tmp.write("hello from feishu")
path = tmp.name
try:
text = asyncio.run(adapter._maybe_extract_text_document(path, "text/plain"))
finally:
os.unlink(path)
self.assertIn("hello from feishu", text)
self.assertIn("[Content of", text)
@patch.dict(os.environ, {}, clear=True)
def test_message_event_submits_to_adapter_loop(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
adapter._loop = object()
message = SimpleNamespace(
message_id="om_text",
chat_type="p2p",
chat_id="oc_chat",
message_type="text",
content='{"text":"hello"}',
)
sender_id = SimpleNamespace(open_id="ou_user", user_id=None, union_id=None)
sender = SimpleNamespace(sender_id=sender_id, sender_type="user")
data = SimpleNamespace(event=SimpleNamespace(message=message, sender=sender))
future = SimpleNamespace(add_done_callback=lambda *_args, **_kwargs: None)
def _submit(coro, _loop):
coro.close()
return future
with patch("gateway.platforms.feishu.asyncio.run_coroutine_threadsafe", side_effect=_submit) as submit:
adapter._on_message_event(data)
self.assertTrue(submit.called)
@patch.dict(os.environ, {}, clear=True)
def test_process_inbound_message_uses_event_sender_identity_only(self):
from gateway.config import PlatformConfig
from gateway.platforms.base import MessageType
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
adapter._dispatch_inbound_event = AsyncMock()
# Sender name now comes from the contact API; mock it to return a known value.
adapter._resolve_sender_name_from_api = AsyncMock(return_value="张三")
adapter.get_chat_info = AsyncMock(
return_value={"chat_id": "oc_chat", "name": "Feishu DM", "type": "dm"}
)
message = SimpleNamespace(
chat_id="oc_chat",
thread_id=None,
message_type="text",
content='{"text":"hello"}',
message_id="om_text",
)
sender_id = SimpleNamespace(
open_id="ou_user",
user_id="u_user",
union_id="on_union",
)
data = SimpleNamespace(event=SimpleNamespace(message=message, sender=SimpleNamespace(sender_id=sender_id)))
asyncio.run(
adapter._process_inbound_message(
data=data,
message=message,
sender_id=sender_id,
chat_type="p2p",
message_id="om_text",
)
)
adapter._dispatch_inbound_event.assert_awaited_once()
event = adapter._dispatch_inbound_event.await_args.args[0]
self.assertEqual(event.message_type, MessageType.TEXT)
self.assertEqual(event.source.user_id, "ou_user")
self.assertEqual(event.source.user_name, "张三")
self.assertEqual(event.source.user_id_alt, "on_union")
self.assertEqual(event.source.chat_name, "Feishu DM")
@patch.dict(os.environ, {}, clear=True)
def test_text_batch_merges_rapid_messages_into_single_event(self):
from gateway.config import PlatformConfig
from gateway.platforms.base import MessageEvent, MessageType
from gateway.platforms.feishu import FeishuAdapter
from gateway.session import SessionSource
adapter = FeishuAdapter(PlatformConfig())
adapter.handle_message = AsyncMock()
source = SessionSource(
platform=adapter.platform,
chat_id="oc_chat",
chat_name="Feishu DM",
chat_type="dm",
user_id="ou_user",
user_name="张三",
)
async def _sleep(_delay):
return None
async def _run() -> None:
with patch("gateway.platforms.feishu.asyncio.sleep", side_effect=_sleep):
await adapter._dispatch_inbound_event(
MessageEvent(text="A", message_type=MessageType.TEXT, source=source, message_id="om_1")
)
await adapter._dispatch_inbound_event(
MessageEvent(text="B", message_type=MessageType.TEXT, source=source, message_id="om_2")
)
pending = list(adapter._pending_text_batch_tasks.values())
self.assertEqual(len(pending), 1)
await asyncio.gather(*pending, return_exceptions=True)
asyncio.run(_run())
adapter.handle_message.assert_awaited_once()
event = adapter.handle_message.await_args.args[0]
self.assertEqual(event.text, "A\nB")
self.assertEqual(event.message_type, MessageType.TEXT)
@patch.dict(
os.environ,
{
"HERMES_FEISHU_TEXT_BATCH_MAX_MESSAGES": "2",
},
clear=True,
)
def test_text_batch_flushes_when_message_count_limit_is_hit(self):
from gateway.config import PlatformConfig
from gateway.platforms.base import MessageEvent, MessageType
from gateway.platforms.feishu import FeishuAdapter
from gateway.session import SessionSource
adapter = FeishuAdapter(PlatformConfig())
adapter.handle_message = AsyncMock()
source = SessionSource(
platform=adapter.platform,
chat_id="oc_chat",
chat_name="Feishu DM",
chat_type="dm",
user_id="ou_user",
user_name="张三",
)
async def _sleep(_delay):
return None
async def _run() -> None:
with patch("gateway.platforms.feishu.asyncio.sleep", side_effect=_sleep):
await adapter._dispatch_inbound_event(
MessageEvent(text="A", message_type=MessageType.TEXT, source=source, message_id="om_1")
)
await adapter._dispatch_inbound_event(
MessageEvent(text="B", message_type=MessageType.TEXT, source=source, message_id="om_2")
)
await adapter._dispatch_inbound_event(
MessageEvent(text="C", message_type=MessageType.TEXT, source=source, message_id="om_3")
)
pending = list(adapter._pending_text_batch_tasks.values())
self.assertEqual(len(pending), 1)
await asyncio.gather(*pending, return_exceptions=True)
asyncio.run(_run())
self.assertEqual(adapter.handle_message.await_count, 2)
first = adapter.handle_message.await_args_list[0].args[0]
second = adapter.handle_message.await_args_list[1].args[0]
self.assertEqual(first.text, "A\nB")
self.assertEqual(second.text, "C")
@patch.dict(os.environ, {}, clear=True)
def test_media_batch_merges_rapid_photo_messages(self):
from gateway.config import PlatformConfig
from gateway.platforms.base import MessageEvent, MessageType
from gateway.platforms.feishu import FeishuAdapter
from gateway.session import SessionSource
adapter = FeishuAdapter(PlatformConfig())
adapter.handle_message = AsyncMock()
source = SessionSource(
platform=adapter.platform,
chat_id="oc_chat",
chat_name="Feishu DM",
chat_type="dm",
user_id="ou_user",
user_name="张三",
)
async def _sleep(_delay):
return None
async def _run() -> None:
with patch("gateway.platforms.feishu.asyncio.sleep", side_effect=_sleep):
await adapter._dispatch_inbound_event(
MessageEvent(
text="第一张",
message_type=MessageType.PHOTO,
source=source,
message_id="om_p1",
media_urls=["/tmp/a.png"],
media_types=["image/png"],
)
)
await adapter._dispatch_inbound_event(
MessageEvent(
text="第二张",
message_type=MessageType.PHOTO,
source=source,
message_id="om_p2",
media_urls=["/tmp/b.png"],
media_types=["image/png"],
)
)
pending = list(adapter._pending_media_batch_tasks.values())
self.assertEqual(len(pending), 1)
await asyncio.gather(*pending, return_exceptions=True)
asyncio.run(_run())
adapter.handle_message.assert_awaited_once()
event = adapter.handle_message.await_args.args[0]
self.assertEqual(event.media_urls, ["/tmp/a.png", "/tmp/b.png"])
self.assertIn("第一张", event.text)
self.assertIn("第二张", event.text)
@patch.dict(os.environ, {}, clear=True)
def test_send_image_downloads_then_uses_native_image_send(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
adapter.send_image_file = AsyncMock(return_value=SimpleNamespace(success=True, message_id="om_img"))
async def _run():
with patch("gateway.platforms.feishu.cache_image_from_url", new=AsyncMock(return_value="/tmp/cached.png")):
return await adapter.send_image("oc_chat", "https://example.com/cat.png", caption="cat")
result = asyncio.run(_run())
self.assertTrue(result.success)
adapter.send_image_file.assert_awaited_once()
self.assertEqual(adapter.send_image_file.await_args.kwargs["image_path"], "/tmp/cached.png")
@patch.dict(os.environ, {}, clear=True)
def test_send_animation_degrades_to_document_send(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
adapter.send_document = AsyncMock(return_value=SimpleNamespace(success=True, message_id="om_gif"))
async def _run():
with patch.object(
adapter,
"_download_remote_document",
new=AsyncMock(return_value=("/tmp/anim.gif", "anim.gif")),
):
return await adapter.send_animation("oc_chat", "https://example.com/anim.gif", caption="look")
result = asyncio.run(_run())
self.assertTrue(result.success)
adapter.send_document.assert_awaited_once()
caption = adapter.send_document.await_args.kwargs["caption"]
self.assertIn("GIF downgraded to file", caption)
self.assertIn("look", caption)
def test_dedup_state_persists_across_adapter_restart(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
with tempfile.TemporaryDirectory() as temp_home:
with patch.dict(os.environ, {"HERMES_HOME": temp_home}, clear=False):
first = FeishuAdapter(PlatformConfig())
self.assertFalse(first._is_duplicate("om_same"))
second = FeishuAdapter(PlatformConfig())
self.assertTrue(second._is_duplicate("om_same"))
@patch.dict(os.environ, {}, clear=True)
def test_process_inbound_group_message_keeps_group_type_when_chat_lookup_falls_back(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
adapter._dispatch_inbound_event = AsyncMock()
adapter.get_chat_info = AsyncMock(
return_value={"chat_id": "oc_group", "name": "oc_group", "type": "dm"}
)
adapter._resolve_sender_profile = AsyncMock(
return_value={"user_id": "ou_user", "user_name": "张三", "user_id_alt": None}
)
message = SimpleNamespace(
chat_id="oc_group",
thread_id=None,
message_type="text",
content='{"text":"hello group"}',
message_id="om_group_text",
)
sender_id = SimpleNamespace(open_id="ou_user", user_id=None, union_id=None)
data = SimpleNamespace(event=SimpleNamespace(message=message))
asyncio.run(
adapter._process_inbound_message(
data=data,
message=message,
sender_id=sender_id,
chat_type="group",
message_id="om_group_text",
)
)
event = adapter._dispatch_inbound_event.await_args.args[0]
self.assertEqual(event.source.chat_type, "group")
@patch.dict(os.environ, {}, clear=True)
def test_process_inbound_message_fetches_reply_to_text(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
adapter._dispatch_inbound_event = AsyncMock()
adapter.get_chat_info = AsyncMock(
return_value={"chat_id": "oc_chat", "name": "Feishu DM", "type": "dm"}
)
adapter._resolve_sender_profile = AsyncMock(
return_value={"user_id": "ou_user", "user_name": "张三", "user_id_alt": None}
)
adapter._fetch_message_text = AsyncMock(return_value="父消息内容")
message = SimpleNamespace(
chat_id="oc_chat",
thread_id=None,
parent_id="om_parent",
upper_message_id=None,
message_type="text",
content='{"text":"reply"}',
message_id="om_reply",
)
asyncio.run(
adapter._process_inbound_message(
data=SimpleNamespace(event=SimpleNamespace(message=message)),
message=message,
sender_id=SimpleNamespace(open_id="ou_user", user_id=None, union_id=None),
chat_type="p2p",
message_id="om_reply",
)
)
event = adapter._dispatch_inbound_event.await_args.args[0]
self.assertEqual(event.reply_to_message_id, "om_parent")
self.assertEqual(event.reply_to_text, "父消息内容")
@patch.dict(os.environ, {}, clear=True)
def test_send_replies_in_thread_when_thread_metadata_present(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
captured = {}
class _ReplyAPI:
def reply(self, request):
captured["request"] = request
return SimpleNamespace(
success=lambda: True,
data=SimpleNamespace(message_id="om_reply"),
)
adapter._client = SimpleNamespace(
im=SimpleNamespace(
v1=SimpleNamespace(
message=_ReplyAPI(),
)
)
)
async def _direct(func, *args, **kwargs):
return func(*args, **kwargs)
with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct):
result = asyncio.run(
adapter.send(
chat_id="oc_chat",
content="hello",
reply_to="om_parent",
metadata={"thread_id": "omt-thread"},
)
)
self.assertTrue(result.success)
self.assertEqual(result.message_id, "om_reply")
self.assertTrue(captured["request"].request_body.reply_in_thread)
@patch.dict(os.environ, {}, clear=True)
def test_send_retries_transient_failure(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
captured = {"attempts": 0}
sleeps = []
class _MessageAPI:
def create(self, request):
captured["attempts"] += 1
captured["request"] = request
if captured["attempts"] == 1:
raise OSError("temporary send failure")
return SimpleNamespace(
success=lambda: True,
data=SimpleNamespace(message_id="om_retry"),
)
adapter._client = SimpleNamespace(
im=SimpleNamespace(
v1=SimpleNamespace(
message=_MessageAPI(),
)
)
)
async def _direct(func, *args, **kwargs):
return func(*args, **kwargs)
async def _sleep(delay):
sleeps.append(delay)
with (
patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct),
patch("gateway.platforms.feishu.asyncio.sleep", side_effect=_sleep),
):
result = asyncio.run(adapter.send(chat_id="oc_chat", content="hello retry"))
self.assertTrue(result.success)
self.assertEqual(result.message_id, "om_retry")
self.assertEqual(captured["attempts"], 2)
self.assertEqual(sleeps, [1])
@patch.dict(os.environ, {}, clear=True)
def test_send_does_not_retry_deterministic_api_failure(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
captured = {"attempts": 0}
sleeps = []
class _MessageAPI:
def create(self, request):
captured["attempts"] += 1
return SimpleNamespace(
success=lambda: False,
code=400,
msg="bad request",
)
adapter._client = SimpleNamespace(
im=SimpleNamespace(
v1=SimpleNamespace(
message=_MessageAPI(),
)
)
)
async def _direct(func, *args, **kwargs):
return func(*args, **kwargs)
async def _sleep(delay):
sleeps.append(delay)
with (
patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct),
patch("gateway.platforms.feishu.asyncio.sleep", side_effect=_sleep),
):
result = asyncio.run(adapter.send(chat_id="oc_chat", content="bad payload"))
self.assertFalse(result.success)
self.assertEqual(result.error, "[400] bad request")
self.assertEqual(captured["attempts"], 1)
self.assertEqual(sleeps, [])
@patch.dict(os.environ, {}, clear=True)
def test_send_document_reply_uses_thread_flag(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
captured = {}
class _FileAPI:
def create(self, request):
return SimpleNamespace(
success=lambda: True,
data=SimpleNamespace(file_key="file_123"),
)
class _MessageAPI:
def reply(self, request):
captured["request"] = request
return SimpleNamespace(
success=lambda: True,
data=SimpleNamespace(message_id="om_file_reply"),
)
adapter._client = SimpleNamespace(
im=SimpleNamespace(
v1=SimpleNamespace(
file=_FileAPI(),
message=_MessageAPI(),
)
)
)
async def _direct(func, *args, **kwargs):
return func(*args, **kwargs)
with tempfile.NamedTemporaryFile("wb", suffix=".pdf", delete=False) as tmp:
tmp.write(b"%PDF-1.4 test")
file_path = tmp.name
try:
with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct):
result = asyncio.run(
adapter.send_document(
chat_id="oc_chat",
file_path=file_path,
reply_to="om_parent",
metadata={"thread_id": "omt-thread"},
)
)
finally:
os.unlink(file_path)
self.assertTrue(result.success)
self.assertTrue(captured["request"].request_body.reply_in_thread)
@patch.dict(os.environ, {}, clear=True)
def test_send_document_uploads_file_and_sends_file_message(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
captured = {}
class _FileAPI:
def create(self, request):
captured["upload_request"] = request
return SimpleNamespace(
success=lambda: True,
data=SimpleNamespace(file_key="file_123"),
)
class _MessageAPI:
def create(self, request):
captured["message_request"] = request
return SimpleNamespace(
success=lambda: True,
data=SimpleNamespace(message_id="om_file_msg"),
)
adapter._client = SimpleNamespace(
im=SimpleNamespace(
v1=SimpleNamespace(
file=_FileAPI(),
message=_MessageAPI(),
)
)
)
async def _direct(func, *args, **kwargs):
return func(*args, **kwargs)
with tempfile.NamedTemporaryFile("wb", suffix=".pdf", delete=False) as tmp:
tmp.write(b"%PDF-1.4 test")
file_path = tmp.name
try:
with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct):
result = asyncio.run(adapter.send_document(chat_id="oc_chat", file_path=file_path))
finally:
os.unlink(file_path)
self.assertTrue(result.success)
self.assertEqual(result.message_id, "om_file_msg")
self.assertEqual(captured["upload_request"].request_body.file_type, "pdf")
self.assertEqual(
captured["message_request"].request_body.content,
'{"file_key": "file_123"}',
)
@patch.dict(os.environ, {}, clear=True)
def test_send_document_with_caption_uses_single_post_message(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
captured = {}
class _FileAPI:
def create(self, request):
return SimpleNamespace(
success=lambda: True,
data=SimpleNamespace(file_key="file_123"),
)
class _MessageAPI:
def create(self, request):
captured["message_request"] = request
return SimpleNamespace(
success=lambda: True,
data=SimpleNamespace(message_id="om_post_msg"),
)
adapter._client = SimpleNamespace(
im=SimpleNamespace(
v1=SimpleNamespace(
file=_FileAPI(),
message=_MessageAPI(),
)
)
)
async def _direct(func, *args, **kwargs):
return func(*args, **kwargs)
with tempfile.NamedTemporaryFile("wb", suffix=".pdf", delete=False) as tmp:
tmp.write(b"%PDF-1.4 test")
file_path = tmp.name
try:
with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct):
result = asyncio.run(
adapter.send_document(chat_id="oc_chat", file_path=file_path, caption="报告请看")
)
finally:
os.unlink(file_path)
self.assertTrue(result.success)
self.assertEqual(captured["message_request"].request_body.msg_type, "post")
self.assertIn('"tag": "media"', captured["message_request"].request_body.content)
self.assertIn('"file_key": "file_123"', captured["message_request"].request_body.content)
self.assertIn("报告请看", captured["message_request"].request_body.content)
@patch.dict(os.environ, {}, clear=True)
def test_send_image_file_uploads_image_and_sends_image_message(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
captured = {}
class _ImageAPI:
def create(self, request):
captured["upload_request"] = request
return SimpleNamespace(
success=lambda: True,
data=SimpleNamespace(image_key="img_123"),
)
class _MessageAPI:
def create(self, request):
captured["message_request"] = request
return SimpleNamespace(
success=lambda: True,
data=SimpleNamespace(message_id="om_image_msg"),
)
adapter._client = SimpleNamespace(
im=SimpleNamespace(
v1=SimpleNamespace(
image=_ImageAPI(),
message=_MessageAPI(),
)
)
)
async def _direct(func, *args, **kwargs):
return func(*args, **kwargs)
with tempfile.NamedTemporaryFile("wb", suffix=".png", delete=False) as tmp:
tmp.write(b"\x89PNG\r\n\x1a\n")
image_path = tmp.name
try:
with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct):
result = asyncio.run(adapter.send_image_file(chat_id="oc_chat", image_path=image_path))
finally:
os.unlink(image_path)
self.assertTrue(result.success)
self.assertEqual(result.message_id, "om_image_msg")
self.assertEqual(captured["upload_request"].request_body.image_type, "message")
self.assertEqual(
captured["message_request"].request_body.content,
'{"image_key": "img_123"}',
)
@patch.dict(os.environ, {}, clear=True)
def test_send_image_file_with_caption_uses_single_post_message(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
captured = {}
class _ImageAPI:
def create(self, request):
return SimpleNamespace(
success=lambda: True,
data=SimpleNamespace(image_key="img_123"),
)
class _MessageAPI:
def create(self, request):
captured["message_request"] = request
return SimpleNamespace(
success=lambda: True,
data=SimpleNamespace(message_id="om_post_img"),
)
adapter._client = SimpleNamespace(
im=SimpleNamespace(
v1=SimpleNamespace(
image=_ImageAPI(),
message=_MessageAPI(),
)
)
)
async def _direct(func, *args, **kwargs):
return func(*args, **kwargs)
with tempfile.NamedTemporaryFile("wb", suffix=".png", delete=False) as tmp:
tmp.write(b"\x89PNG\r\n\x1a\n")
image_path = tmp.name
try:
with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct):
result = asyncio.run(
adapter.send_image_file(chat_id="oc_chat", image_path=image_path, caption="截图说明")
)
finally:
os.unlink(image_path)
self.assertTrue(result.success)
self.assertEqual(captured["message_request"].request_body.msg_type, "post")
self.assertIn('"tag": "img"', captured["message_request"].request_body.content)
self.assertIn('"image_key": "img_123"', captured["message_request"].request_body.content)
self.assertIn("截图说明", captured["message_request"].request_body.content)
@patch.dict(os.environ, {}, clear=True)
def test_send_video_uploads_file_and_sends_media_message(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
captured = {}
class _FileAPI:
def create(self, request):
captured["upload_request"] = request
return SimpleNamespace(
success=lambda: True,
data=SimpleNamespace(file_key="file_video_123"),
)
class _MessageAPI:
def create(self, request):
captured["message_request"] = request
return SimpleNamespace(
success=lambda: True,
data=SimpleNamespace(message_id="om_video_msg"),
)
adapter._client = SimpleNamespace(
im=SimpleNamespace(
v1=SimpleNamespace(
file=_FileAPI(),
message=_MessageAPI(),
)
)
)
async def _direct(func, *args, **kwargs):
return func(*args, **kwargs)
with tempfile.NamedTemporaryFile("wb", suffix=".mp4", delete=False) as tmp:
tmp.write(b"\x00\x00\x00\x18ftypmp42")
video_path = tmp.name
try:
with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct):
result = asyncio.run(adapter.send_video(chat_id="oc_chat", video_path=video_path))
finally:
os.unlink(video_path)
self.assertTrue(result.success)
self.assertEqual(captured["upload_request"].request_body.file_type, "mp4")
self.assertEqual(captured["message_request"].request_body.msg_type, "media")
self.assertEqual(captured["message_request"].request_body.content, '{"file_key": "file_video_123"}')
@patch.dict(os.environ, {}, clear=True)
def test_send_voice_uploads_opus_and_sends_audio_message(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
captured = {}
class _FileAPI:
def create(self, request):
captured["upload_request"] = request
return SimpleNamespace(
success=lambda: True,
data=SimpleNamespace(file_key="file_audio_123"),
)
class _MessageAPI:
def create(self, request):
captured["message_request"] = request
return SimpleNamespace(
success=lambda: True,
data=SimpleNamespace(message_id="om_audio_msg"),
)
adapter._client = SimpleNamespace(
im=SimpleNamespace(
v1=SimpleNamespace(
file=_FileAPI(),
message=_MessageAPI(),
)
)
)
async def _direct(func, *args, **kwargs):
return func(*args, **kwargs)
with tempfile.NamedTemporaryFile("wb", suffix=".opus", delete=False) as tmp:
tmp.write(b"opus")
audio_path = tmp.name
try:
with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct):
result = asyncio.run(adapter.send_voice(chat_id="oc_chat", audio_path=audio_path))
finally:
os.unlink(audio_path)
self.assertTrue(result.success)
self.assertEqual(captured["upload_request"].request_body.file_type, "opus")
self.assertEqual(captured["message_request"].request_body.msg_type, "audio")
self.assertEqual(captured["message_request"].request_body.content, '{"file_key": "file_audio_123"}')
@patch.dict(os.environ, {}, clear=True)
def test_build_post_payload_extracts_title_and_links(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
payload = json.loads(adapter._build_post_payload("# 标题\n访问 [文档](https://example.com)"))
elements = payload["zh_cn"]["content"][0]
self.assertEqual(elements, [{"tag": "md", "text": "# 标题\n访问 [文档](https://example.com)"}])
@patch.dict(os.environ, {}, clear=True)
def test_build_post_payload_wraps_markdown_in_md_tag(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
payload = json.loads(
adapter._build_post_payload("支持 **粗体**、*斜体* 和 `代码`")
)
elements = payload["zh_cn"]["content"][0]
self.assertEqual(
elements,
[
{"tag": "md", "text": "支持 **粗体**、*斜体* 和 `代码`"},
],
)
@patch.dict(os.environ, {}, clear=True)
def test_build_post_payload_keeps_full_markdown_text(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
payload = json.loads(
adapter._build_post_payload(
"---\n1. 第一项\n 2. 子项\n- 外层\n - 内层\n<u>下划线</u> 和 ~~删除线~~"
)
)
rows = payload["zh_cn"]["content"]
self.assertEqual(
rows,
[[{"tag": "md", "text": "---\n1. 第一项\n 2. 子项\n- 外层\n - 内层\n<u>下划线</u> 和 ~~删除线~~"}]],
)
@patch.dict(os.environ, {}, clear=True)
def test_send_uses_post_for_inline_markdown(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
captured = {}
class _MessageAPI:
def create(self, request):
captured["request"] = request
return SimpleNamespace(
success=lambda: True,
data=SimpleNamespace(message_id="om_markdown"),
)
adapter._client = SimpleNamespace(
im=SimpleNamespace(
v1=SimpleNamespace(
message=_MessageAPI(),
)
)
)
async def _direct(func, *args, **kwargs):
return func(*args, **kwargs)
with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct):
result = asyncio.run(
adapter.send(
chat_id="oc_chat",
content="可以用 **粗体** 和 *斜体*。",
)
)
self.assertTrue(result.success)
self.assertEqual(captured["request"].request_body.msg_type, "post")
payload = json.loads(captured["request"].request_body.content)
elements = payload["zh_cn"]["content"][0]
self.assertEqual(elements, [{"tag": "md", "text": "可以用 **粗体** 和 *斜体*。"}])
@patch.dict(os.environ, {}, clear=True)
def test_send_falls_back_to_text_when_post_payload_is_rejected(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
captured = {"calls": []}
class _MessageAPI:
def create(self, request):
captured["calls"].append(request)
if len(captured["calls"]) == 1:
raise RuntimeError("content format of the post type is incorrect")
return SimpleNamespace(
success=lambda: True,
data=SimpleNamespace(message_id="om_plain"),
)
adapter._client = SimpleNamespace(
im=SimpleNamespace(
v1=SimpleNamespace(
message=_MessageAPI(),
)
)
)
async def _direct(func, *args, **kwargs):
return func(*args, **kwargs)
with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct):
result = asyncio.run(
adapter.send(
chat_id="oc_chat",
content="可以用 **粗体** 和 *斜体*。",
)
)
self.assertTrue(result.success)
self.assertEqual(captured["calls"][0].request_body.msg_type, "post")
self.assertEqual(captured["calls"][1].request_body.msg_type, "text")
self.assertEqual(
captured["calls"][1].request_body.content,
json.dumps({"text": "可以用 粗体 和 斜体。"}, ensure_ascii=False),
)
@patch.dict(os.environ, {}, clear=True)
def test_send_falls_back_to_text_when_post_response_is_unsuccessful(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
captured = {"calls": []}
class _MessageAPI:
def create(self, request):
captured["calls"].append(request)
if len(captured["calls"]) == 1:
return SimpleNamespace(success=lambda: False, code=230001, msg="content format of the post type is incorrect")
return SimpleNamespace(
success=lambda: True,
data=SimpleNamespace(message_id="om_plain_response"),
)
adapter._client = SimpleNamespace(
im=SimpleNamespace(
v1=SimpleNamespace(
message=_MessageAPI(),
)
)
)
async def _direct(func, *args, **kwargs):
return func(*args, **kwargs)
with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct):
result = asyncio.run(
adapter.send(
chat_id="oc_chat",
content="可以用 **粗体** 和 *斜体*。",
)
)
self.assertTrue(result.success)
self.assertEqual(captured["calls"][0].request_body.msg_type, "post")
self.assertEqual(captured["calls"][1].request_body.msg_type, "text")
self.assertEqual(
captured["calls"][1].request_body.content,
json.dumps({"text": "可以用 粗体 和 斜体。"}, ensure_ascii=False),
)
@patch.dict(os.environ, {}, clear=True)
def test_send_uses_post_for_advanced_markdown_lines(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
captured = {}
class _MessageAPI:
def create(self, request):
captured["request"] = request
return SimpleNamespace(
success=lambda: True,
data=SimpleNamespace(message_id="om_markdown_advanced"),
)
adapter._client = SimpleNamespace(
im=SimpleNamespace(
v1=SimpleNamespace(
message=_MessageAPI(),
)
)
)
async def _direct(func, *args, **kwargs):
return func(*args, **kwargs)
with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct):
result = asyncio.run(
adapter.send(
chat_id="oc_chat",
content="---\n1. 第一项\n<u>下划线</u>\n~~删除线~~",
)
)
self.assertTrue(result.success)
self.assertEqual(captured["request"].request_body.msg_type, "post")
payload = json.loads(captured["request"].request_body.content)
rows = payload["zh_cn"]["content"]
self.assertEqual(
rows,
[[{"tag": "md", "text": "---\n1. 第一项\n<u>下划线</u>\n~~删除线~~"}]],
)
@unittest.skipUnless(_HAS_LARK_OAPI, "lark-oapi not installed")
class TestWebhookSecurity(unittest.TestCase):
"""Tests for webhook signature verification, rate limiting, and body size limits."""
def _make_adapter(self, encrypt_key: str = "") -> "FeishuAdapter":
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
with patch.dict(os.environ, {"FEISHU_APP_ID": "cli", "FEISHU_APP_SECRET": "sec", "FEISHU_ENCRYPT_KEY": encrypt_key}, clear=True):
return FeishuAdapter(PlatformConfig())
def test_signature_valid_passes(self):
import hashlib
from gateway.platforms.feishu import FeishuAdapter
from gateway.config import PlatformConfig
encrypt_key = "test_secret"
adapter = self._make_adapter(encrypt_key)
body = b'{"type":"event"}'
timestamp = "1700000000"
nonce = "abc123"
content = f"{timestamp}{nonce}{encrypt_key}" + body.decode("utf-8")
sig = hashlib.sha256(content.encode("utf-8")).hexdigest()
headers = {"x-lark-request-timestamp": timestamp, "x-lark-request-nonce": nonce, "x-lark-signature": sig}
self.assertTrue(adapter._is_webhook_signature_valid(headers, body))
def test_signature_invalid_rejected(self):
adapter = self._make_adapter("test_secret")
headers = {
"x-lark-request-timestamp": "1700000000",
"x-lark-request-nonce": "abc",
"x-lark-signature": "deadbeef" * 8,
}
self.assertFalse(adapter._is_webhook_signature_valid(headers, b'{"type":"event"}'))
def test_signature_missing_headers_rejected(self):
adapter = self._make_adapter("test_secret")
self.assertFalse(adapter._is_webhook_signature_valid({}, b'{}'))
def test_rate_limit_allows_requests_within_window(self):
adapter = self._make_adapter()
for _ in range(5):
self.assertTrue(adapter._check_webhook_rate_limit("10.0.0.1"))
def test_rate_limit_blocks_after_exceeding_max(self):
from gateway.platforms.feishu import _FEISHU_WEBHOOK_RATE_LIMIT_MAX
adapter = self._make_adapter()
for _ in range(_FEISHU_WEBHOOK_RATE_LIMIT_MAX):
adapter._check_webhook_rate_limit("10.0.0.2")
self.assertFalse(adapter._check_webhook_rate_limit("10.0.0.2"))
def test_rate_limit_resets_after_window_expires(self):
from gateway.platforms.feishu import _FEISHU_WEBHOOK_RATE_LIMIT_MAX, _FEISHU_WEBHOOK_RATE_WINDOW_SECONDS
adapter = self._make_adapter()
ip = "10.0.0.3"
for _ in range(_FEISHU_WEBHOOK_RATE_LIMIT_MAX):
adapter._check_webhook_rate_limit(ip)
self.assertFalse(adapter._check_webhook_rate_limit(ip))
# Simulate window expiry by backdating the stored entry.
count, window_start = adapter._webhook_rate_counts[ip]
adapter._webhook_rate_counts[ip] = (count, window_start - _FEISHU_WEBHOOK_RATE_WINDOW_SECONDS - 1)
self.assertTrue(adapter._check_webhook_rate_limit(ip))
@patch.dict(os.environ, {}, clear=True)
def test_webhook_request_rejects_oversized_body(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter, _FEISHU_WEBHOOK_MAX_BODY_BYTES
adapter = FeishuAdapter(PlatformConfig())
# Simulate a request whose Content-Length already signals oversize.
request = SimpleNamespace(
remote="127.0.0.1",
content_length=_FEISHU_WEBHOOK_MAX_BODY_BYTES + 1,
)
response = asyncio.run(adapter._handle_webhook_request(request))
self.assertEqual(response.status, 413)
@patch.dict(os.environ, {}, clear=True)
def test_webhook_request_rejects_invalid_json(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
request = SimpleNamespace(
remote="127.0.0.1",
content_length=None,
read=AsyncMock(return_value=b"not-json"),
)
response = asyncio.run(adapter._handle_webhook_request(request))
self.assertEqual(response.status, 400)
@patch.dict(os.environ, {"FEISHU_ENCRYPT_KEY": "secret"}, clear=True)
def test_webhook_request_rejects_bad_signature(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
body = json.dumps({"header": {"event_type": "im.message.receive_v1"}}).encode()
request = SimpleNamespace(
remote="127.0.0.1",
content_length=None,
headers={"x-lark-request-timestamp": "123", "x-lark-request-nonce": "abc", "x-lark-signature": "bad"},
read=AsyncMock(return_value=body),
)
response = asyncio.run(adapter._handle_webhook_request(request))
self.assertEqual(response.status, 401)
@patch.dict(os.environ, {}, clear=True)
def test_webhook_url_verification_challenge_passes_without_signature(self):
"""Challenge requests must succeed even when no encrypt_key is set."""
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
body = json.dumps({"type": "url_verification", "challenge": "test_challenge_token"}).encode()
request = SimpleNamespace(
remote="127.0.0.1",
content_length=None,
read=AsyncMock(return_value=body),
)
response = asyncio.run(adapter._handle_webhook_request(request))
self.assertEqual(response.status, 200)
self.assertIn(b"test_challenge_token", response.body)
class TestDedupTTL(unittest.TestCase):
"""Tests for TTL-aware deduplication."""
@patch.dict(os.environ, {}, clear=True)
def test_duplicate_within_ttl_is_rejected(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
with patch.object(adapter, "_persist_seen_message_ids"):
adapter._seen_message_ids = {"om_dup": time.time()}
adapter._seen_message_order = ["om_dup"]
self.assertTrue(adapter._is_duplicate("om_dup"))
@patch.dict(os.environ, {}, clear=True)
def test_expired_entry_is_not_considered_duplicate(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter, _FEISHU_DEDUP_TTL_SECONDS
adapter = FeishuAdapter(PlatformConfig())
# Plant an entry that expired well past the TTL.
stale_ts = time.time() - _FEISHU_DEDUP_TTL_SECONDS - 60
adapter._seen_message_ids = {"om_old": stale_ts}
adapter._seen_message_order = ["om_old"]
with patch.object(adapter, "_persist_seen_message_ids"):
self.assertFalse(adapter._is_duplicate("om_old"))
@patch.dict(os.environ, {}, clear=True)
def test_persist_saves_timestamps_as_dict(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
ts = time.time()
adapter._seen_message_ids = {"om_ts1": ts}
adapter._seen_message_order = ["om_ts1"]
with tempfile.TemporaryDirectory() as tmpdir:
adapter._dedup_state_path = Path(tmpdir) / "dedup.json"
adapter._persist_seen_message_ids()
saved = json.loads(adapter._dedup_state_path.read_text())
self.assertIsInstance(saved["message_ids"], dict)
self.assertAlmostEqual(saved["message_ids"]["om_ts1"], ts, places=1)
@patch.dict(os.environ, {}, clear=True)
def test_load_backward_compat_list_format(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
with tempfile.TemporaryDirectory() as tmpdir:
path = Path(tmpdir) / "dedup.json"
path.write_text(json.dumps({"message_ids": ["om_a", "om_b"]}), encoding="utf-8")
adapter._dedup_state_path = path
adapter._load_seen_message_ids()
self.assertIn("om_a", adapter._seen_message_ids)
self.assertIn("om_b", adapter._seen_message_ids)
class TestGroupMentionAtAll(unittest.TestCase):
"""Tests for @_all (Feishu @everyone) group mention routing."""
@patch.dict(os.environ, {"FEISHU_GROUP_POLICY": "open"}, clear=True)
def test_at_all_in_content_accepts_without_explicit_bot_mention(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
message = SimpleNamespace(
content='{"text":"@_all 请注意"}',
mentions=[],
)
sender_id = SimpleNamespace(open_id="ou_any", user_id=None)
self.assertTrue(adapter._should_accept_group_message(message, sender_id))
@patch.dict(os.environ, {"FEISHU_GROUP_POLICY": "allowlist", "FEISHU_ALLOWED_USERS": "ou_allowed"}, clear=True)
def test_at_all_still_requires_policy_gate(self):
"""@_all bypasses mention gating but NOT the allowlist policy."""
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
message = SimpleNamespace(content='{"text":"@_all attention"}', mentions=[])
# Non-allowlisted user — should be blocked even with @_all.
blocked_sender = SimpleNamespace(open_id="ou_blocked", user_id=None)
self.assertFalse(adapter._should_accept_group_message(message, blocked_sender))
# Allowlisted user — should pass.
allowed_sender = SimpleNamespace(open_id="ou_allowed", user_id=None)
self.assertTrue(adapter._should_accept_group_message(message, allowed_sender))
@unittest.skipUnless(_HAS_LARK_OAPI, "lark-oapi not installed")
class TestSenderNameResolution(unittest.TestCase):
"""Tests for _resolve_sender_name_from_api."""
@patch.dict(os.environ, {}, clear=True)
def test_returns_none_when_client_is_none(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
adapter._client = None
result = asyncio.run(adapter._resolve_sender_name_from_api("ou_abc"))
self.assertIsNone(result)
@patch.dict(os.environ, {}, clear=True)
def test_returns_cached_name_within_ttl(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
adapter._client = SimpleNamespace()
future_expire = time.time() + 600
adapter._sender_name_cache["ou_cached"] = ("Alice", future_expire)
result = asyncio.run(adapter._resolve_sender_name_from_api("ou_cached"))
self.assertEqual(result, "Alice")
@patch.dict(os.environ, {}, clear=True)
def test_fetches_and_caches_name_from_api(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
user_obj = SimpleNamespace(name="Bob", display_name=None, nickname=None, en_name=None)
mock_response = SimpleNamespace(
success=lambda: True,
data=SimpleNamespace(user=user_obj),
)
async def _direct(func, *args, **kwargs):
return func(*args, **kwargs)
class _ContactAPI:
def get(self, request):
return mock_response
adapter._client = SimpleNamespace(
contact=SimpleNamespace(v3=SimpleNamespace(user=_ContactAPI()))
)
with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct):
result = asyncio.run(adapter._resolve_sender_name_from_api("ou_bob"))
self.assertEqual(result, "Bob")
self.assertIn("ou_bob", adapter._sender_name_cache)
@patch.dict(os.environ, {}, clear=True)
def test_expired_cache_triggers_new_api_call(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
# Expired cache entry.
adapter._sender_name_cache["ou_expired"] = ("OldName", time.time() - 1)
async def _direct(func, *args, **kwargs):
return func(*args, **kwargs)
user_obj = SimpleNamespace(name="NewName", display_name=None, nickname=None, en_name=None)
class _ContactAPI:
def get(self, request):
return SimpleNamespace(success=lambda: True, data=SimpleNamespace(user=user_obj))
adapter._client = SimpleNamespace(
contact=SimpleNamespace(v3=SimpleNamespace(user=_ContactAPI()))
)
with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct):
result = asyncio.run(adapter._resolve_sender_name_from_api("ou_expired"))
self.assertEqual(result, "NewName")
@patch.dict(os.environ, {}, clear=True)
def test_api_failure_returns_none_without_raising(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
class _BrokenContactAPI:
def get(self, _request):
raise RuntimeError("API down")
adapter._client = SimpleNamespace(
contact=SimpleNamespace(v3=SimpleNamespace(user=_BrokenContactAPI()))
)
async def _direct(func, *args, **kwargs):
return func(*args, **kwargs)
with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct):
result = asyncio.run(adapter._resolve_sender_name_from_api("ou_broken"))
self.assertIsNone(result)