Add DingTalk as a messaging platform using the dingtalk-stream SDK
for real-time message reception via Stream Mode (no webhook needed).
Replies are sent via session webhook using markdown format.
Features:
- Stream Mode connection (long-lived WebSocket, no public URL needed)
- Text and rich text message support
- DM and group chat support
- Message deduplication with 5-minute window
- Auto-reconnection with exponential backoff
- Session webhook caching for reply routing
Configuration:
export DINGTALK_CLIENT_ID=your-app-key
export DINGTALK_CLIENT_SECRET=your-app-secret
# or in config.yaml:
platforms:
dingtalk:
enabled: true
extra:
client_id: your-app-key
client_secret: your-app-secret
Files:
- gateway/platforms/dingtalk.py (340 lines) — adapter implementation
- gateway/config.py — add DINGTALK to Platform enum
- gateway/run.py — add DingTalk to _create_adapter
- hermes_cli/config.py — add env vars to _EXTRA_ENV_KEYS
- hermes_cli/tools_config.py — add dingtalk to PLATFORMS
- tests/gateway/test_dingtalk.py — 21 tests
275 lines
10 KiB
Python
275 lines
10 KiB
Python
"""Tests for DingTalk platform adapter."""
|
|
import asyncio
|
|
import json
|
|
from datetime import datetime, timezone
|
|
from unittest.mock import AsyncMock, MagicMock, patch, PropertyMock
|
|
|
|
import pytest
|
|
|
|
from gateway.config import Platform, PlatformConfig
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Requirements check
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestDingTalkRequirements:
|
|
|
|
def test_returns_false_when_sdk_missing(self, monkeypatch):
|
|
with patch.dict("sys.modules", {"dingtalk_stream": None}):
|
|
monkeypatch.setattr(
|
|
"gateway.platforms.dingtalk.DINGTALK_STREAM_AVAILABLE", False
|
|
)
|
|
from gateway.platforms.dingtalk import check_dingtalk_requirements
|
|
assert check_dingtalk_requirements() is False
|
|
|
|
def test_returns_false_when_env_vars_missing(self, monkeypatch):
|
|
monkeypatch.setattr(
|
|
"gateway.platforms.dingtalk.DINGTALK_STREAM_AVAILABLE", True
|
|
)
|
|
monkeypatch.setattr("gateway.platforms.dingtalk.HTTPX_AVAILABLE", True)
|
|
monkeypatch.delenv("DINGTALK_CLIENT_ID", raising=False)
|
|
monkeypatch.delenv("DINGTALK_CLIENT_SECRET", raising=False)
|
|
from gateway.platforms.dingtalk import check_dingtalk_requirements
|
|
assert check_dingtalk_requirements() is False
|
|
|
|
def test_returns_true_when_all_available(self, monkeypatch):
|
|
monkeypatch.setattr(
|
|
"gateway.platforms.dingtalk.DINGTALK_STREAM_AVAILABLE", True
|
|
)
|
|
monkeypatch.setattr("gateway.platforms.dingtalk.HTTPX_AVAILABLE", True)
|
|
monkeypatch.setenv("DINGTALK_CLIENT_ID", "test-id")
|
|
monkeypatch.setenv("DINGTALK_CLIENT_SECRET", "test-secret")
|
|
from gateway.platforms.dingtalk import check_dingtalk_requirements
|
|
assert check_dingtalk_requirements() is True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Adapter construction
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestDingTalkAdapterInit:
|
|
|
|
def test_reads_config_from_extra(self):
|
|
from gateway.platforms.dingtalk import DingTalkAdapter
|
|
config = PlatformConfig(
|
|
enabled=True,
|
|
extra={"client_id": "cfg-id", "client_secret": "cfg-secret"},
|
|
)
|
|
adapter = DingTalkAdapter(config)
|
|
assert adapter._client_id == "cfg-id"
|
|
assert adapter._client_secret == "cfg-secret"
|
|
assert adapter.name == "Dingtalk" # base class uses .title()
|
|
|
|
def test_falls_back_to_env_vars(self, monkeypatch):
|
|
monkeypatch.setenv("DINGTALK_CLIENT_ID", "env-id")
|
|
monkeypatch.setenv("DINGTALK_CLIENT_SECRET", "env-secret")
|
|
from gateway.platforms.dingtalk import DingTalkAdapter
|
|
config = PlatformConfig(enabled=True)
|
|
adapter = DingTalkAdapter(config)
|
|
assert adapter._client_id == "env-id"
|
|
assert adapter._client_secret == "env-secret"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Message text extraction
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestExtractText:
|
|
|
|
def test_extracts_dict_text(self):
|
|
from gateway.platforms.dingtalk import DingTalkAdapter
|
|
msg = MagicMock()
|
|
msg.text = {"content": " hello world "}
|
|
msg.rich_text = None
|
|
assert DingTalkAdapter._extract_text(msg) == "hello world"
|
|
|
|
def test_extracts_string_text(self):
|
|
from gateway.platforms.dingtalk import DingTalkAdapter
|
|
msg = MagicMock()
|
|
msg.text = "plain text"
|
|
msg.rich_text = None
|
|
assert DingTalkAdapter._extract_text(msg) == "plain text"
|
|
|
|
def test_falls_back_to_rich_text(self):
|
|
from gateway.platforms.dingtalk import DingTalkAdapter
|
|
msg = MagicMock()
|
|
msg.text = ""
|
|
msg.rich_text = [{"text": "part1"}, {"text": "part2"}, {"image": "url"}]
|
|
assert DingTalkAdapter._extract_text(msg) == "part1 part2"
|
|
|
|
def test_returns_empty_for_no_content(self):
|
|
from gateway.platforms.dingtalk import DingTalkAdapter
|
|
msg = MagicMock()
|
|
msg.text = ""
|
|
msg.rich_text = None
|
|
assert DingTalkAdapter._extract_text(msg) == ""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Deduplication
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestDeduplication:
|
|
|
|
def test_first_message_not_duplicate(self):
|
|
from gateway.platforms.dingtalk import DingTalkAdapter
|
|
adapter = DingTalkAdapter(PlatformConfig(enabled=True))
|
|
assert adapter._is_duplicate("msg-1") is False
|
|
|
|
def test_second_same_message_is_duplicate(self):
|
|
from gateway.platforms.dingtalk import DingTalkAdapter
|
|
adapter = DingTalkAdapter(PlatformConfig(enabled=True))
|
|
adapter._is_duplicate("msg-1")
|
|
assert adapter._is_duplicate("msg-1") is True
|
|
|
|
def test_different_messages_not_duplicate(self):
|
|
from gateway.platforms.dingtalk import DingTalkAdapter
|
|
adapter = DingTalkAdapter(PlatformConfig(enabled=True))
|
|
adapter._is_duplicate("msg-1")
|
|
assert adapter._is_duplicate("msg-2") is False
|
|
|
|
def test_cache_cleanup_on_overflow(self):
|
|
from gateway.platforms.dingtalk import DingTalkAdapter, DEDUP_MAX_SIZE
|
|
adapter = DingTalkAdapter(PlatformConfig(enabled=True))
|
|
# Fill beyond max
|
|
for i in range(DEDUP_MAX_SIZE + 10):
|
|
adapter._is_duplicate(f"msg-{i}")
|
|
# Cache should have been pruned
|
|
assert len(adapter._seen_messages) <= DEDUP_MAX_SIZE + 10
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Send
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestSend:
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_posts_to_webhook(self):
|
|
from gateway.platforms.dingtalk import DingTalkAdapter
|
|
adapter = DingTalkAdapter(PlatformConfig(enabled=True))
|
|
|
|
mock_response = MagicMock()
|
|
mock_response.status_code = 200
|
|
mock_response.text = "OK"
|
|
|
|
mock_client = AsyncMock()
|
|
mock_client.post = AsyncMock(return_value=mock_response)
|
|
adapter._http_client = mock_client
|
|
|
|
result = await adapter.send(
|
|
"chat-123", "Hello!",
|
|
metadata={"session_webhook": "https://dingtalk.example/webhook"}
|
|
)
|
|
assert result.success is True
|
|
mock_client.post.assert_called_once()
|
|
call_args = mock_client.post.call_args
|
|
assert call_args[0][0] == "https://dingtalk.example/webhook"
|
|
payload = call_args[1]["json"]
|
|
assert payload["msgtype"] == "markdown"
|
|
assert payload["markdown"]["title"] == "Hermes"
|
|
assert payload["markdown"]["text"] == "Hello!"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_fails_without_webhook(self):
|
|
from gateway.platforms.dingtalk import DingTalkAdapter
|
|
adapter = DingTalkAdapter(PlatformConfig(enabled=True))
|
|
adapter._http_client = AsyncMock()
|
|
|
|
result = await adapter.send("chat-123", "Hello!")
|
|
assert result.success is False
|
|
assert "session_webhook" in result.error
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_uses_cached_webhook(self):
|
|
from gateway.platforms.dingtalk import DingTalkAdapter
|
|
adapter = DingTalkAdapter(PlatformConfig(enabled=True))
|
|
|
|
mock_response = MagicMock()
|
|
mock_response.status_code = 200
|
|
mock_client = AsyncMock()
|
|
mock_client.post = AsyncMock(return_value=mock_response)
|
|
adapter._http_client = mock_client
|
|
adapter._session_webhooks["chat-123"] = "https://cached.example/webhook"
|
|
|
|
result = await adapter.send("chat-123", "Hello!")
|
|
assert result.success is True
|
|
assert mock_client.post.call_args[0][0] == "https://cached.example/webhook"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_handles_http_error(self):
|
|
from gateway.platforms.dingtalk import DingTalkAdapter
|
|
adapter = DingTalkAdapter(PlatformConfig(enabled=True))
|
|
|
|
mock_response = MagicMock()
|
|
mock_response.status_code = 400
|
|
mock_response.text = "Bad Request"
|
|
mock_client = AsyncMock()
|
|
mock_client.post = AsyncMock(return_value=mock_response)
|
|
adapter._http_client = mock_client
|
|
|
|
result = await adapter.send(
|
|
"chat-123", "Hello!",
|
|
metadata={"session_webhook": "https://example/webhook"}
|
|
)
|
|
assert result.success is False
|
|
assert "400" in result.error
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Connect / disconnect
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestConnect:
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_connect_fails_without_sdk(self, monkeypatch):
|
|
monkeypatch.setattr(
|
|
"gateway.platforms.dingtalk.DINGTALK_STREAM_AVAILABLE", False
|
|
)
|
|
from gateway.platforms.dingtalk import DingTalkAdapter
|
|
adapter = DingTalkAdapter(PlatformConfig(enabled=True))
|
|
result = await adapter.connect()
|
|
assert result is False
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_connect_fails_without_credentials(self):
|
|
from gateway.platforms.dingtalk import DingTalkAdapter
|
|
adapter = DingTalkAdapter(PlatformConfig(enabled=True))
|
|
adapter._client_id = ""
|
|
adapter._client_secret = ""
|
|
result = await adapter.connect()
|
|
assert result is False
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_disconnect_cleans_up(self):
|
|
from gateway.platforms.dingtalk import DingTalkAdapter
|
|
adapter = DingTalkAdapter(PlatformConfig(enabled=True))
|
|
adapter._session_webhooks["a"] = "http://x"
|
|
adapter._seen_messages["b"] = 1.0
|
|
adapter._http_client = AsyncMock()
|
|
adapter._stream_task = None
|
|
|
|
await adapter.disconnect()
|
|
assert len(adapter._session_webhooks) == 0
|
|
assert len(adapter._seen_messages) == 0
|
|
assert adapter._http_client is None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Platform enum
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestPlatformEnum:
|
|
|
|
def test_dingtalk_in_platform_enum(self):
|
|
assert Platform.DINGTALK.value == "dingtalk"
|