Adds Feishu (ByteDance's enterprise messaging platform) as a gateway platform adapter with full feature parity: WebSocket + webhook transports, message batching, dedup, rate limiting, rich post/card content parsing, media handling (images/audio/files/video), group @mention gating, reaction routing, and interactive card button support. Cherry-picked from PR #1793 by penwyp with: - Moved to current main (PR was 458 commits behind) - Fixed _send_with_retry shadowing BasePlatformAdapter method (renamed to _feishu_send_with_retry to avoid signature mismatch crash) - Fixed import structure: aiohttp/websockets imported independently of lark_oapi so they remain available when SDK is missing - Fixed get_hermes_home import (hermes_constants, not hermes_cli.config) - Added skip decorators for tests requiring lark_oapi SDK - All 16 integration points added surgically to current main New dependency: lark-oapi>=1.5.3,<2 (optional, pip install hermes-agent[feishu]) Fixes #1788 Co-authored-by: penwyp <penwyp@users.noreply.github.com>
2581 lines
100 KiB
Python
2581 lines
100 KiB
Python
"""Tests for the Feishu gateway integration."""
|
|
|
|
import asyncio
|
|
import json
|
|
import os
|
|
import tempfile
|
|
import time
|
|
import unittest
|
|
from pathlib import Path
|
|
from types import SimpleNamespace
|
|
from unittest.mock import AsyncMock, patch
|
|
|
|
try:
|
|
import lark_oapi
|
|
_HAS_LARK_OAPI = True
|
|
except ImportError:
|
|
_HAS_LARK_OAPI = False
|
|
|
|
|
|
class TestPlatformEnum(unittest.TestCase):
|
|
def test_feishu_in_platform_enum(self):
|
|
from gateway.config import Platform
|
|
|
|
self.assertEqual(Platform.FEISHU.value, "feishu")
|
|
|
|
|
|
class TestConfigEnvOverrides(unittest.TestCase):
|
|
@patch.dict(os.environ, {
|
|
"FEISHU_APP_ID": "cli_xxx",
|
|
"FEISHU_APP_SECRET": "secret_xxx",
|
|
"FEISHU_CONNECTION_MODE": "websocket",
|
|
"FEISHU_DOMAIN": "feishu",
|
|
}, clear=False)
|
|
def test_feishu_config_loaded_from_env(self):
|
|
from gateway.config import GatewayConfig, Platform, _apply_env_overrides
|
|
|
|
config = GatewayConfig()
|
|
_apply_env_overrides(config)
|
|
|
|
self.assertIn(Platform.FEISHU, config.platforms)
|
|
self.assertTrue(config.platforms[Platform.FEISHU].enabled)
|
|
self.assertEqual(config.platforms[Platform.FEISHU].extra["app_id"], "cli_xxx")
|
|
self.assertEqual(config.platforms[Platform.FEISHU].extra["connection_mode"], "websocket")
|
|
|
|
@patch.dict(os.environ, {
|
|
"FEISHU_APP_ID": "cli_xxx",
|
|
"FEISHU_APP_SECRET": "secret_xxx",
|
|
"FEISHU_HOME_CHANNEL": "oc_xxx",
|
|
}, clear=False)
|
|
def test_feishu_home_channel_loaded(self):
|
|
from gateway.config import GatewayConfig, Platform, _apply_env_overrides
|
|
|
|
config = GatewayConfig()
|
|
_apply_env_overrides(config)
|
|
|
|
home = config.platforms[Platform.FEISHU].home_channel
|
|
self.assertIsNotNone(home)
|
|
self.assertEqual(home.chat_id, "oc_xxx")
|
|
|
|
@patch.dict(os.environ, {
|
|
"FEISHU_APP_ID": "cli_xxx",
|
|
"FEISHU_APP_SECRET": "secret_xxx",
|
|
}, clear=False)
|
|
def test_feishu_in_connected_platforms(self):
|
|
from gateway.config import GatewayConfig, Platform, _apply_env_overrides
|
|
|
|
config = GatewayConfig()
|
|
_apply_env_overrides(config)
|
|
|
|
self.assertIn(Platform.FEISHU, config.get_connected_platforms())
|
|
|
|
|
|
class TestGatewayIntegration(unittest.TestCase):
|
|
def test_feishu_in_adapter_factory(self):
|
|
source = Path("gateway/run.py").read_text(encoding="utf-8")
|
|
self.assertIn("Platform.FEISHU", source)
|
|
self.assertIn("FeishuAdapter", source)
|
|
|
|
def test_feishu_in_authorization_maps(self):
|
|
source = Path("gateway/run.py").read_text(encoding="utf-8")
|
|
self.assertIn("FEISHU_ALLOWED_USERS", source)
|
|
self.assertIn("FEISHU_ALLOW_ALL_USERS", source)
|
|
|
|
def test_feishu_toolset_exists(self):
|
|
from toolsets import TOOLSETS
|
|
|
|
self.assertIn("hermes-feishu", TOOLSETS)
|
|
self.assertIn("hermes-feishu", TOOLSETS["hermes-gateway"]["includes"])
|
|
|
|
|
|
class TestFeishuPostParsing(unittest.TestCase):
|
|
def test_parse_post_content_extracts_text_mentions_and_media_refs(self):
|
|
from gateway.platforms.feishu import parse_feishu_post_content
|
|
|
|
result = parse_feishu_post_content(
|
|
json.dumps(
|
|
{
|
|
"en_us": {
|
|
"title": "Rich message",
|
|
"content": [
|
|
[{"tag": "img", "image_key": "img_1", "alt": "diagram"}],
|
|
[{"tag": "at", "user_name": "Alice", "open_id": "ou_alice"}],
|
|
[{"tag": "media", "file_key": "file_1", "file_name": "spec.pdf"}],
|
|
],
|
|
}
|
|
}
|
|
)
|
|
)
|
|
|
|
self.assertEqual(result.text_content, "Rich message\n[Image: diagram]\n@Alice\n[Attachment: spec.pdf]")
|
|
self.assertEqual(result.image_keys, ["img_1"])
|
|
self.assertEqual(result.mentioned_ids, ["ou_alice"])
|
|
self.assertEqual(len(result.media_refs), 1)
|
|
self.assertEqual(result.media_refs[0].file_key, "file_1")
|
|
self.assertEqual(result.media_refs[0].file_name, "spec.pdf")
|
|
self.assertEqual(result.media_refs[0].resource_type, "file")
|
|
|
|
def test_parse_post_content_uses_fallback_when_invalid(self):
|
|
from gateway.platforms.feishu import FALLBACK_POST_TEXT, parse_feishu_post_content
|
|
|
|
result = parse_feishu_post_content("not-json")
|
|
|
|
self.assertEqual(result.text_content, FALLBACK_POST_TEXT)
|
|
self.assertEqual(result.image_keys, [])
|
|
self.assertEqual(result.media_refs, [])
|
|
self.assertEqual(result.mentioned_ids, [])
|
|
|
|
def test_parse_post_content_preserves_rich_text_semantics(self):
|
|
from gateway.platforms.feishu import parse_feishu_post_content
|
|
|
|
result = parse_feishu_post_content(
|
|
json.dumps(
|
|
{
|
|
"en_us": {
|
|
"title": "Plan *v2*",
|
|
"content": [
|
|
[
|
|
{"tag": "text", "text": "Bold", "style": {"bold": True}},
|
|
{"tag": "text", "text": " "},
|
|
{"tag": "text", "text": "Italic", "style": {"italic": True}},
|
|
{"tag": "text", "text": " "},
|
|
{"tag": "text", "text": "Code", "style": {"code": True}},
|
|
],
|
|
[{"tag": "text", "text": "line1"}, {"tag": "br"}, {"tag": "text", "text": "line2"}],
|
|
[{"tag": "hr"}],
|
|
[{"tag": "code_block", "language": "python", "text": "print('hi')"}],
|
|
],
|
|
}
|
|
}
|
|
)
|
|
)
|
|
|
|
self.assertEqual(
|
|
result.text_content,
|
|
"Plan *v2*\n**Bold** *Italic* `Code`\nline1\nline2\n---\n```python\nprint('hi')\n```",
|
|
)
|
|
|
|
|
|
class TestFeishuMessageNormalization(unittest.TestCase):
|
|
def test_normalize_merge_forward_preserves_summary_lines(self):
|
|
from gateway.platforms.feishu import normalize_feishu_message
|
|
|
|
normalized = normalize_feishu_message(
|
|
message_type="merge_forward",
|
|
raw_content=json.dumps(
|
|
{
|
|
"title": "Sprint recap",
|
|
"messages": [
|
|
{"sender_name": "Alice", "text": "Please review PR-128"},
|
|
{
|
|
"sender_name": "Bob",
|
|
"message_type": "post",
|
|
"content": {
|
|
"en_us": {
|
|
"content": [[{"tag": "text", "text": "Ship it"}]],
|
|
}
|
|
},
|
|
},
|
|
],
|
|
}
|
|
),
|
|
)
|
|
|
|
self.assertEqual(normalized.relation_kind, "merge_forward")
|
|
self.assertEqual(
|
|
normalized.text_content,
|
|
"Sprint recap\n- Alice: Please review PR-128\n- Bob: Ship it",
|
|
)
|
|
|
|
def test_normalize_share_chat_exposes_summary_and_metadata(self):
|
|
from gateway.platforms.feishu import normalize_feishu_message
|
|
|
|
normalized = normalize_feishu_message(
|
|
message_type="share_chat",
|
|
raw_content=json.dumps(
|
|
{
|
|
"chat_id": "oc_chat_shared",
|
|
"chat_name": "Backend Guild",
|
|
}
|
|
),
|
|
)
|
|
|
|
self.assertEqual(normalized.relation_kind, "share_chat")
|
|
self.assertEqual(normalized.text_content, "Shared chat: Backend Guild\nChat ID: oc_chat_shared")
|
|
self.assertEqual(normalized.metadata["chat_id"], "oc_chat_shared")
|
|
self.assertEqual(normalized.metadata["chat_name"], "Backend Guild")
|
|
|
|
def test_normalize_interactive_card_preserves_title_body_and_actions(self):
|
|
from gateway.platforms.feishu import normalize_feishu_message
|
|
|
|
normalized = normalize_feishu_message(
|
|
message_type="interactive",
|
|
raw_content=json.dumps(
|
|
{
|
|
"card": {
|
|
"header": {"title": {"tag": "plain_text", "content": "Build Failed"}},
|
|
"elements": [
|
|
{"tag": "div", "text": {"tag": "lark_md", "content": "Service: payments-api"}},
|
|
{"tag": "div", "text": {"tag": "plain_text", "content": "Branch: main"}},
|
|
{
|
|
"tag": "action",
|
|
"actions": [
|
|
{"tag": "button", "text": {"tag": "plain_text", "content": "View Logs"}},
|
|
{"tag": "button", "text": {"tag": "plain_text", "content": "Retry"}},
|
|
],
|
|
},
|
|
],
|
|
}
|
|
}
|
|
),
|
|
)
|
|
|
|
self.assertEqual(normalized.relation_kind, "interactive")
|
|
self.assertEqual(
|
|
normalized.text_content,
|
|
"Build Failed\nService: payments-api\nBranch: main\nView Logs\nRetry\nActions: View Logs, Retry",
|
|
)
|
|
|
|
|
|
class TestFeishuAdapterMessaging(unittest.TestCase):
|
|
@patch.dict(os.environ, {
|
|
"FEISHU_APP_ID": "cli_app",
|
|
"FEISHU_APP_SECRET": "secret_app",
|
|
"FEISHU_CONNECTION_MODE": "webhook",
|
|
"FEISHU_WEBHOOK_HOST": "127.0.0.1",
|
|
"FEISHU_WEBHOOK_PORT": "9001",
|
|
"FEISHU_WEBHOOK_PATH": "/hook",
|
|
}, clear=True)
|
|
def test_connect_webhook_mode_starts_local_server(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
runner = AsyncMock()
|
|
site = AsyncMock()
|
|
web_module = SimpleNamespace(
|
|
Application=lambda: SimpleNamespace(router=SimpleNamespace(add_post=lambda *_args, **_kwargs: None)),
|
|
AppRunner=lambda _app: runner,
|
|
TCPSite=lambda _runner, host, port: SimpleNamespace(start=site.start, host=host, port=port),
|
|
)
|
|
|
|
with (
|
|
patch("gateway.platforms.feishu.FEISHU_AVAILABLE", True),
|
|
patch("gateway.platforms.feishu.FEISHU_WEBHOOK_AVAILABLE", True),
|
|
patch("gateway.platforms.feishu.acquire_scoped_lock", return_value=(True, None)),
|
|
patch("gateway.platforms.feishu.release_scoped_lock"),
|
|
patch.object(adapter, "_hydrate_bot_identity", new=AsyncMock()),
|
|
patch.object(adapter, "_build_lark_client", return_value=SimpleNamespace()),
|
|
patch("gateway.platforms.feishu.web", web_module),
|
|
):
|
|
connected = asyncio.run(adapter.connect())
|
|
|
|
self.assertTrue(connected)
|
|
runner.setup.assert_awaited_once()
|
|
site.start.assert_awaited_once()
|
|
|
|
@patch.dict(os.environ, {
|
|
"FEISHU_APP_ID": "cli_app",
|
|
"FEISHU_APP_SECRET": "secret_app",
|
|
}, clear=True)
|
|
def test_connect_acquires_scoped_lock_and_disconnect_releases_it(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
ws_client = object()
|
|
|
|
with (
|
|
patch("gateway.platforms.feishu.FEISHU_AVAILABLE", True),
|
|
patch("gateway.platforms.feishu.FEISHU_WEBSOCKET_AVAILABLE", True),
|
|
patch("gateway.platforms.feishu.lark", SimpleNamespace(LogLevel=SimpleNamespace(INFO="INFO", WARNING="WARNING"))),
|
|
patch("gateway.platforms.feishu.EventDispatcherHandler", object()),
|
|
patch("gateway.platforms.feishu.FeishuWSClient", return_value=ws_client),
|
|
patch("gateway.platforms.feishu._run_official_feishu_ws_client"),
|
|
patch("gateway.platforms.feishu.acquire_scoped_lock", return_value=(True, None)) as acquire_lock,
|
|
patch("gateway.platforms.feishu.release_scoped_lock") as release_lock,
|
|
patch.object(adapter, "_hydrate_bot_identity", new=AsyncMock()),
|
|
patch.object(adapter, "_build_lark_client", return_value=SimpleNamespace()),
|
|
):
|
|
loop = asyncio.new_event_loop()
|
|
future = loop.create_future()
|
|
future.set_result(None)
|
|
|
|
class _Loop:
|
|
def run_in_executor(self, *_args, **_kwargs):
|
|
return future
|
|
|
|
try:
|
|
with patch("gateway.platforms.feishu.asyncio.get_running_loop", return_value=_Loop()):
|
|
connected = asyncio.run(adapter.connect())
|
|
asyncio.run(adapter.disconnect())
|
|
finally:
|
|
loop.close()
|
|
|
|
self.assertTrue(connected)
|
|
acquire_lock.assert_called_once_with(
|
|
"feishu-app-id",
|
|
"cli_app",
|
|
metadata={"platform": "feishu"},
|
|
)
|
|
release_lock.assert_called_once_with("feishu-app-id", "cli_app")
|
|
|
|
@patch.dict(os.environ, {
|
|
"FEISHU_APP_ID": "cli_app",
|
|
"FEISHU_APP_SECRET": "secret_app",
|
|
}, clear=True)
|
|
def test_connect_rejects_existing_app_lock(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
|
|
with (
|
|
patch("gateway.platforms.feishu.FEISHU_AVAILABLE", True),
|
|
patch("gateway.platforms.feishu.FEISHU_WEBSOCKET_AVAILABLE", True),
|
|
patch(
|
|
"gateway.platforms.feishu.acquire_scoped_lock",
|
|
return_value=(False, {"pid": 4321}),
|
|
),
|
|
):
|
|
connected = asyncio.run(adapter.connect())
|
|
|
|
self.assertFalse(connected)
|
|
self.assertEqual(adapter.fatal_error_code, "feishu_app_lock")
|
|
self.assertFalse(adapter.fatal_error_retryable)
|
|
self.assertIn("PID 4321", adapter.fatal_error_message)
|
|
|
|
@patch.dict(os.environ, {
|
|
"FEISHU_APP_ID": "cli_app",
|
|
"FEISHU_APP_SECRET": "secret_app",
|
|
}, clear=True)
|
|
def test_connect_retries_transient_startup_failure(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
ws_client = object()
|
|
sleeps = []
|
|
|
|
with (
|
|
patch("gateway.platforms.feishu.FEISHU_AVAILABLE", True),
|
|
patch("gateway.platforms.feishu.FEISHU_WEBSOCKET_AVAILABLE", True),
|
|
patch("gateway.platforms.feishu.lark", SimpleNamespace(LogLevel=SimpleNamespace(INFO="INFO", WARNING="WARNING"))),
|
|
patch("gateway.platforms.feishu.EventDispatcherHandler", object()),
|
|
patch("gateway.platforms.feishu.FeishuWSClient", return_value=ws_client),
|
|
patch("gateway.platforms.feishu.acquire_scoped_lock", return_value=(True, None)),
|
|
patch("gateway.platforms.feishu.release_scoped_lock"),
|
|
patch.object(adapter, "_hydrate_bot_identity", new=AsyncMock()),
|
|
patch("gateway.platforms.feishu.asyncio.sleep", side_effect=lambda delay: sleeps.append(delay)),
|
|
patch.object(adapter, "_build_lark_client", return_value=SimpleNamespace()),
|
|
):
|
|
loop = asyncio.new_event_loop()
|
|
future = loop.create_future()
|
|
future.set_result(None)
|
|
|
|
class _Loop:
|
|
def __init__(self):
|
|
self.calls = 0
|
|
|
|
def run_in_executor(self, *_args, **_kwargs):
|
|
self.calls += 1
|
|
if self.calls == 1:
|
|
raise OSError("temporary websocket failure")
|
|
return future
|
|
|
|
fake_loop = _Loop()
|
|
try:
|
|
with patch("gateway.platforms.feishu.asyncio.get_running_loop", return_value=fake_loop):
|
|
connected = asyncio.run(adapter.connect())
|
|
finally:
|
|
loop.close()
|
|
|
|
self.assertTrue(connected)
|
|
self.assertEqual(sleeps, [1])
|
|
self.assertEqual(fake_loop.calls, 2)
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_edit_message_updates_existing_feishu_message(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
captured = {}
|
|
|
|
class _MessageAPI:
|
|
def update(self, request):
|
|
captured["request"] = request
|
|
return SimpleNamespace(success=lambda: True)
|
|
|
|
adapter._client = SimpleNamespace(
|
|
im=SimpleNamespace(
|
|
v1=SimpleNamespace(
|
|
message=_MessageAPI(),
|
|
)
|
|
)
|
|
)
|
|
|
|
async def _direct(func, *args, **kwargs):
|
|
return func(*args, **kwargs)
|
|
|
|
with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct):
|
|
result = asyncio.run(
|
|
adapter.edit_message(
|
|
chat_id="oc_chat",
|
|
message_id="om_progress",
|
|
content="📖 read_file: \"/tmp/image.png\"",
|
|
)
|
|
)
|
|
|
|
self.assertTrue(result.success)
|
|
self.assertEqual(result.message_id, "om_progress")
|
|
self.assertEqual(captured["request"].message_id, "om_progress")
|
|
self.assertEqual(captured["request"].request_body.msg_type, "text")
|
|
self.assertEqual(
|
|
captured["request"].request_body.content,
|
|
json.dumps({"text": "📖 read_file: \"/tmp/image.png\""}, ensure_ascii=False),
|
|
)
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_edit_message_falls_back_to_text_when_post_update_is_rejected(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
captured = {"calls": []}
|
|
|
|
class _MessageAPI:
|
|
def update(self, request):
|
|
captured["calls"].append(request)
|
|
if len(captured["calls"]) == 1:
|
|
return SimpleNamespace(success=lambda: False, code=230001, msg="content format of the post type is incorrect")
|
|
return SimpleNamespace(success=lambda: True)
|
|
|
|
adapter._client = SimpleNamespace(
|
|
im=SimpleNamespace(
|
|
v1=SimpleNamespace(
|
|
message=_MessageAPI(),
|
|
)
|
|
)
|
|
)
|
|
|
|
async def _direct(func, *args, **kwargs):
|
|
return func(*args, **kwargs)
|
|
|
|
with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct):
|
|
result = asyncio.run(
|
|
adapter.edit_message(
|
|
chat_id="oc_chat",
|
|
message_id="om_progress",
|
|
content="可以用 **粗体** 和 *斜体*。",
|
|
)
|
|
)
|
|
|
|
self.assertTrue(result.success)
|
|
self.assertEqual(captured["calls"][0].request_body.msg_type, "post")
|
|
self.assertEqual(captured["calls"][1].request_body.msg_type, "text")
|
|
self.assertEqual(
|
|
captured["calls"][1].request_body.content,
|
|
json.dumps({"text": "可以用 粗体 和 斜体。"}, ensure_ascii=False),
|
|
)
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_get_chat_info_uses_real_feishu_chat_api(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
|
|
class _ChatAPI:
|
|
def get(self, request):
|
|
self.request = request
|
|
return SimpleNamespace(
|
|
success=lambda: True,
|
|
data=SimpleNamespace(name="Hermes Group", chat_type="group"),
|
|
)
|
|
|
|
chat_api = _ChatAPI()
|
|
adapter._client = SimpleNamespace(
|
|
im=SimpleNamespace(
|
|
v1=SimpleNamespace(
|
|
chat=chat_api,
|
|
)
|
|
)
|
|
)
|
|
|
|
async def _direct(func, *args, **kwargs):
|
|
return func(*args, **kwargs)
|
|
|
|
with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct):
|
|
info = asyncio.run(adapter.get_chat_info("oc_chat"))
|
|
|
|
self.assertEqual(chat_api.request.chat_id, "oc_chat")
|
|
self.assertEqual(info["chat_id"], "oc_chat")
|
|
self.assertEqual(info["name"], "Hermes Group")
|
|
self.assertEqual(info["type"], "group")
|
|
|
|
class TestAdapterModule(unittest.TestCase):
|
|
def test_adapter_requirement_helper_exists(self):
|
|
source = Path("gateway/platforms/feishu.py").read_text(encoding="utf-8")
|
|
self.assertIn("def check_feishu_requirements()", source)
|
|
self.assertIn("FEISHU_AVAILABLE", source)
|
|
|
|
def test_adapter_declares_websocket_scope(self):
|
|
source = Path("gateway/platforms/feishu.py").read_text(encoding="utf-8")
|
|
self.assertIn("Supported modes: websocket, webhook", source)
|
|
self.assertIn("FEISHU_CONNECTION_MODE", source)
|
|
|
|
def test_adapter_registers_message_read_noop_handler(self):
|
|
source = Path("gateway/platforms/feishu.py").read_text(encoding="utf-8")
|
|
self.assertIn("register_p2_im_message_message_read_v1", source)
|
|
self.assertIn("def _on_message_read_event", source)
|
|
|
|
def test_adapter_registers_reaction_and_card_handlers_for_websocket(self):
|
|
source = Path("gateway/platforms/feishu.py").read_text(encoding="utf-8")
|
|
self.assertIn("register_p2_im_message_reaction_created_v1", source)
|
|
self.assertIn("register_p2_im_message_reaction_deleted_v1", source)
|
|
self.assertIn("register_p2_card_action_trigger", source)
|
|
|
|
|
|
class TestAdapterBehavior(unittest.TestCase):
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_build_event_handler_registers_reaction_and_card_processors(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
calls = []
|
|
|
|
class _Builder:
|
|
def register_p2_im_message_message_read_v1(self, _handler):
|
|
calls.append("message_read")
|
|
return self
|
|
|
|
def register_p2_im_message_receive_v1(self, _handler):
|
|
calls.append("message_receive")
|
|
return self
|
|
|
|
def register_p2_im_message_reaction_created_v1(self, _handler):
|
|
calls.append("reaction_created")
|
|
return self
|
|
|
|
def register_p2_im_message_reaction_deleted_v1(self, _handler):
|
|
calls.append("reaction_deleted")
|
|
return self
|
|
|
|
def register_p2_card_action_trigger(self, _handler):
|
|
calls.append("card_action")
|
|
return self
|
|
|
|
def build(self):
|
|
calls.append("build")
|
|
return "handler"
|
|
|
|
class _Dispatcher:
|
|
@staticmethod
|
|
def builder(_encrypt_key, _verification_token):
|
|
calls.append("builder")
|
|
return _Builder()
|
|
|
|
with patch("gateway.platforms.feishu.EventDispatcherHandler", _Dispatcher):
|
|
handler = adapter._build_event_handler()
|
|
|
|
self.assertEqual(handler, "handler")
|
|
self.assertEqual(
|
|
calls,
|
|
[
|
|
"builder",
|
|
"message_read",
|
|
"message_receive",
|
|
"reaction_created",
|
|
"reaction_deleted",
|
|
"card_action",
|
|
"build",
|
|
],
|
|
)
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
@unittest.skipUnless(_HAS_LARK_OAPI, "lark-oapi not installed")
|
|
def test_add_ack_reaction_uses_ok_emoji(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
captured = {}
|
|
|
|
class _ReactionAPI:
|
|
def create(self, request):
|
|
captured["request"] = request
|
|
return SimpleNamespace(
|
|
success=lambda: True,
|
|
data=SimpleNamespace(reaction_id="r_typing"),
|
|
)
|
|
|
|
adapter._client = SimpleNamespace(
|
|
im=SimpleNamespace(v1=SimpleNamespace(message_reaction=_ReactionAPI()))
|
|
)
|
|
|
|
async def _direct(func, *args, **kwargs):
|
|
return func(*args, **kwargs)
|
|
|
|
with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct):
|
|
reaction_id = asyncio.run(adapter._add_ack_reaction("om_msg"))
|
|
|
|
self.assertEqual(reaction_id, "r_typing")
|
|
self.assertEqual(captured["request"].request_body.reaction_type["emoji_type"], "OK")
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_add_ack_reaction_logs_warning_on_failure(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
|
|
class _ReactionAPI:
|
|
def create(self, request):
|
|
raise RuntimeError("boom")
|
|
|
|
adapter._client = SimpleNamespace(
|
|
im=SimpleNamespace(v1=SimpleNamespace(message_reaction=_ReactionAPI()))
|
|
)
|
|
|
|
async def _direct(func, *args, **kwargs):
|
|
return func(*args, **kwargs)
|
|
|
|
with (
|
|
patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct),
|
|
self.assertLogs("gateway.platforms.feishu", level="WARNING") as logs,
|
|
):
|
|
reaction_id = asyncio.run(adapter._add_ack_reaction("om_msg"))
|
|
|
|
self.assertIsNone(reaction_id)
|
|
self.assertTrue(
|
|
any("Failed to add ack reaction to om_msg" in entry for entry in logs.output),
|
|
logs.output,
|
|
)
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_ack_reaction_events_are_ignored_to_avoid_feedback_loops(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
adapter._loop = object()
|
|
event = SimpleNamespace(
|
|
message_id="om_msg",
|
|
operator_type="user",
|
|
reaction_type=SimpleNamespace(emoji_type="OK"),
|
|
)
|
|
data = SimpleNamespace(event=event)
|
|
|
|
with patch("gateway.platforms.feishu.asyncio.run_coroutine_threadsafe") as run_threadsafe:
|
|
adapter._on_reaction_event("im.message.reaction.created_v1", data)
|
|
|
|
run_threadsafe.assert_not_called()
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_normalize_inbound_text_strips_feishu_mentions(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
cleaned = adapter._normalize_inbound_text("hi @_user_1 there @_user_2")
|
|
self.assertEqual(cleaned, "hi there")
|
|
|
|
@patch.dict(os.environ, {"FEISHU_GROUP_POLICY": "open"}, clear=True)
|
|
def test_group_message_requires_mentions_even_when_policy_open(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
message = SimpleNamespace(mentions=[])
|
|
sender_id = SimpleNamespace(open_id="ou_any", user_id=None)
|
|
self.assertFalse(adapter._should_accept_group_message(message, sender_id))
|
|
|
|
message_with_mention = SimpleNamespace(mentions=[SimpleNamespace(key="@_user_1")])
|
|
self.assertFalse(adapter._should_accept_group_message(message_with_mention, sender_id))
|
|
|
|
@patch.dict(os.environ, {"FEISHU_GROUP_POLICY": "open"}, clear=True)
|
|
def test_group_message_with_other_user_mention_is_rejected_when_bot_identity_unknown(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
sender_id = SimpleNamespace(open_id="ou_any", user_id=None)
|
|
other_mention = SimpleNamespace(
|
|
name="Other User",
|
|
id=SimpleNamespace(open_id="ou_other", user_id="u_other"),
|
|
)
|
|
|
|
self.assertFalse(adapter._should_accept_group_message(SimpleNamespace(mentions=[other_mention]), sender_id))
|
|
|
|
@patch.dict(
|
|
os.environ,
|
|
{
|
|
"FEISHU_GROUP_POLICY": "allowlist",
|
|
"FEISHU_ALLOWED_USERS": "ou_allowed",
|
|
"FEISHU_BOT_NAME": "Hermes Bot",
|
|
},
|
|
clear=True,
|
|
)
|
|
def test_group_message_allowlist_and_mention_both_required(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
mentioned = SimpleNamespace(
|
|
mentions=[
|
|
SimpleNamespace(
|
|
name="Hermes Bot",
|
|
id=SimpleNamespace(open_id="ou_other", user_id="u_other"),
|
|
)
|
|
]
|
|
)
|
|
|
|
self.assertTrue(
|
|
adapter._should_accept_group_message(
|
|
mentioned,
|
|
SimpleNamespace(open_id="ou_allowed", user_id=None),
|
|
)
|
|
)
|
|
self.assertFalse(
|
|
adapter._should_accept_group_message(
|
|
mentioned,
|
|
SimpleNamespace(open_id="ou_blocked", user_id=None),
|
|
)
|
|
)
|
|
|
|
@patch.dict(
|
|
os.environ,
|
|
{
|
|
"FEISHU_GROUP_POLICY": "open",
|
|
"FEISHU_BOT_OPEN_ID": "ou_bot",
|
|
},
|
|
clear=True,
|
|
)
|
|
def test_group_message_matches_bot_open_id_when_configured(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
sender_id = SimpleNamespace(open_id="ou_any", user_id=None)
|
|
|
|
bot_mention = SimpleNamespace(
|
|
name="Hermes",
|
|
id=SimpleNamespace(open_id="ou_bot", user_id="u_bot"),
|
|
)
|
|
other_mention = SimpleNamespace(
|
|
name="Other",
|
|
id=SimpleNamespace(open_id="ou_other", user_id="u_other"),
|
|
)
|
|
|
|
self.assertTrue(adapter._should_accept_group_message(SimpleNamespace(mentions=[bot_mention]), sender_id))
|
|
self.assertFalse(adapter._should_accept_group_message(SimpleNamespace(mentions=[other_mention]), sender_id))
|
|
|
|
@patch.dict(
|
|
os.environ,
|
|
{
|
|
"FEISHU_GROUP_POLICY": "open",
|
|
"FEISHU_BOT_NAME": "Hermes Bot",
|
|
},
|
|
clear=True,
|
|
)
|
|
def test_group_message_matches_bot_name_when_only_name_available(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
sender_id = SimpleNamespace(open_id="ou_any", user_id=None)
|
|
|
|
named_mention = SimpleNamespace(
|
|
name="Hermes Bot",
|
|
id=SimpleNamespace(open_id="ou_other", user_id="u_other"),
|
|
)
|
|
different_mention = SimpleNamespace(
|
|
name="Another Bot",
|
|
id=SimpleNamespace(open_id="ou_other", user_id="u_other"),
|
|
)
|
|
|
|
self.assertTrue(adapter._should_accept_group_message(SimpleNamespace(mentions=[named_mention]), sender_id))
|
|
self.assertFalse(adapter._should_accept_group_message(SimpleNamespace(mentions=[different_mention]), sender_id))
|
|
|
|
@patch.dict(
|
|
os.environ,
|
|
{
|
|
"FEISHU_GROUP_POLICY": "open",
|
|
"FEISHU_BOT_OPEN_ID": "ou_bot",
|
|
},
|
|
clear=True,
|
|
)
|
|
def test_group_post_message_uses_parsed_mentions_when_sdk_mentions_missing(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
sender_id = SimpleNamespace(open_id="ou_any", user_id=None)
|
|
message = SimpleNamespace(
|
|
message_type="post",
|
|
mentions=[],
|
|
content='{"en_us":{"content":[[{"tag":"at","user_name":"Hermes","open_id":"ou_bot"}]]}}',
|
|
)
|
|
|
|
self.assertTrue(adapter._should_accept_group_message(message, sender_id))
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_extract_post_message_as_text(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
message = SimpleNamespace(
|
|
message_type="post",
|
|
content='{"zh_cn":{"title":"Title","content":[[{"tag":"text","text":"hello "}],[{"tag":"a","text":"doc","href":"https://example.com"}]]}}',
|
|
message_id="om_post",
|
|
)
|
|
|
|
text, msg_type, media_urls, media_types = asyncio.run(adapter._extract_message_content(message))
|
|
|
|
self.assertEqual(text, "Title\nhello\n[doc](https://example.com)")
|
|
self.assertEqual(msg_type.value, "text")
|
|
self.assertEqual(media_urls, [])
|
|
self.assertEqual(media_types, [])
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_extract_post_message_uses_first_available_language_block(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
message = SimpleNamespace(
|
|
message_type="post",
|
|
content='{"fr_fr":{"title":"Subject","content":[[{"tag":"text","text":"bonjour"}]]}}',
|
|
message_id="om_post_fr",
|
|
)
|
|
|
|
text, msg_type, media_urls, media_types = asyncio.run(adapter._extract_message_content(message))
|
|
|
|
self.assertEqual(text, "Subject\nbonjour")
|
|
self.assertEqual(msg_type.value, "text")
|
|
self.assertEqual(media_urls, [])
|
|
self.assertEqual(media_types, [])
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_extract_post_message_with_rich_elements_does_not_drop_content(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
message = SimpleNamespace(
|
|
message_type="post",
|
|
content=(
|
|
'{"en_us":{"title":"Rich message","content":['
|
|
'[{"tag":"img","alt":"diagram"}],'
|
|
'[{"tag":"at","user_name":"Alice"},{"tag":"text","text":" please check the attachment"}],'
|
|
'[{"tag":"media","file_name":"spec.pdf"}],'
|
|
'[{"tag":"emotion","emoji_type":"smile"}]'
|
|
']}}'
|
|
),
|
|
message_id="om_post_rich",
|
|
)
|
|
|
|
text, msg_type, media_urls, media_types = asyncio.run(adapter._extract_message_content(message))
|
|
|
|
self.assertEqual(text, "Rich message\n[Image: diagram]\n@Alice please check the attachment\n[Attachment: spec.pdf]\n:smile:")
|
|
self.assertEqual(msg_type.value, "text")
|
|
self.assertEqual(media_urls, [])
|
|
self.assertEqual(media_types, [])
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_extract_post_message_downloads_embedded_resources(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
adapter._download_feishu_image = AsyncMock(return_value=("/tmp/feishu-image.png", "image/png"))
|
|
adapter._download_feishu_message_resource = AsyncMock(return_value=("/tmp/spec.pdf", "application/pdf"))
|
|
message = SimpleNamespace(
|
|
message_type="post",
|
|
content=(
|
|
'{"en_us":{"title":"Rich message","content":['
|
|
'[{"tag":"img","image_key":"img_123","alt":"diagram"}],'
|
|
'[{"tag":"media","file_key":"file_123","file_name":"spec.pdf"}]'
|
|
']}}'
|
|
),
|
|
message_id="om_post_media",
|
|
)
|
|
|
|
text, msg_type, media_urls, media_types = asyncio.run(adapter._extract_message_content(message))
|
|
|
|
self.assertEqual(text, "Rich message\n[Image: diagram]\n[Attachment: spec.pdf]")
|
|
self.assertEqual(msg_type.value, "text")
|
|
self.assertEqual(media_urls, ["/tmp/feishu-image.png", "/tmp/spec.pdf"])
|
|
self.assertEqual(media_types, ["image/png", "application/pdf"])
|
|
adapter._download_feishu_image.assert_awaited_once_with(
|
|
message_id="om_post_media",
|
|
image_key="img_123",
|
|
)
|
|
adapter._download_feishu_message_resource.assert_awaited_once_with(
|
|
message_id="om_post_media",
|
|
file_key="file_123",
|
|
resource_type="file",
|
|
fallback_filename="spec.pdf",
|
|
)
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_extract_merge_forward_message_as_text_summary(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
message = SimpleNamespace(
|
|
message_type="merge_forward",
|
|
content=json.dumps(
|
|
{
|
|
"title": "Forwarded updates",
|
|
"messages": [
|
|
{"sender_name": "Alice", "text": "Investigating the incident"},
|
|
{"sender_name": "Bob", "text": "ETA 10 minutes"},
|
|
],
|
|
}
|
|
),
|
|
message_id="om_merge_forward",
|
|
)
|
|
|
|
text, msg_type, media_urls, media_types = asyncio.run(adapter._extract_message_content(message))
|
|
|
|
self.assertEqual(
|
|
text,
|
|
"Forwarded updates\n- Alice: Investigating the incident\n- Bob: ETA 10 minutes",
|
|
)
|
|
self.assertEqual(msg_type.value, "text")
|
|
self.assertEqual(media_urls, [])
|
|
self.assertEqual(media_types, [])
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_extract_share_chat_message_as_text_summary(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
message = SimpleNamespace(
|
|
message_type="share_chat",
|
|
content='{"chat_id":"oc_shared","chat_name":"Platform Ops"}',
|
|
message_id="om_share_chat",
|
|
)
|
|
|
|
text, msg_type, media_urls, media_types = asyncio.run(adapter._extract_message_content(message))
|
|
|
|
self.assertEqual(text, "Shared chat: Platform Ops\nChat ID: oc_shared")
|
|
self.assertEqual(msg_type.value, "text")
|
|
self.assertEqual(media_urls, [])
|
|
self.assertEqual(media_types, [])
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_extract_interactive_message_as_text_summary(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
message = SimpleNamespace(
|
|
message_type="interactive",
|
|
content=json.dumps(
|
|
{
|
|
"card": {
|
|
"header": {"title": {"tag": "plain_text", "content": "Approval Request"}},
|
|
"elements": [
|
|
{"tag": "div", "text": {"tag": "plain_text", "content": "Requester: Alice"}},
|
|
{
|
|
"tag": "action",
|
|
"actions": [
|
|
{"tag": "button", "text": {"tag": "plain_text", "content": "Approve"}},
|
|
],
|
|
},
|
|
],
|
|
}
|
|
}
|
|
),
|
|
message_id="om_interactive",
|
|
)
|
|
|
|
text, msg_type, media_urls, media_types = asyncio.run(adapter._extract_message_content(message))
|
|
|
|
self.assertEqual(text, "Approval Request\nRequester: Alice\nApprove\nActions: Approve")
|
|
self.assertEqual(msg_type.value, "text")
|
|
self.assertEqual(media_urls, [])
|
|
self.assertEqual(media_types, [])
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_extract_image_message_downloads_and_caches(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
adapter._download_feishu_image = AsyncMock(return_value=("/tmp/feishu-image.png", "image/png"))
|
|
message = SimpleNamespace(
|
|
message_type="image",
|
|
content='{"image_key":"img_123"}',
|
|
message_id="om_image",
|
|
)
|
|
|
|
text, msg_type, media_urls, media_types = asyncio.run(adapter._extract_message_content(message))
|
|
|
|
self.assertEqual(text, "")
|
|
self.assertEqual(msg_type.value, "photo")
|
|
self.assertEqual(media_urls, ["/tmp/feishu-image.png"])
|
|
self.assertEqual(media_types, ["image/png"])
|
|
adapter._download_feishu_image.assert_awaited_once_with(
|
|
message_id="om_image",
|
|
image_key="img_123",
|
|
)
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_extract_audio_message_downloads_and_caches(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
adapter._download_feishu_message_resource = AsyncMock(
|
|
return_value=("/tmp/feishu-audio.ogg", "audio/ogg")
|
|
)
|
|
message = SimpleNamespace(
|
|
message_type="audio",
|
|
content='{"file_key":"file_audio","file_name":"voice.ogg"}',
|
|
message_id="om_audio",
|
|
)
|
|
|
|
text, msg_type, media_urls, media_types = asyncio.run(adapter._extract_message_content(message))
|
|
|
|
self.assertEqual(text, "")
|
|
self.assertEqual(msg_type.value, "audio")
|
|
self.assertEqual(media_urls, ["/tmp/feishu-audio.ogg"])
|
|
self.assertEqual(media_types, ["audio/ogg"])
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_extract_file_message_downloads_and_caches(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
adapter._download_feishu_message_resource = AsyncMock(
|
|
return_value=("/tmp/doc_123_report.pdf", "application/pdf")
|
|
)
|
|
message = SimpleNamespace(
|
|
message_type="file",
|
|
content='{"file_key":"file_doc","file_name":"report.pdf"}',
|
|
message_id="om_file",
|
|
)
|
|
|
|
text, msg_type, media_urls, media_types = asyncio.run(adapter._extract_message_content(message))
|
|
|
|
self.assertEqual(text, "")
|
|
self.assertEqual(msg_type.value, "document")
|
|
self.assertEqual(media_urls, ["/tmp/doc_123_report.pdf"])
|
|
self.assertEqual(media_types, ["application/pdf"])
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_extract_media_message_with_image_mime_becomes_photo(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
adapter._download_feishu_message_resource = AsyncMock(
|
|
return_value=("/tmp/feishu-media.jpg", "image/jpeg")
|
|
)
|
|
message = SimpleNamespace(
|
|
message_type="media",
|
|
content='{"file_key":"file_media","file_name":"photo.jpg"}',
|
|
message_id="om_media",
|
|
)
|
|
|
|
text, msg_type, media_urls, media_types = asyncio.run(adapter._extract_message_content(message))
|
|
|
|
self.assertEqual(text, "")
|
|
self.assertEqual(msg_type.value, "photo")
|
|
self.assertEqual(media_urls, ["/tmp/feishu-media.jpg"])
|
|
self.assertEqual(media_types, ["image/jpeg"])
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_extract_media_message_with_video_mime_becomes_video(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
adapter._download_feishu_message_resource = AsyncMock(
|
|
return_value=("/tmp/feishu-video.mp4", "video/mp4")
|
|
)
|
|
message = SimpleNamespace(
|
|
message_type="media",
|
|
content='{"file_key":"file_video","file_name":"clip.mp4"}',
|
|
message_id="om_video",
|
|
)
|
|
|
|
text, msg_type, media_urls, media_types = asyncio.run(adapter._extract_message_content(message))
|
|
|
|
self.assertEqual(text, "")
|
|
self.assertEqual(msg_type.value, "video")
|
|
self.assertEqual(media_urls, ["/tmp/feishu-video.mp4"])
|
|
self.assertEqual(media_types, ["video/mp4"])
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_extract_text_from_raw_content_uses_relation_message_fallbacks(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
|
|
shared = adapter._extract_text_from_raw_content(
|
|
msg_type="share_chat",
|
|
raw_content='{"chat_id":"oc_shared","chat_name":"Platform Ops"}',
|
|
)
|
|
attachment = adapter._extract_text_from_raw_content(
|
|
msg_type="file",
|
|
raw_content='{"file_key":"file_1","file_name":"report.pdf"}',
|
|
)
|
|
|
|
self.assertEqual(shared, "Shared chat: Platform Ops\nChat ID: oc_shared")
|
|
self.assertEqual(attachment, "[Attachment: report.pdf]")
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_extract_text_message_starting_with_slash_becomes_command(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
adapter._dispatch_inbound_event = AsyncMock()
|
|
adapter.get_chat_info = AsyncMock(
|
|
return_value={"chat_id": "oc_chat", "name": "Feishu DM", "type": "dm"}
|
|
)
|
|
adapter._resolve_sender_profile = AsyncMock(
|
|
return_value={"user_id": "ou_user", "user_name": "张三", "user_id_alt": None}
|
|
)
|
|
message = SimpleNamespace(
|
|
chat_id="oc_chat",
|
|
thread_id=None,
|
|
parent_id=None,
|
|
upper_message_id=None,
|
|
message_type="text",
|
|
content='{"text":"/help test"}',
|
|
message_id="om_command",
|
|
)
|
|
|
|
asyncio.run(
|
|
adapter._process_inbound_message(
|
|
data=SimpleNamespace(event=SimpleNamespace(message=message)),
|
|
message=message,
|
|
sender_id=SimpleNamespace(open_id="ou_user", user_id=None, union_id=None),
|
|
chat_type="p2p",
|
|
message_id="om_command",
|
|
)
|
|
)
|
|
|
|
event = adapter._dispatch_inbound_event.await_args.args[0]
|
|
self.assertEqual(event.message_type.value, "command")
|
|
self.assertEqual(event.text, "/help test")
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_extract_text_file_injects_content(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as tmp:
|
|
tmp.write("hello from feishu")
|
|
path = tmp.name
|
|
|
|
try:
|
|
text = asyncio.run(adapter._maybe_extract_text_document(path, "text/plain"))
|
|
finally:
|
|
os.unlink(path)
|
|
|
|
self.assertIn("hello from feishu", text)
|
|
self.assertIn("[Content of", text)
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_message_event_submits_to_adapter_loop(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
adapter._loop = object()
|
|
|
|
message = SimpleNamespace(
|
|
message_id="om_text",
|
|
chat_type="p2p",
|
|
chat_id="oc_chat",
|
|
message_type="text",
|
|
content='{"text":"hello"}',
|
|
)
|
|
sender_id = SimpleNamespace(open_id="ou_user", user_id=None, union_id=None)
|
|
sender = SimpleNamespace(sender_id=sender_id, sender_type="user")
|
|
data = SimpleNamespace(event=SimpleNamespace(message=message, sender=sender))
|
|
|
|
future = SimpleNamespace(add_done_callback=lambda *_args, **_kwargs: None)
|
|
def _submit(coro, _loop):
|
|
coro.close()
|
|
return future
|
|
|
|
with patch("gateway.platforms.feishu.asyncio.run_coroutine_threadsafe", side_effect=_submit) as submit:
|
|
adapter._on_message_event(data)
|
|
|
|
self.assertTrue(submit.called)
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_process_inbound_message_uses_event_sender_identity_only(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.base import MessageType
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
adapter._dispatch_inbound_event = AsyncMock()
|
|
# Sender name now comes from the contact API; mock it to return a known value.
|
|
adapter._resolve_sender_name_from_api = AsyncMock(return_value="张三")
|
|
adapter.get_chat_info = AsyncMock(
|
|
return_value={"chat_id": "oc_chat", "name": "Feishu DM", "type": "dm"}
|
|
)
|
|
message = SimpleNamespace(
|
|
chat_id="oc_chat",
|
|
thread_id=None,
|
|
message_type="text",
|
|
content='{"text":"hello"}',
|
|
message_id="om_text",
|
|
)
|
|
sender_id = SimpleNamespace(
|
|
open_id="ou_user",
|
|
user_id="u_user",
|
|
union_id="on_union",
|
|
)
|
|
data = SimpleNamespace(event=SimpleNamespace(message=message, sender=SimpleNamespace(sender_id=sender_id)))
|
|
|
|
asyncio.run(
|
|
adapter._process_inbound_message(
|
|
data=data,
|
|
message=message,
|
|
sender_id=sender_id,
|
|
chat_type="p2p",
|
|
message_id="om_text",
|
|
)
|
|
)
|
|
|
|
adapter._dispatch_inbound_event.assert_awaited_once()
|
|
event = adapter._dispatch_inbound_event.await_args.args[0]
|
|
self.assertEqual(event.message_type, MessageType.TEXT)
|
|
self.assertEqual(event.source.user_id, "ou_user")
|
|
self.assertEqual(event.source.user_name, "张三")
|
|
self.assertEqual(event.source.user_id_alt, "on_union")
|
|
self.assertEqual(event.source.chat_name, "Feishu DM")
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_text_batch_merges_rapid_messages_into_single_event(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.base import MessageEvent, MessageType
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
from gateway.session import SessionSource
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
adapter.handle_message = AsyncMock()
|
|
source = SessionSource(
|
|
platform=adapter.platform,
|
|
chat_id="oc_chat",
|
|
chat_name="Feishu DM",
|
|
chat_type="dm",
|
|
user_id="ou_user",
|
|
user_name="张三",
|
|
)
|
|
|
|
async def _sleep(_delay):
|
|
return None
|
|
|
|
async def _run() -> None:
|
|
with patch("gateway.platforms.feishu.asyncio.sleep", side_effect=_sleep):
|
|
await adapter._dispatch_inbound_event(
|
|
MessageEvent(text="A", message_type=MessageType.TEXT, source=source, message_id="om_1")
|
|
)
|
|
await adapter._dispatch_inbound_event(
|
|
MessageEvent(text="B", message_type=MessageType.TEXT, source=source, message_id="om_2")
|
|
)
|
|
pending = list(adapter._pending_text_batch_tasks.values())
|
|
self.assertEqual(len(pending), 1)
|
|
await asyncio.gather(*pending, return_exceptions=True)
|
|
|
|
asyncio.run(_run())
|
|
|
|
adapter.handle_message.assert_awaited_once()
|
|
event = adapter.handle_message.await_args.args[0]
|
|
self.assertEqual(event.text, "A\nB")
|
|
self.assertEqual(event.message_type, MessageType.TEXT)
|
|
|
|
@patch.dict(
|
|
os.environ,
|
|
{
|
|
"HERMES_FEISHU_TEXT_BATCH_MAX_MESSAGES": "2",
|
|
},
|
|
clear=True,
|
|
)
|
|
def test_text_batch_flushes_when_message_count_limit_is_hit(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.base import MessageEvent, MessageType
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
from gateway.session import SessionSource
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
adapter.handle_message = AsyncMock()
|
|
source = SessionSource(
|
|
platform=adapter.platform,
|
|
chat_id="oc_chat",
|
|
chat_name="Feishu DM",
|
|
chat_type="dm",
|
|
user_id="ou_user",
|
|
user_name="张三",
|
|
)
|
|
|
|
async def _sleep(_delay):
|
|
return None
|
|
|
|
async def _run() -> None:
|
|
with patch("gateway.platforms.feishu.asyncio.sleep", side_effect=_sleep):
|
|
await adapter._dispatch_inbound_event(
|
|
MessageEvent(text="A", message_type=MessageType.TEXT, source=source, message_id="om_1")
|
|
)
|
|
await adapter._dispatch_inbound_event(
|
|
MessageEvent(text="B", message_type=MessageType.TEXT, source=source, message_id="om_2")
|
|
)
|
|
await adapter._dispatch_inbound_event(
|
|
MessageEvent(text="C", message_type=MessageType.TEXT, source=source, message_id="om_3")
|
|
)
|
|
pending = list(adapter._pending_text_batch_tasks.values())
|
|
self.assertEqual(len(pending), 1)
|
|
await asyncio.gather(*pending, return_exceptions=True)
|
|
|
|
asyncio.run(_run())
|
|
|
|
self.assertEqual(adapter.handle_message.await_count, 2)
|
|
first = adapter.handle_message.await_args_list[0].args[0]
|
|
second = adapter.handle_message.await_args_list[1].args[0]
|
|
self.assertEqual(first.text, "A\nB")
|
|
self.assertEqual(second.text, "C")
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_media_batch_merges_rapid_photo_messages(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.base import MessageEvent, MessageType
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
from gateway.session import SessionSource
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
adapter.handle_message = AsyncMock()
|
|
source = SessionSource(
|
|
platform=adapter.platform,
|
|
chat_id="oc_chat",
|
|
chat_name="Feishu DM",
|
|
chat_type="dm",
|
|
user_id="ou_user",
|
|
user_name="张三",
|
|
)
|
|
|
|
async def _sleep(_delay):
|
|
return None
|
|
|
|
async def _run() -> None:
|
|
with patch("gateway.platforms.feishu.asyncio.sleep", side_effect=_sleep):
|
|
await adapter._dispatch_inbound_event(
|
|
MessageEvent(
|
|
text="第一张",
|
|
message_type=MessageType.PHOTO,
|
|
source=source,
|
|
message_id="om_p1",
|
|
media_urls=["/tmp/a.png"],
|
|
media_types=["image/png"],
|
|
)
|
|
)
|
|
await adapter._dispatch_inbound_event(
|
|
MessageEvent(
|
|
text="第二张",
|
|
message_type=MessageType.PHOTO,
|
|
source=source,
|
|
message_id="om_p2",
|
|
media_urls=["/tmp/b.png"],
|
|
media_types=["image/png"],
|
|
)
|
|
)
|
|
pending = list(adapter._pending_media_batch_tasks.values())
|
|
self.assertEqual(len(pending), 1)
|
|
await asyncio.gather(*pending, return_exceptions=True)
|
|
|
|
asyncio.run(_run())
|
|
|
|
adapter.handle_message.assert_awaited_once()
|
|
event = adapter.handle_message.await_args.args[0]
|
|
self.assertEqual(event.media_urls, ["/tmp/a.png", "/tmp/b.png"])
|
|
self.assertIn("第一张", event.text)
|
|
self.assertIn("第二张", event.text)
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_send_image_downloads_then_uses_native_image_send(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
adapter.send_image_file = AsyncMock(return_value=SimpleNamespace(success=True, message_id="om_img"))
|
|
|
|
async def _run():
|
|
with patch("gateway.platforms.feishu.cache_image_from_url", new=AsyncMock(return_value="/tmp/cached.png")):
|
|
return await adapter.send_image("oc_chat", "https://example.com/cat.png", caption="cat")
|
|
|
|
result = asyncio.run(_run())
|
|
|
|
self.assertTrue(result.success)
|
|
adapter.send_image_file.assert_awaited_once()
|
|
self.assertEqual(adapter.send_image_file.await_args.kwargs["image_path"], "/tmp/cached.png")
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_send_animation_degrades_to_document_send(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
adapter.send_document = AsyncMock(return_value=SimpleNamespace(success=True, message_id="om_gif"))
|
|
|
|
async def _run():
|
|
with patch.object(
|
|
adapter,
|
|
"_download_remote_document",
|
|
new=AsyncMock(return_value=("/tmp/anim.gif", "anim.gif")),
|
|
):
|
|
return await adapter.send_animation("oc_chat", "https://example.com/anim.gif", caption="look")
|
|
|
|
result = asyncio.run(_run())
|
|
|
|
self.assertTrue(result.success)
|
|
adapter.send_document.assert_awaited_once()
|
|
caption = adapter.send_document.await_args.kwargs["caption"]
|
|
self.assertIn("GIF downgraded to file", caption)
|
|
self.assertIn("look", caption)
|
|
|
|
def test_dedup_state_persists_across_adapter_restart(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
with tempfile.TemporaryDirectory() as temp_home:
|
|
with patch.dict(os.environ, {"HERMES_HOME": temp_home}, clear=False):
|
|
first = FeishuAdapter(PlatformConfig())
|
|
self.assertFalse(first._is_duplicate("om_same"))
|
|
second = FeishuAdapter(PlatformConfig())
|
|
self.assertTrue(second._is_duplicate("om_same"))
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_process_inbound_group_message_keeps_group_type_when_chat_lookup_falls_back(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
adapter._dispatch_inbound_event = AsyncMock()
|
|
adapter.get_chat_info = AsyncMock(
|
|
return_value={"chat_id": "oc_group", "name": "oc_group", "type": "dm"}
|
|
)
|
|
adapter._resolve_sender_profile = AsyncMock(
|
|
return_value={"user_id": "ou_user", "user_name": "张三", "user_id_alt": None}
|
|
)
|
|
message = SimpleNamespace(
|
|
chat_id="oc_group",
|
|
thread_id=None,
|
|
message_type="text",
|
|
content='{"text":"hello group"}',
|
|
message_id="om_group_text",
|
|
)
|
|
sender_id = SimpleNamespace(open_id="ou_user", user_id=None, union_id=None)
|
|
data = SimpleNamespace(event=SimpleNamespace(message=message))
|
|
|
|
asyncio.run(
|
|
adapter._process_inbound_message(
|
|
data=data,
|
|
message=message,
|
|
sender_id=sender_id,
|
|
chat_type="group",
|
|
message_id="om_group_text",
|
|
)
|
|
)
|
|
|
|
event = adapter._dispatch_inbound_event.await_args.args[0]
|
|
self.assertEqual(event.source.chat_type, "group")
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_process_inbound_message_fetches_reply_to_text(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
adapter._dispatch_inbound_event = AsyncMock()
|
|
adapter.get_chat_info = AsyncMock(
|
|
return_value={"chat_id": "oc_chat", "name": "Feishu DM", "type": "dm"}
|
|
)
|
|
adapter._resolve_sender_profile = AsyncMock(
|
|
return_value={"user_id": "ou_user", "user_name": "张三", "user_id_alt": None}
|
|
)
|
|
adapter._fetch_message_text = AsyncMock(return_value="父消息内容")
|
|
message = SimpleNamespace(
|
|
chat_id="oc_chat",
|
|
thread_id=None,
|
|
parent_id="om_parent",
|
|
upper_message_id=None,
|
|
message_type="text",
|
|
content='{"text":"reply"}',
|
|
message_id="om_reply",
|
|
)
|
|
|
|
asyncio.run(
|
|
adapter._process_inbound_message(
|
|
data=SimpleNamespace(event=SimpleNamespace(message=message)),
|
|
message=message,
|
|
sender_id=SimpleNamespace(open_id="ou_user", user_id=None, union_id=None),
|
|
chat_type="p2p",
|
|
message_id="om_reply",
|
|
)
|
|
)
|
|
|
|
event = adapter._dispatch_inbound_event.await_args.args[0]
|
|
self.assertEqual(event.reply_to_message_id, "om_parent")
|
|
self.assertEqual(event.reply_to_text, "父消息内容")
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_send_replies_in_thread_when_thread_metadata_present(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
captured = {}
|
|
|
|
class _ReplyAPI:
|
|
def reply(self, request):
|
|
captured["request"] = request
|
|
return SimpleNamespace(
|
|
success=lambda: True,
|
|
data=SimpleNamespace(message_id="om_reply"),
|
|
)
|
|
|
|
adapter._client = SimpleNamespace(
|
|
im=SimpleNamespace(
|
|
v1=SimpleNamespace(
|
|
message=_ReplyAPI(),
|
|
)
|
|
)
|
|
)
|
|
|
|
async def _direct(func, *args, **kwargs):
|
|
return func(*args, **kwargs)
|
|
|
|
with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct):
|
|
result = asyncio.run(
|
|
adapter.send(
|
|
chat_id="oc_chat",
|
|
content="hello",
|
|
reply_to="om_parent",
|
|
metadata={"thread_id": "omt-thread"},
|
|
)
|
|
)
|
|
|
|
self.assertTrue(result.success)
|
|
self.assertEqual(result.message_id, "om_reply")
|
|
self.assertTrue(captured["request"].request_body.reply_in_thread)
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_send_retries_transient_failure(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
captured = {"attempts": 0}
|
|
sleeps = []
|
|
|
|
class _MessageAPI:
|
|
def create(self, request):
|
|
captured["attempts"] += 1
|
|
captured["request"] = request
|
|
if captured["attempts"] == 1:
|
|
raise OSError("temporary send failure")
|
|
return SimpleNamespace(
|
|
success=lambda: True,
|
|
data=SimpleNamespace(message_id="om_retry"),
|
|
)
|
|
|
|
adapter._client = SimpleNamespace(
|
|
im=SimpleNamespace(
|
|
v1=SimpleNamespace(
|
|
message=_MessageAPI(),
|
|
)
|
|
)
|
|
)
|
|
|
|
async def _direct(func, *args, **kwargs):
|
|
return func(*args, **kwargs)
|
|
|
|
async def _sleep(delay):
|
|
sleeps.append(delay)
|
|
|
|
with (
|
|
patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct),
|
|
patch("gateway.platforms.feishu.asyncio.sleep", side_effect=_sleep),
|
|
):
|
|
result = asyncio.run(adapter.send(chat_id="oc_chat", content="hello retry"))
|
|
|
|
self.assertTrue(result.success)
|
|
self.assertEqual(result.message_id, "om_retry")
|
|
self.assertEqual(captured["attempts"], 2)
|
|
self.assertEqual(sleeps, [1])
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_send_does_not_retry_deterministic_api_failure(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
captured = {"attempts": 0}
|
|
sleeps = []
|
|
|
|
class _MessageAPI:
|
|
def create(self, request):
|
|
captured["attempts"] += 1
|
|
return SimpleNamespace(
|
|
success=lambda: False,
|
|
code=400,
|
|
msg="bad request",
|
|
)
|
|
|
|
adapter._client = SimpleNamespace(
|
|
im=SimpleNamespace(
|
|
v1=SimpleNamespace(
|
|
message=_MessageAPI(),
|
|
)
|
|
)
|
|
)
|
|
|
|
async def _direct(func, *args, **kwargs):
|
|
return func(*args, **kwargs)
|
|
|
|
async def _sleep(delay):
|
|
sleeps.append(delay)
|
|
|
|
with (
|
|
patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct),
|
|
patch("gateway.platforms.feishu.asyncio.sleep", side_effect=_sleep),
|
|
):
|
|
result = asyncio.run(adapter.send(chat_id="oc_chat", content="bad payload"))
|
|
|
|
self.assertFalse(result.success)
|
|
self.assertEqual(result.error, "[400] bad request")
|
|
self.assertEqual(captured["attempts"], 1)
|
|
self.assertEqual(sleeps, [])
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_send_document_reply_uses_thread_flag(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
captured = {}
|
|
|
|
class _FileAPI:
|
|
def create(self, request):
|
|
return SimpleNamespace(
|
|
success=lambda: True,
|
|
data=SimpleNamespace(file_key="file_123"),
|
|
)
|
|
|
|
class _MessageAPI:
|
|
def reply(self, request):
|
|
captured["request"] = request
|
|
return SimpleNamespace(
|
|
success=lambda: True,
|
|
data=SimpleNamespace(message_id="om_file_reply"),
|
|
)
|
|
|
|
adapter._client = SimpleNamespace(
|
|
im=SimpleNamespace(
|
|
v1=SimpleNamespace(
|
|
file=_FileAPI(),
|
|
message=_MessageAPI(),
|
|
)
|
|
)
|
|
)
|
|
|
|
async def _direct(func, *args, **kwargs):
|
|
return func(*args, **kwargs)
|
|
|
|
with tempfile.NamedTemporaryFile("wb", suffix=".pdf", delete=False) as tmp:
|
|
tmp.write(b"%PDF-1.4 test")
|
|
file_path = tmp.name
|
|
|
|
try:
|
|
with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct):
|
|
result = asyncio.run(
|
|
adapter.send_document(
|
|
chat_id="oc_chat",
|
|
file_path=file_path,
|
|
reply_to="om_parent",
|
|
metadata={"thread_id": "omt-thread"},
|
|
)
|
|
)
|
|
finally:
|
|
os.unlink(file_path)
|
|
|
|
self.assertTrue(result.success)
|
|
self.assertTrue(captured["request"].request_body.reply_in_thread)
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_send_document_uploads_file_and_sends_file_message(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
captured = {}
|
|
|
|
class _FileAPI:
|
|
def create(self, request):
|
|
captured["upload_request"] = request
|
|
return SimpleNamespace(
|
|
success=lambda: True,
|
|
data=SimpleNamespace(file_key="file_123"),
|
|
)
|
|
|
|
class _MessageAPI:
|
|
def create(self, request):
|
|
captured["message_request"] = request
|
|
return SimpleNamespace(
|
|
success=lambda: True,
|
|
data=SimpleNamespace(message_id="om_file_msg"),
|
|
)
|
|
|
|
adapter._client = SimpleNamespace(
|
|
im=SimpleNamespace(
|
|
v1=SimpleNamespace(
|
|
file=_FileAPI(),
|
|
message=_MessageAPI(),
|
|
)
|
|
)
|
|
)
|
|
|
|
async def _direct(func, *args, **kwargs):
|
|
return func(*args, **kwargs)
|
|
|
|
with tempfile.NamedTemporaryFile("wb", suffix=".pdf", delete=False) as tmp:
|
|
tmp.write(b"%PDF-1.4 test")
|
|
file_path = tmp.name
|
|
|
|
try:
|
|
with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct):
|
|
result = asyncio.run(adapter.send_document(chat_id="oc_chat", file_path=file_path))
|
|
finally:
|
|
os.unlink(file_path)
|
|
|
|
self.assertTrue(result.success)
|
|
self.assertEqual(result.message_id, "om_file_msg")
|
|
self.assertEqual(captured["upload_request"].request_body.file_type, "pdf")
|
|
self.assertEqual(
|
|
captured["message_request"].request_body.content,
|
|
'{"file_key": "file_123"}',
|
|
)
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_send_document_with_caption_uses_single_post_message(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
captured = {}
|
|
|
|
class _FileAPI:
|
|
def create(self, request):
|
|
return SimpleNamespace(
|
|
success=lambda: True,
|
|
data=SimpleNamespace(file_key="file_123"),
|
|
)
|
|
|
|
class _MessageAPI:
|
|
def create(self, request):
|
|
captured["message_request"] = request
|
|
return SimpleNamespace(
|
|
success=lambda: True,
|
|
data=SimpleNamespace(message_id="om_post_msg"),
|
|
)
|
|
|
|
adapter._client = SimpleNamespace(
|
|
im=SimpleNamespace(
|
|
v1=SimpleNamespace(
|
|
file=_FileAPI(),
|
|
message=_MessageAPI(),
|
|
)
|
|
)
|
|
)
|
|
|
|
async def _direct(func, *args, **kwargs):
|
|
return func(*args, **kwargs)
|
|
|
|
with tempfile.NamedTemporaryFile("wb", suffix=".pdf", delete=False) as tmp:
|
|
tmp.write(b"%PDF-1.4 test")
|
|
file_path = tmp.name
|
|
|
|
try:
|
|
with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct):
|
|
result = asyncio.run(
|
|
adapter.send_document(chat_id="oc_chat", file_path=file_path, caption="报告请看")
|
|
)
|
|
finally:
|
|
os.unlink(file_path)
|
|
|
|
self.assertTrue(result.success)
|
|
self.assertEqual(captured["message_request"].request_body.msg_type, "post")
|
|
self.assertIn('"tag": "media"', captured["message_request"].request_body.content)
|
|
self.assertIn('"file_key": "file_123"', captured["message_request"].request_body.content)
|
|
self.assertIn("报告请看", captured["message_request"].request_body.content)
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_send_image_file_uploads_image_and_sends_image_message(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
captured = {}
|
|
|
|
class _ImageAPI:
|
|
def create(self, request):
|
|
captured["upload_request"] = request
|
|
return SimpleNamespace(
|
|
success=lambda: True,
|
|
data=SimpleNamespace(image_key="img_123"),
|
|
)
|
|
|
|
class _MessageAPI:
|
|
def create(self, request):
|
|
captured["message_request"] = request
|
|
return SimpleNamespace(
|
|
success=lambda: True,
|
|
data=SimpleNamespace(message_id="om_image_msg"),
|
|
)
|
|
|
|
adapter._client = SimpleNamespace(
|
|
im=SimpleNamespace(
|
|
v1=SimpleNamespace(
|
|
image=_ImageAPI(),
|
|
message=_MessageAPI(),
|
|
)
|
|
)
|
|
)
|
|
|
|
async def _direct(func, *args, **kwargs):
|
|
return func(*args, **kwargs)
|
|
|
|
with tempfile.NamedTemporaryFile("wb", suffix=".png", delete=False) as tmp:
|
|
tmp.write(b"\x89PNG\r\n\x1a\n")
|
|
image_path = tmp.name
|
|
|
|
try:
|
|
with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct):
|
|
result = asyncio.run(adapter.send_image_file(chat_id="oc_chat", image_path=image_path))
|
|
finally:
|
|
os.unlink(image_path)
|
|
|
|
self.assertTrue(result.success)
|
|
self.assertEqual(result.message_id, "om_image_msg")
|
|
self.assertEqual(captured["upload_request"].request_body.image_type, "message")
|
|
self.assertEqual(
|
|
captured["message_request"].request_body.content,
|
|
'{"image_key": "img_123"}',
|
|
)
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_send_image_file_with_caption_uses_single_post_message(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
captured = {}
|
|
|
|
class _ImageAPI:
|
|
def create(self, request):
|
|
return SimpleNamespace(
|
|
success=lambda: True,
|
|
data=SimpleNamespace(image_key="img_123"),
|
|
)
|
|
|
|
class _MessageAPI:
|
|
def create(self, request):
|
|
captured["message_request"] = request
|
|
return SimpleNamespace(
|
|
success=lambda: True,
|
|
data=SimpleNamespace(message_id="om_post_img"),
|
|
)
|
|
|
|
adapter._client = SimpleNamespace(
|
|
im=SimpleNamespace(
|
|
v1=SimpleNamespace(
|
|
image=_ImageAPI(),
|
|
message=_MessageAPI(),
|
|
)
|
|
)
|
|
)
|
|
|
|
async def _direct(func, *args, **kwargs):
|
|
return func(*args, **kwargs)
|
|
|
|
with tempfile.NamedTemporaryFile("wb", suffix=".png", delete=False) as tmp:
|
|
tmp.write(b"\x89PNG\r\n\x1a\n")
|
|
image_path = tmp.name
|
|
|
|
try:
|
|
with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct):
|
|
result = asyncio.run(
|
|
adapter.send_image_file(chat_id="oc_chat", image_path=image_path, caption="截图说明")
|
|
)
|
|
finally:
|
|
os.unlink(image_path)
|
|
|
|
self.assertTrue(result.success)
|
|
self.assertEqual(captured["message_request"].request_body.msg_type, "post")
|
|
self.assertIn('"tag": "img"', captured["message_request"].request_body.content)
|
|
self.assertIn('"image_key": "img_123"', captured["message_request"].request_body.content)
|
|
self.assertIn("截图说明", captured["message_request"].request_body.content)
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_send_video_uploads_file_and_sends_media_message(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
captured = {}
|
|
|
|
class _FileAPI:
|
|
def create(self, request):
|
|
captured["upload_request"] = request
|
|
return SimpleNamespace(
|
|
success=lambda: True,
|
|
data=SimpleNamespace(file_key="file_video_123"),
|
|
)
|
|
|
|
class _MessageAPI:
|
|
def create(self, request):
|
|
captured["message_request"] = request
|
|
return SimpleNamespace(
|
|
success=lambda: True,
|
|
data=SimpleNamespace(message_id="om_video_msg"),
|
|
)
|
|
|
|
adapter._client = SimpleNamespace(
|
|
im=SimpleNamespace(
|
|
v1=SimpleNamespace(
|
|
file=_FileAPI(),
|
|
message=_MessageAPI(),
|
|
)
|
|
)
|
|
)
|
|
|
|
async def _direct(func, *args, **kwargs):
|
|
return func(*args, **kwargs)
|
|
|
|
with tempfile.NamedTemporaryFile("wb", suffix=".mp4", delete=False) as tmp:
|
|
tmp.write(b"\x00\x00\x00\x18ftypmp42")
|
|
video_path = tmp.name
|
|
|
|
try:
|
|
with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct):
|
|
result = asyncio.run(adapter.send_video(chat_id="oc_chat", video_path=video_path))
|
|
finally:
|
|
os.unlink(video_path)
|
|
|
|
self.assertTrue(result.success)
|
|
self.assertEqual(captured["upload_request"].request_body.file_type, "mp4")
|
|
self.assertEqual(captured["message_request"].request_body.msg_type, "media")
|
|
self.assertEqual(captured["message_request"].request_body.content, '{"file_key": "file_video_123"}')
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_send_voice_uploads_opus_and_sends_audio_message(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
captured = {}
|
|
|
|
class _FileAPI:
|
|
def create(self, request):
|
|
captured["upload_request"] = request
|
|
return SimpleNamespace(
|
|
success=lambda: True,
|
|
data=SimpleNamespace(file_key="file_audio_123"),
|
|
)
|
|
|
|
class _MessageAPI:
|
|
def create(self, request):
|
|
captured["message_request"] = request
|
|
return SimpleNamespace(
|
|
success=lambda: True,
|
|
data=SimpleNamespace(message_id="om_audio_msg"),
|
|
)
|
|
|
|
adapter._client = SimpleNamespace(
|
|
im=SimpleNamespace(
|
|
v1=SimpleNamespace(
|
|
file=_FileAPI(),
|
|
message=_MessageAPI(),
|
|
)
|
|
)
|
|
)
|
|
|
|
async def _direct(func, *args, **kwargs):
|
|
return func(*args, **kwargs)
|
|
|
|
with tempfile.NamedTemporaryFile("wb", suffix=".opus", delete=False) as tmp:
|
|
tmp.write(b"opus")
|
|
audio_path = tmp.name
|
|
|
|
try:
|
|
with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct):
|
|
result = asyncio.run(adapter.send_voice(chat_id="oc_chat", audio_path=audio_path))
|
|
finally:
|
|
os.unlink(audio_path)
|
|
|
|
self.assertTrue(result.success)
|
|
self.assertEqual(captured["upload_request"].request_body.file_type, "opus")
|
|
self.assertEqual(captured["message_request"].request_body.msg_type, "audio")
|
|
self.assertEqual(captured["message_request"].request_body.content, '{"file_key": "file_audio_123"}')
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_build_post_payload_extracts_title_and_links(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
payload = json.loads(adapter._build_post_payload("# 标题\n访问 [文档](https://example.com)"))
|
|
|
|
elements = payload["zh_cn"]["content"][0]
|
|
self.assertEqual(elements, [{"tag": "md", "text": "# 标题\n访问 [文档](https://example.com)"}])
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_build_post_payload_wraps_markdown_in_md_tag(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
payload = json.loads(
|
|
adapter._build_post_payload("支持 **粗体**、*斜体* 和 `代码`")
|
|
)
|
|
|
|
elements = payload["zh_cn"]["content"][0]
|
|
self.assertEqual(
|
|
elements,
|
|
[
|
|
{"tag": "md", "text": "支持 **粗体**、*斜体* 和 `代码`"},
|
|
],
|
|
)
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_build_post_payload_keeps_full_markdown_text(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
payload = json.loads(
|
|
adapter._build_post_payload(
|
|
"---\n1. 第一项\n 2. 子项\n- 外层\n - 内层\n<u>下划线</u> 和 ~~删除线~~"
|
|
)
|
|
)
|
|
|
|
rows = payload["zh_cn"]["content"]
|
|
self.assertEqual(
|
|
rows,
|
|
[[{"tag": "md", "text": "---\n1. 第一项\n 2. 子项\n- 外层\n - 内层\n<u>下划线</u> 和 ~~删除线~~"}]],
|
|
)
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_send_uses_post_for_inline_markdown(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
captured = {}
|
|
|
|
class _MessageAPI:
|
|
def create(self, request):
|
|
captured["request"] = request
|
|
return SimpleNamespace(
|
|
success=lambda: True,
|
|
data=SimpleNamespace(message_id="om_markdown"),
|
|
)
|
|
|
|
adapter._client = SimpleNamespace(
|
|
im=SimpleNamespace(
|
|
v1=SimpleNamespace(
|
|
message=_MessageAPI(),
|
|
)
|
|
)
|
|
)
|
|
|
|
async def _direct(func, *args, **kwargs):
|
|
return func(*args, **kwargs)
|
|
|
|
with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct):
|
|
result = asyncio.run(
|
|
adapter.send(
|
|
chat_id="oc_chat",
|
|
content="可以用 **粗体** 和 *斜体*。",
|
|
)
|
|
)
|
|
|
|
self.assertTrue(result.success)
|
|
self.assertEqual(captured["request"].request_body.msg_type, "post")
|
|
payload = json.loads(captured["request"].request_body.content)
|
|
elements = payload["zh_cn"]["content"][0]
|
|
self.assertEqual(elements, [{"tag": "md", "text": "可以用 **粗体** 和 *斜体*。"}])
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_send_falls_back_to_text_when_post_payload_is_rejected(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
captured = {"calls": []}
|
|
|
|
class _MessageAPI:
|
|
def create(self, request):
|
|
captured["calls"].append(request)
|
|
if len(captured["calls"]) == 1:
|
|
raise RuntimeError("content format of the post type is incorrect")
|
|
return SimpleNamespace(
|
|
success=lambda: True,
|
|
data=SimpleNamespace(message_id="om_plain"),
|
|
)
|
|
|
|
adapter._client = SimpleNamespace(
|
|
im=SimpleNamespace(
|
|
v1=SimpleNamespace(
|
|
message=_MessageAPI(),
|
|
)
|
|
)
|
|
)
|
|
|
|
async def _direct(func, *args, **kwargs):
|
|
return func(*args, **kwargs)
|
|
|
|
with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct):
|
|
result = asyncio.run(
|
|
adapter.send(
|
|
chat_id="oc_chat",
|
|
content="可以用 **粗体** 和 *斜体*。",
|
|
)
|
|
)
|
|
|
|
self.assertTrue(result.success)
|
|
self.assertEqual(captured["calls"][0].request_body.msg_type, "post")
|
|
self.assertEqual(captured["calls"][1].request_body.msg_type, "text")
|
|
self.assertEqual(
|
|
captured["calls"][1].request_body.content,
|
|
json.dumps({"text": "可以用 粗体 和 斜体。"}, ensure_ascii=False),
|
|
)
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_send_falls_back_to_text_when_post_response_is_unsuccessful(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
captured = {"calls": []}
|
|
|
|
class _MessageAPI:
|
|
def create(self, request):
|
|
captured["calls"].append(request)
|
|
if len(captured["calls"]) == 1:
|
|
return SimpleNamespace(success=lambda: False, code=230001, msg="content format of the post type is incorrect")
|
|
return SimpleNamespace(
|
|
success=lambda: True,
|
|
data=SimpleNamespace(message_id="om_plain_response"),
|
|
)
|
|
|
|
adapter._client = SimpleNamespace(
|
|
im=SimpleNamespace(
|
|
v1=SimpleNamespace(
|
|
message=_MessageAPI(),
|
|
)
|
|
)
|
|
)
|
|
|
|
async def _direct(func, *args, **kwargs):
|
|
return func(*args, **kwargs)
|
|
|
|
with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct):
|
|
result = asyncio.run(
|
|
adapter.send(
|
|
chat_id="oc_chat",
|
|
content="可以用 **粗体** 和 *斜体*。",
|
|
)
|
|
)
|
|
|
|
self.assertTrue(result.success)
|
|
self.assertEqual(captured["calls"][0].request_body.msg_type, "post")
|
|
self.assertEqual(captured["calls"][1].request_body.msg_type, "text")
|
|
self.assertEqual(
|
|
captured["calls"][1].request_body.content,
|
|
json.dumps({"text": "可以用 粗体 和 斜体。"}, ensure_ascii=False),
|
|
)
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_send_uses_post_for_advanced_markdown_lines(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
captured = {}
|
|
|
|
class _MessageAPI:
|
|
def create(self, request):
|
|
captured["request"] = request
|
|
return SimpleNamespace(
|
|
success=lambda: True,
|
|
data=SimpleNamespace(message_id="om_markdown_advanced"),
|
|
)
|
|
|
|
adapter._client = SimpleNamespace(
|
|
im=SimpleNamespace(
|
|
v1=SimpleNamespace(
|
|
message=_MessageAPI(),
|
|
)
|
|
)
|
|
)
|
|
|
|
async def _direct(func, *args, **kwargs):
|
|
return func(*args, **kwargs)
|
|
|
|
with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct):
|
|
result = asyncio.run(
|
|
adapter.send(
|
|
chat_id="oc_chat",
|
|
content="---\n1. 第一项\n<u>下划线</u>\n~~删除线~~",
|
|
)
|
|
)
|
|
|
|
self.assertTrue(result.success)
|
|
self.assertEqual(captured["request"].request_body.msg_type, "post")
|
|
payload = json.loads(captured["request"].request_body.content)
|
|
rows = payload["zh_cn"]["content"]
|
|
self.assertEqual(
|
|
rows,
|
|
[[{"tag": "md", "text": "---\n1. 第一项\n<u>下划线</u>\n~~删除线~~"}]],
|
|
)
|
|
|
|
|
|
@unittest.skipUnless(_HAS_LARK_OAPI, "lark-oapi not installed")
|
|
class TestWebhookSecurity(unittest.TestCase):
|
|
"""Tests for webhook signature verification, rate limiting, and body size limits."""
|
|
|
|
def _make_adapter(self, encrypt_key: str = "") -> "FeishuAdapter":
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
with patch.dict(os.environ, {"FEISHU_APP_ID": "cli", "FEISHU_APP_SECRET": "sec", "FEISHU_ENCRYPT_KEY": encrypt_key}, clear=True):
|
|
return FeishuAdapter(PlatformConfig())
|
|
|
|
def test_signature_valid_passes(self):
|
|
import hashlib
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
from gateway.config import PlatformConfig
|
|
|
|
encrypt_key = "test_secret"
|
|
adapter = self._make_adapter(encrypt_key)
|
|
body = b'{"type":"event"}'
|
|
timestamp = "1700000000"
|
|
nonce = "abc123"
|
|
content = f"{timestamp}{nonce}{encrypt_key}" + body.decode("utf-8")
|
|
sig = hashlib.sha256(content.encode("utf-8")).hexdigest()
|
|
headers = {"x-lark-request-timestamp": timestamp, "x-lark-request-nonce": nonce, "x-lark-signature": sig}
|
|
self.assertTrue(adapter._is_webhook_signature_valid(headers, body))
|
|
|
|
def test_signature_invalid_rejected(self):
|
|
adapter = self._make_adapter("test_secret")
|
|
headers = {
|
|
"x-lark-request-timestamp": "1700000000",
|
|
"x-lark-request-nonce": "abc",
|
|
"x-lark-signature": "deadbeef" * 8,
|
|
}
|
|
self.assertFalse(adapter._is_webhook_signature_valid(headers, b'{"type":"event"}'))
|
|
|
|
def test_signature_missing_headers_rejected(self):
|
|
adapter = self._make_adapter("test_secret")
|
|
self.assertFalse(adapter._is_webhook_signature_valid({}, b'{}'))
|
|
|
|
def test_rate_limit_allows_requests_within_window(self):
|
|
adapter = self._make_adapter()
|
|
for _ in range(5):
|
|
self.assertTrue(adapter._check_webhook_rate_limit("10.0.0.1"))
|
|
|
|
def test_rate_limit_blocks_after_exceeding_max(self):
|
|
from gateway.platforms.feishu import _FEISHU_WEBHOOK_RATE_LIMIT_MAX
|
|
adapter = self._make_adapter()
|
|
for _ in range(_FEISHU_WEBHOOK_RATE_LIMIT_MAX):
|
|
adapter._check_webhook_rate_limit("10.0.0.2")
|
|
self.assertFalse(adapter._check_webhook_rate_limit("10.0.0.2"))
|
|
|
|
def test_rate_limit_resets_after_window_expires(self):
|
|
from gateway.platforms.feishu import _FEISHU_WEBHOOK_RATE_LIMIT_MAX, _FEISHU_WEBHOOK_RATE_WINDOW_SECONDS
|
|
adapter = self._make_adapter()
|
|
ip = "10.0.0.3"
|
|
for _ in range(_FEISHU_WEBHOOK_RATE_LIMIT_MAX):
|
|
adapter._check_webhook_rate_limit(ip)
|
|
self.assertFalse(adapter._check_webhook_rate_limit(ip))
|
|
# Simulate window expiry by backdating the stored entry.
|
|
count, window_start = adapter._webhook_rate_counts[ip]
|
|
adapter._webhook_rate_counts[ip] = (count, window_start - _FEISHU_WEBHOOK_RATE_WINDOW_SECONDS - 1)
|
|
self.assertTrue(adapter._check_webhook_rate_limit(ip))
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_webhook_request_rejects_oversized_body(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter, _FEISHU_WEBHOOK_MAX_BODY_BYTES
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
# Simulate a request whose Content-Length already signals oversize.
|
|
request = SimpleNamespace(
|
|
remote="127.0.0.1",
|
|
content_length=_FEISHU_WEBHOOK_MAX_BODY_BYTES + 1,
|
|
)
|
|
response = asyncio.run(adapter._handle_webhook_request(request))
|
|
self.assertEqual(response.status, 413)
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_webhook_request_rejects_invalid_json(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
request = SimpleNamespace(
|
|
remote="127.0.0.1",
|
|
content_length=None,
|
|
read=AsyncMock(return_value=b"not-json"),
|
|
)
|
|
response = asyncio.run(adapter._handle_webhook_request(request))
|
|
self.assertEqual(response.status, 400)
|
|
|
|
@patch.dict(os.environ, {"FEISHU_ENCRYPT_KEY": "secret"}, clear=True)
|
|
def test_webhook_request_rejects_bad_signature(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
body = json.dumps({"header": {"event_type": "im.message.receive_v1"}}).encode()
|
|
request = SimpleNamespace(
|
|
remote="127.0.0.1",
|
|
content_length=None,
|
|
headers={"x-lark-request-timestamp": "123", "x-lark-request-nonce": "abc", "x-lark-signature": "bad"},
|
|
read=AsyncMock(return_value=body),
|
|
)
|
|
response = asyncio.run(adapter._handle_webhook_request(request))
|
|
self.assertEqual(response.status, 401)
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_webhook_url_verification_challenge_passes_without_signature(self):
|
|
"""Challenge requests must succeed even when no encrypt_key is set."""
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
body = json.dumps({"type": "url_verification", "challenge": "test_challenge_token"}).encode()
|
|
request = SimpleNamespace(
|
|
remote="127.0.0.1",
|
|
content_length=None,
|
|
read=AsyncMock(return_value=body),
|
|
)
|
|
response = asyncio.run(adapter._handle_webhook_request(request))
|
|
self.assertEqual(response.status, 200)
|
|
self.assertIn(b"test_challenge_token", response.body)
|
|
|
|
|
|
class TestDedupTTL(unittest.TestCase):
|
|
"""Tests for TTL-aware deduplication."""
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_duplicate_within_ttl_is_rejected(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
with patch.object(adapter, "_persist_seen_message_ids"):
|
|
adapter._seen_message_ids = {"om_dup": time.time()}
|
|
adapter._seen_message_order = ["om_dup"]
|
|
self.assertTrue(adapter._is_duplicate("om_dup"))
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_expired_entry_is_not_considered_duplicate(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter, _FEISHU_DEDUP_TTL_SECONDS
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
# Plant an entry that expired well past the TTL.
|
|
stale_ts = time.time() - _FEISHU_DEDUP_TTL_SECONDS - 60
|
|
adapter._seen_message_ids = {"om_old": stale_ts}
|
|
adapter._seen_message_order = ["om_old"]
|
|
with patch.object(adapter, "_persist_seen_message_ids"):
|
|
self.assertFalse(adapter._is_duplicate("om_old"))
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_persist_saves_timestamps_as_dict(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
ts = time.time()
|
|
adapter._seen_message_ids = {"om_ts1": ts}
|
|
adapter._seen_message_order = ["om_ts1"]
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
adapter._dedup_state_path = Path(tmpdir) / "dedup.json"
|
|
adapter._persist_seen_message_ids()
|
|
saved = json.loads(adapter._dedup_state_path.read_text())
|
|
self.assertIsInstance(saved["message_ids"], dict)
|
|
self.assertAlmostEqual(saved["message_ids"]["om_ts1"], ts, places=1)
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_load_backward_compat_list_format(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
path = Path(tmpdir) / "dedup.json"
|
|
path.write_text(json.dumps({"message_ids": ["om_a", "om_b"]}), encoding="utf-8")
|
|
adapter._dedup_state_path = path
|
|
adapter._load_seen_message_ids()
|
|
self.assertIn("om_a", adapter._seen_message_ids)
|
|
self.assertIn("om_b", adapter._seen_message_ids)
|
|
|
|
|
|
class TestGroupMentionAtAll(unittest.TestCase):
|
|
"""Tests for @_all (Feishu @everyone) group mention routing."""
|
|
|
|
@patch.dict(os.environ, {"FEISHU_GROUP_POLICY": "open"}, clear=True)
|
|
def test_at_all_in_content_accepts_without_explicit_bot_mention(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
message = SimpleNamespace(
|
|
content='{"text":"@_all 请注意"}',
|
|
mentions=[],
|
|
)
|
|
sender_id = SimpleNamespace(open_id="ou_any", user_id=None)
|
|
self.assertTrue(adapter._should_accept_group_message(message, sender_id))
|
|
|
|
@patch.dict(os.environ, {"FEISHU_GROUP_POLICY": "allowlist", "FEISHU_ALLOWED_USERS": "ou_allowed"}, clear=True)
|
|
def test_at_all_still_requires_policy_gate(self):
|
|
"""@_all bypasses mention gating but NOT the allowlist policy."""
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
message = SimpleNamespace(content='{"text":"@_all attention"}', mentions=[])
|
|
# Non-allowlisted user — should be blocked even with @_all.
|
|
blocked_sender = SimpleNamespace(open_id="ou_blocked", user_id=None)
|
|
self.assertFalse(adapter._should_accept_group_message(message, blocked_sender))
|
|
# Allowlisted user — should pass.
|
|
allowed_sender = SimpleNamespace(open_id="ou_allowed", user_id=None)
|
|
self.assertTrue(adapter._should_accept_group_message(message, allowed_sender))
|
|
|
|
|
|
@unittest.skipUnless(_HAS_LARK_OAPI, "lark-oapi not installed")
|
|
class TestSenderNameResolution(unittest.TestCase):
|
|
"""Tests for _resolve_sender_name_from_api."""
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_returns_none_when_client_is_none(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
adapter._client = None
|
|
result = asyncio.run(adapter._resolve_sender_name_from_api("ou_abc"))
|
|
self.assertIsNone(result)
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_returns_cached_name_within_ttl(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
adapter._client = SimpleNamespace()
|
|
future_expire = time.time() + 600
|
|
adapter._sender_name_cache["ou_cached"] = ("Alice", future_expire)
|
|
result = asyncio.run(adapter._resolve_sender_name_from_api("ou_cached"))
|
|
self.assertEqual(result, "Alice")
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_fetches_and_caches_name_from_api(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
user_obj = SimpleNamespace(name="Bob", display_name=None, nickname=None, en_name=None)
|
|
mock_response = SimpleNamespace(
|
|
success=lambda: True,
|
|
data=SimpleNamespace(user=user_obj),
|
|
)
|
|
|
|
async def _direct(func, *args, **kwargs):
|
|
return func(*args, **kwargs)
|
|
|
|
class _ContactAPI:
|
|
def get(self, request):
|
|
return mock_response
|
|
|
|
adapter._client = SimpleNamespace(
|
|
contact=SimpleNamespace(v3=SimpleNamespace(user=_ContactAPI()))
|
|
)
|
|
|
|
with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct):
|
|
result = asyncio.run(adapter._resolve_sender_name_from_api("ou_bob"))
|
|
|
|
self.assertEqual(result, "Bob")
|
|
self.assertIn("ou_bob", adapter._sender_name_cache)
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_expired_cache_triggers_new_api_call(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
# Expired cache entry.
|
|
adapter._sender_name_cache["ou_expired"] = ("OldName", time.time() - 1)
|
|
|
|
async def _direct(func, *args, **kwargs):
|
|
return func(*args, **kwargs)
|
|
|
|
user_obj = SimpleNamespace(name="NewName", display_name=None, nickname=None, en_name=None)
|
|
|
|
class _ContactAPI:
|
|
def get(self, request):
|
|
return SimpleNamespace(success=lambda: True, data=SimpleNamespace(user=user_obj))
|
|
|
|
adapter._client = SimpleNamespace(
|
|
contact=SimpleNamespace(v3=SimpleNamespace(user=_ContactAPI()))
|
|
)
|
|
|
|
with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct):
|
|
result = asyncio.run(adapter._resolve_sender_name_from_api("ou_expired"))
|
|
|
|
self.assertEqual(result, "NewName")
|
|
|
|
@patch.dict(os.environ, {}, clear=True)
|
|
def test_api_failure_returns_none_without_raising(self):
|
|
from gateway.config import PlatformConfig
|
|
from gateway.platforms.feishu import FeishuAdapter
|
|
|
|
adapter = FeishuAdapter(PlatformConfig())
|
|
|
|
class _BrokenContactAPI:
|
|
def get(self, _request):
|
|
raise RuntimeError("API down")
|
|
|
|
adapter._client = SimpleNamespace(
|
|
contact=SimpleNamespace(v3=SimpleNamespace(user=_BrokenContactAPI()))
|
|
)
|
|
|
|
async def _direct(func, *args, **kwargs):
|
|
return func(*args, **kwargs)
|
|
|
|
with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct):
|
|
result = asyncio.run(adapter._resolve_sender_name_from_api("ou_broken"))
|
|
|
|
self.assertIsNone(result)
|