* feat(telegram): auto-discover fallback IPs via DoH when api.telegram.org is unreachable On some networks (university, corporate), api.telegram.org resolves to a valid Telegram IP that is unreachable due to routing/firewall rules. A different IP in the same Telegram-owned 149.154.160.0/20 block works fine. This adds automatic fallback IP discovery at connect time: 1. Query Google and Cloudflare DNS-over-HTTPS for api.telegram.org A records 2. Exclude the system-DNS IP (the unreachable one), use the rest as fallbacks 3. If DoH is also blocked, fall back to a seed list (149.154.167.220) 4. TelegramFallbackTransport tries primary first, sticks to whichever works No configuration needed — works automatically. TELEGRAM_FALLBACK_IPS env var still available as manual override. Zero impact on healthy networks (primary path succeeds on first attempt, fallback never exercised). No new dependencies (uses httpx already in deps + stdlib socket). * fix: share transport instance and downgrade seed fallback log to info - Use single TelegramFallbackTransport shared between request and get_updates_request so sticky IP is shared across polling and API calls - Keep separate HTTPXRequest instances (different timeout settings) - Downgrade "using seed fallback IPs" from warning to info to avoid noisy logs on healthy networks * fix: add telegram.request mock and discovery fixture to remaining test files The original PR missed test_dm_topics.py and test_telegram_network_reconnect.py — both need the telegram.request mock module. The reconnect test also needs _no_auto_discovery since _handle_polling_network_error calls connect() which now invokes discover_fallback_ips(). --------- Co-authored-by: Mohan Qiao <Gavin-Qiao@users.noreply.github.com>
695 lines
26 KiB
Python
695 lines
26 KiB
Python
"""
|
|
Tests for Telegram document handling in gateway/platforms/telegram.py.
|
|
|
|
Covers: document type detection, download/cache flow, size limits,
|
|
text injection, error handling.
|
|
|
|
Note: python-telegram-bot may not be installed in the test environment.
|
|
We mock the telegram module at import time to avoid collection errors.
|
|
"""
|
|
|
|
import asyncio
|
|
import importlib
|
|
import os
|
|
import sys
|
|
from types import SimpleNamespace
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from gateway.config import Platform, PlatformConfig
|
|
from gateway.platforms.base import (
|
|
MessageEvent,
|
|
MessageType,
|
|
SendResult,
|
|
SUPPORTED_DOCUMENT_TYPES,
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Mock the telegram package if it's not installed
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _ensure_telegram_mock():
|
|
"""Install mock telegram modules so TelegramAdapter can be imported."""
|
|
if "telegram" in sys.modules and hasattr(sys.modules["telegram"], "__file__"):
|
|
# Real library is installed — no mocking needed
|
|
return
|
|
|
|
telegram_mod = MagicMock()
|
|
# ContextTypes needs DEFAULT_TYPE as an actual attribute for the annotation
|
|
telegram_mod.ext.ContextTypes.DEFAULT_TYPE = type(None)
|
|
telegram_mod.constants.ParseMode.MARKDOWN_V2 = "MarkdownV2"
|
|
telegram_mod.constants.ChatType.GROUP = "group"
|
|
telegram_mod.constants.ChatType.SUPERGROUP = "supergroup"
|
|
telegram_mod.constants.ChatType.CHANNEL = "channel"
|
|
telegram_mod.constants.ChatType.PRIVATE = "private"
|
|
|
|
for name in ("telegram", "telegram.ext", "telegram.constants", "telegram.request"):
|
|
sys.modules.setdefault(name, telegram_mod)
|
|
|
|
|
|
_ensure_telegram_mock()
|
|
|
|
# Now we can safely import
|
|
from gateway.platforms.telegram import TelegramAdapter # noqa: E402
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers to build mock Telegram objects
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _make_file_obj(data: bytes = b"hello"):
|
|
"""Create a mock Telegram File with download_as_bytearray."""
|
|
f = AsyncMock()
|
|
f.download_as_bytearray = AsyncMock(return_value=bytearray(data))
|
|
f.file_path = "documents/file.pdf"
|
|
return f
|
|
|
|
|
|
def _make_document(
|
|
file_name="report.pdf",
|
|
mime_type="application/pdf",
|
|
file_size=1024,
|
|
file_obj=None,
|
|
):
|
|
"""Create a mock Telegram Document object."""
|
|
doc = MagicMock()
|
|
doc.file_name = file_name
|
|
doc.mime_type = mime_type
|
|
doc.file_size = file_size
|
|
doc.get_file = AsyncMock(return_value=file_obj or _make_file_obj())
|
|
return doc
|
|
|
|
|
|
def _make_message(document=None, caption=None, media_group_id=None, photo=None):
|
|
"""Build a mock Telegram Message with the given document/photo."""
|
|
msg = MagicMock()
|
|
msg.message_id = 42
|
|
msg.text = caption or ""
|
|
msg.caption = caption
|
|
msg.date = None
|
|
# Media flags — all None except explicit payload
|
|
msg.photo = photo
|
|
msg.video = None
|
|
msg.audio = None
|
|
msg.voice = None
|
|
msg.sticker = None
|
|
msg.document = document
|
|
msg.media_group_id = media_group_id
|
|
# Chat / user
|
|
msg.chat = MagicMock()
|
|
msg.chat.id = 100
|
|
msg.chat.type = "private"
|
|
msg.chat.title = None
|
|
msg.chat.full_name = "Test User"
|
|
msg.from_user = MagicMock()
|
|
msg.from_user.id = 1
|
|
msg.from_user.full_name = "Test User"
|
|
msg.message_thread_id = None
|
|
return msg
|
|
|
|
|
|
def _make_update(msg):
|
|
"""Wrap a message in a mock Update."""
|
|
update = MagicMock()
|
|
update.message = msg
|
|
return update
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fixtures
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.fixture()
|
|
def adapter():
|
|
config = PlatformConfig(enabled=True, token="fake-token")
|
|
a = TelegramAdapter(config)
|
|
# Capture events instead of processing them
|
|
a.handle_message = AsyncMock()
|
|
return a
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def _redirect_cache(tmp_path, monkeypatch):
|
|
"""Point document cache to tmp_path so tests don't touch ~/.hermes."""
|
|
monkeypatch.setattr(
|
|
"gateway.platforms.base.DOCUMENT_CACHE_DIR", tmp_path / "doc_cache"
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestDocumentTypeDetection
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestDocumentTypeDetection:
|
|
@pytest.mark.asyncio
|
|
async def test_document_detected_explicitly(self, adapter):
|
|
doc = _make_document()
|
|
msg = _make_message(document=doc)
|
|
update = _make_update(msg)
|
|
await adapter._handle_media_message(update, MagicMock())
|
|
event = adapter.handle_message.call_args[0][0]
|
|
assert event.message_type == MessageType.DOCUMENT
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_fallback_is_document(self, adapter):
|
|
"""When no specific media attr is set, message_type defaults to DOCUMENT."""
|
|
msg = _make_message()
|
|
msg.document = None # no media at all
|
|
update = _make_update(msg)
|
|
await adapter._handle_media_message(update, MagicMock())
|
|
event = adapter.handle_message.call_args[0][0]
|
|
assert event.message_type == MessageType.DOCUMENT
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestDocumentDownloadBlock
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _make_photo(file_obj=None):
|
|
photo = MagicMock()
|
|
photo.get_file = AsyncMock(return_value=file_obj or _make_file_obj(b"photo-bytes"))
|
|
return photo
|
|
|
|
|
|
class TestDocumentDownloadBlock:
|
|
@pytest.mark.asyncio
|
|
async def test_supported_pdf_is_cached(self, adapter):
|
|
pdf_bytes = b"%PDF-1.4 fake"
|
|
file_obj = _make_file_obj(pdf_bytes)
|
|
doc = _make_document(file_name="report.pdf", file_size=1024, file_obj=file_obj)
|
|
msg = _make_message(document=doc)
|
|
update = _make_update(msg)
|
|
|
|
await adapter._handle_media_message(update, MagicMock())
|
|
event = adapter.handle_message.call_args[0][0]
|
|
assert len(event.media_urls) == 1
|
|
assert os.path.exists(event.media_urls[0])
|
|
assert event.media_types == ["application/pdf"]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_supported_txt_injects_content(self, adapter):
|
|
content = b"Hello from a text file"
|
|
file_obj = _make_file_obj(content)
|
|
doc = _make_document(
|
|
file_name="notes.txt", mime_type="text/plain",
|
|
file_size=len(content), file_obj=file_obj,
|
|
)
|
|
msg = _make_message(document=doc)
|
|
update = _make_update(msg)
|
|
|
|
await adapter._handle_media_message(update, MagicMock())
|
|
event = adapter.handle_message.call_args[0][0]
|
|
assert "Hello from a text file" in event.text
|
|
assert "[Content of notes.txt]" in event.text
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_supported_md_injects_content(self, adapter):
|
|
content = b"# Title\nSome markdown"
|
|
file_obj = _make_file_obj(content)
|
|
doc = _make_document(
|
|
file_name="readme.md", mime_type="text/markdown",
|
|
file_size=len(content), file_obj=file_obj,
|
|
)
|
|
msg = _make_message(document=doc)
|
|
update = _make_update(msg)
|
|
|
|
await adapter._handle_media_message(update, MagicMock())
|
|
event = adapter.handle_message.call_args[0][0]
|
|
assert "# Title" in event.text
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_caption_preserved_with_injection(self, adapter):
|
|
content = b"file text"
|
|
file_obj = _make_file_obj(content)
|
|
doc = _make_document(
|
|
file_name="doc.txt", mime_type="text/plain",
|
|
file_size=len(content), file_obj=file_obj,
|
|
)
|
|
msg = _make_message(document=doc, caption="Please summarize")
|
|
update = _make_update(msg)
|
|
|
|
await adapter._handle_media_message(update, MagicMock())
|
|
event = adapter.handle_message.call_args[0][0]
|
|
assert "file text" in event.text
|
|
assert "Please summarize" in event.text
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_unsupported_type_rejected(self, adapter):
|
|
doc = _make_document(file_name="archive.zip", mime_type="application/zip", file_size=100)
|
|
msg = _make_message(document=doc)
|
|
update = _make_update(msg)
|
|
|
|
await adapter._handle_media_message(update, MagicMock())
|
|
event = adapter.handle_message.call_args[0][0]
|
|
assert "Unsupported document type" in event.text
|
|
assert ".zip" in event.text
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_oversized_file_rejected(self, adapter):
|
|
doc = _make_document(file_name="huge.pdf", file_size=25 * 1024 * 1024)
|
|
msg = _make_message(document=doc)
|
|
update = _make_update(msg)
|
|
|
|
await adapter._handle_media_message(update, MagicMock())
|
|
event = adapter.handle_message.call_args[0][0]
|
|
assert "too large" in event.text
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_none_file_size_rejected(self, adapter):
|
|
"""Security fix: file_size=None must be rejected (not silently allowed)."""
|
|
doc = _make_document(file_name="tricky.pdf", file_size=None)
|
|
msg = _make_message(document=doc)
|
|
update = _make_update(msg)
|
|
|
|
await adapter._handle_media_message(update, MagicMock())
|
|
event = adapter.handle_message.call_args[0][0]
|
|
assert "too large" in event.text or "could not be verified" in event.text
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_missing_filename_uses_mime_lookup(self, adapter):
|
|
"""No file_name but valid mime_type should resolve to extension."""
|
|
content = b"some pdf bytes"
|
|
file_obj = _make_file_obj(content)
|
|
doc = _make_document(
|
|
file_name=None, mime_type="application/pdf",
|
|
file_size=len(content), file_obj=file_obj,
|
|
)
|
|
msg = _make_message(document=doc)
|
|
update = _make_update(msg)
|
|
|
|
await adapter._handle_media_message(update, MagicMock())
|
|
event = adapter.handle_message.call_args[0][0]
|
|
assert len(event.media_urls) == 1
|
|
assert event.media_types == ["application/pdf"]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_missing_filename_and_mime_rejected(self, adapter):
|
|
doc = _make_document(file_name=None, mime_type=None, file_size=100)
|
|
msg = _make_message(document=doc)
|
|
update = _make_update(msg)
|
|
|
|
await adapter._handle_media_message(update, MagicMock())
|
|
event = adapter.handle_message.call_args[0][0]
|
|
assert "Unsupported" in event.text
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_unicode_decode_error_handled(self, adapter):
|
|
"""Binary bytes that aren't valid UTF-8 in a .txt — content not injected but file still cached."""
|
|
binary = bytes(range(128, 256)) # not valid UTF-8
|
|
file_obj = _make_file_obj(binary)
|
|
doc = _make_document(
|
|
file_name="binary.txt", mime_type="text/plain",
|
|
file_size=len(binary), file_obj=file_obj,
|
|
)
|
|
msg = _make_message(document=doc)
|
|
update = _make_update(msg)
|
|
|
|
await adapter._handle_media_message(update, MagicMock())
|
|
event = adapter.handle_message.call_args[0][0]
|
|
# File should still be cached
|
|
assert len(event.media_urls) == 1
|
|
assert os.path.exists(event.media_urls[0])
|
|
# Content NOT injected — text should be empty (no caption set)
|
|
assert "[Content of" not in (event.text or "")
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_text_injection_capped(self, adapter):
|
|
"""A .txt file over 100 KB should NOT have its content injected."""
|
|
large = b"x" * (200 * 1024) # 200 KB
|
|
file_obj = _make_file_obj(large)
|
|
doc = _make_document(
|
|
file_name="big.txt", mime_type="text/plain",
|
|
file_size=len(large), file_obj=file_obj,
|
|
)
|
|
msg = _make_message(document=doc)
|
|
update = _make_update(msg)
|
|
|
|
await adapter._handle_media_message(update, MagicMock())
|
|
event = adapter.handle_message.call_args[0][0]
|
|
# File should be cached
|
|
assert len(event.media_urls) == 1
|
|
# Content should NOT be injected
|
|
assert "[Content of" not in (event.text or "")
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_download_exception_handled(self, adapter):
|
|
"""If get_file() raises, the handler logs the error without crashing."""
|
|
doc = _make_document(file_name="crash.pdf", file_size=100)
|
|
doc.get_file = AsyncMock(side_effect=RuntimeError("Telegram API down"))
|
|
msg = _make_message(document=doc)
|
|
update = _make_update(msg)
|
|
|
|
# Should not raise
|
|
await adapter._handle_media_message(update, MagicMock())
|
|
# handle_message should still be called (the handler catches the exception)
|
|
adapter.handle_message.assert_called_once()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestMediaGroups — media group (album) buffering
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestMediaGroups:
|
|
@pytest.mark.asyncio
|
|
async def test_non_album_photo_burst_is_buffered_and_combined(self, adapter):
|
|
first_photo = _make_photo(_make_file_obj(b"first"))
|
|
second_photo = _make_photo(_make_file_obj(b"second"))
|
|
|
|
msg1 = _make_message(caption="two images", photo=[first_photo])
|
|
msg2 = _make_message(photo=[second_photo])
|
|
|
|
with patch("gateway.platforms.telegram.cache_image_from_bytes", side_effect=["/tmp/burst-one.jpg", "/tmp/burst-two.jpg"]):
|
|
await adapter._handle_media_message(_make_update(msg1), MagicMock())
|
|
await adapter._handle_media_message(_make_update(msg2), MagicMock())
|
|
assert adapter.handle_message.await_count == 0
|
|
await asyncio.sleep(adapter.MEDIA_GROUP_WAIT_SECONDS + 0.05)
|
|
|
|
adapter.handle_message.assert_awaited_once()
|
|
event = adapter.handle_message.await_args.args[0]
|
|
assert event.text == "two images"
|
|
assert event.media_urls == ["/tmp/burst-one.jpg", "/tmp/burst-two.jpg"]
|
|
assert len(event.media_types) == 2
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_photo_album_is_buffered_and_combined(self, adapter):
|
|
first_photo = _make_photo(_make_file_obj(b"first"))
|
|
second_photo = _make_photo(_make_file_obj(b"second"))
|
|
|
|
msg1 = _make_message(caption="two images", media_group_id="album-1", photo=[first_photo])
|
|
msg2 = _make_message(media_group_id="album-1", photo=[second_photo])
|
|
|
|
with patch("gateway.platforms.telegram.cache_image_from_bytes", side_effect=["/tmp/one.jpg", "/tmp/two.jpg"]):
|
|
await adapter._handle_media_message(_make_update(msg1), MagicMock())
|
|
await adapter._handle_media_message(_make_update(msg2), MagicMock())
|
|
assert adapter.handle_message.await_count == 0
|
|
await asyncio.sleep(adapter.MEDIA_GROUP_WAIT_SECONDS + 0.05)
|
|
|
|
adapter.handle_message.assert_awaited_once()
|
|
event = adapter.handle_message.call_args[0][0]
|
|
assert event.text == "two images"
|
|
assert event.media_urls == ["/tmp/one.jpg", "/tmp/two.jpg"]
|
|
assert len(event.media_types) == 2
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_disconnect_cancels_pending_media_group_flush(self, adapter):
|
|
first_photo = _make_photo(_make_file_obj(b"first"))
|
|
msg = _make_message(caption="two images", media_group_id="album-2", photo=[first_photo])
|
|
|
|
with patch("gateway.platforms.telegram.cache_image_from_bytes", return_value="/tmp/one.jpg"):
|
|
await adapter._handle_media_message(_make_update(msg), MagicMock())
|
|
|
|
assert "album-2" in adapter._media_group_events
|
|
assert "album-2" in adapter._media_group_tasks
|
|
|
|
await adapter.disconnect()
|
|
await asyncio.sleep(adapter.MEDIA_GROUP_WAIT_SECONDS + 0.05)
|
|
|
|
assert adapter._media_group_events == {}
|
|
assert adapter._media_group_tasks == {}
|
|
adapter.handle_message.assert_not_awaited()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestSendDocument — outbound file attachment delivery
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestSendDocument:
|
|
"""Tests for TelegramAdapter.send_document() — sending files to users."""
|
|
|
|
@pytest.fixture()
|
|
def connected_adapter(self, adapter):
|
|
"""Adapter with a mock bot attached."""
|
|
bot = AsyncMock()
|
|
adapter._bot = bot
|
|
return adapter
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_document_success(self, connected_adapter, tmp_path):
|
|
"""A local file is sent via bot.send_document and returns success."""
|
|
# Create a real temp file
|
|
test_file = tmp_path / "report.pdf"
|
|
test_file.write_bytes(b"%PDF-1.4 fake content")
|
|
|
|
mock_msg = MagicMock()
|
|
mock_msg.message_id = 99
|
|
connected_adapter._bot.send_document = AsyncMock(return_value=mock_msg)
|
|
|
|
result = await connected_adapter.send_document(
|
|
chat_id="12345",
|
|
file_path=str(test_file),
|
|
caption="Here's the report",
|
|
)
|
|
|
|
assert result.success is True
|
|
assert result.message_id == "99"
|
|
connected_adapter._bot.send_document.assert_called_once()
|
|
call_kwargs = connected_adapter._bot.send_document.call_args[1]
|
|
assert call_kwargs["chat_id"] == 12345
|
|
assert call_kwargs["filename"] == "report.pdf"
|
|
assert call_kwargs["caption"] == "Here's the report"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_document_custom_filename(self, connected_adapter, tmp_path):
|
|
"""The file_name parameter overrides the basename for display."""
|
|
test_file = tmp_path / "doc_abc123_ugly.csv"
|
|
test_file.write_bytes(b"a,b,c\n1,2,3")
|
|
|
|
mock_msg = MagicMock()
|
|
mock_msg.message_id = 100
|
|
connected_adapter._bot.send_document = AsyncMock(return_value=mock_msg)
|
|
|
|
result = await connected_adapter.send_document(
|
|
chat_id="12345",
|
|
file_path=str(test_file),
|
|
file_name="clean_data.csv",
|
|
)
|
|
|
|
assert result.success is True
|
|
call_kwargs = connected_adapter._bot.send_document.call_args[1]
|
|
assert call_kwargs["filename"] == "clean_data.csv"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_document_file_not_found(self, connected_adapter):
|
|
"""Missing file returns error without calling Telegram API."""
|
|
result = await connected_adapter.send_document(
|
|
chat_id="12345",
|
|
file_path="/nonexistent/file.pdf",
|
|
)
|
|
|
|
assert result.success is False
|
|
assert "not found" in result.error.lower()
|
|
connected_adapter._bot.send_document.assert_not_called()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_document_not_connected(self, adapter):
|
|
"""If bot is None, returns not connected error."""
|
|
result = await adapter.send_document(
|
|
chat_id="12345",
|
|
file_path="/some/file.pdf",
|
|
)
|
|
|
|
assert result.success is False
|
|
assert "Not connected" in result.error
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_document_caption_truncated(self, connected_adapter, tmp_path):
|
|
"""Captions longer than 1024 chars are truncated."""
|
|
test_file = tmp_path / "data.json"
|
|
test_file.write_bytes(b"{}")
|
|
|
|
mock_msg = MagicMock()
|
|
mock_msg.message_id = 101
|
|
connected_adapter._bot.send_document = AsyncMock(return_value=mock_msg)
|
|
|
|
long_caption = "x" * 2000
|
|
await connected_adapter.send_document(
|
|
chat_id="12345",
|
|
file_path=str(test_file),
|
|
caption=long_caption,
|
|
)
|
|
|
|
call_kwargs = connected_adapter._bot.send_document.call_args[1]
|
|
assert len(call_kwargs["caption"]) == 1024
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_document_api_error_falls_back(self, connected_adapter, tmp_path):
|
|
"""If Telegram API raises, falls back to base class text message."""
|
|
test_file = tmp_path / "file.pdf"
|
|
test_file.write_bytes(b"data")
|
|
|
|
connected_adapter._bot.send_document = AsyncMock(
|
|
side_effect=RuntimeError("Telegram API error")
|
|
)
|
|
|
|
# The base fallback calls self.send() which is also on _bot, so mock it
|
|
# to avoid cascading errors.
|
|
connected_adapter.send = AsyncMock(
|
|
return_value=SendResult(success=True, message_id="fallback")
|
|
)
|
|
|
|
result = await connected_adapter.send_document(
|
|
chat_id="12345",
|
|
file_path=str(test_file),
|
|
)
|
|
|
|
# Should have fallen back to base class
|
|
assert result.success is True
|
|
assert result.message_id == "fallback"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_document_reply_to(self, connected_adapter, tmp_path):
|
|
"""reply_to parameter is forwarded as reply_to_message_id."""
|
|
test_file = tmp_path / "spec.md"
|
|
test_file.write_bytes(b"# Spec")
|
|
|
|
mock_msg = MagicMock()
|
|
mock_msg.message_id = 102
|
|
connected_adapter._bot.send_document = AsyncMock(return_value=mock_msg)
|
|
|
|
await connected_adapter.send_document(
|
|
chat_id="12345",
|
|
file_path=str(test_file),
|
|
reply_to="50",
|
|
)
|
|
|
|
call_kwargs = connected_adapter._bot.send_document.call_args[1]
|
|
assert call_kwargs["reply_to_message_id"] == 50
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_document_thread_id(self, connected_adapter, tmp_path):
|
|
"""metadata thread_id is forwarded as message_thread_id (required for Telegram forum groups)."""
|
|
test_file = tmp_path / "report.pdf"
|
|
test_file.write_bytes(b"%PDF-1.4 data")
|
|
|
|
mock_msg = MagicMock()
|
|
mock_msg.message_id = 103
|
|
connected_adapter._bot.send_document = AsyncMock(return_value=mock_msg)
|
|
|
|
await connected_adapter.send_document(
|
|
chat_id="12345",
|
|
file_path=str(test_file),
|
|
metadata={"thread_id": "789"},
|
|
)
|
|
|
|
call_kwargs = connected_adapter._bot.send_document.call_args[1]
|
|
assert call_kwargs["message_thread_id"] == 789
|
|
|
|
|
|
class TestTelegramPhotoBatching:
|
|
@pytest.mark.asyncio
|
|
async def test_flush_photo_batch_does_not_drop_newer_scheduled_task(self, adapter):
|
|
old_task = MagicMock()
|
|
new_task = MagicMock()
|
|
batch_key = "session:photo-burst"
|
|
adapter._pending_photo_batch_tasks[batch_key] = new_task
|
|
adapter._pending_photo_batches[batch_key] = MessageEvent(
|
|
text="",
|
|
message_type=MessageType.PHOTO,
|
|
source=SimpleNamespace(channel_id="chat-1"),
|
|
media_urls=["/tmp/a.jpg"],
|
|
media_types=["image/jpeg"],
|
|
)
|
|
|
|
with (
|
|
patch("gateway.platforms.telegram.asyncio.current_task", return_value=old_task),
|
|
patch("gateway.platforms.telegram.asyncio.sleep", new=AsyncMock()),
|
|
):
|
|
await adapter._flush_photo_batch(batch_key)
|
|
|
|
assert adapter._pending_photo_batch_tasks[batch_key] is new_task
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_disconnect_cancels_pending_photo_batch_tasks(self, adapter):
|
|
task = MagicMock()
|
|
task.done.return_value = False
|
|
adapter._pending_photo_batch_tasks["session:photo-burst"] = task
|
|
adapter._pending_photo_batches["session:photo-burst"] = MessageEvent(
|
|
text="",
|
|
message_type=MessageType.PHOTO,
|
|
source=SimpleNamespace(channel_id="chat-1"),
|
|
)
|
|
adapter._app = MagicMock()
|
|
adapter._app.updater.stop = AsyncMock()
|
|
adapter._app.stop = AsyncMock()
|
|
adapter._app.shutdown = AsyncMock()
|
|
|
|
await adapter.disconnect()
|
|
|
|
task.cancel.assert_called_once()
|
|
assert adapter._pending_photo_batch_tasks == {}
|
|
assert adapter._pending_photo_batches == {}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestSendVideo — outbound video delivery
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestSendVideo:
|
|
"""Tests for TelegramAdapter.send_video() — sending videos to users."""
|
|
|
|
@pytest.fixture()
|
|
def connected_adapter(self, adapter):
|
|
bot = AsyncMock()
|
|
adapter._bot = bot
|
|
return adapter
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_video_success(self, connected_adapter, tmp_path):
|
|
test_file = tmp_path / "clip.mp4"
|
|
test_file.write_bytes(b"\x00\x00\x00\x1c" + b"ftyp" + b"\x00" * 100)
|
|
|
|
mock_msg = MagicMock()
|
|
mock_msg.message_id = 200
|
|
connected_adapter._bot.send_video = AsyncMock(return_value=mock_msg)
|
|
|
|
result = await connected_adapter.send_video(
|
|
chat_id="12345",
|
|
video_path=str(test_file),
|
|
caption="Check this out",
|
|
)
|
|
|
|
assert result.success is True
|
|
assert result.message_id == "200"
|
|
connected_adapter._bot.send_video.assert_called_once()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_video_file_not_found(self, connected_adapter):
|
|
result = await connected_adapter.send_video(
|
|
chat_id="12345",
|
|
video_path="/nonexistent/video.mp4",
|
|
)
|
|
|
|
assert result.success is False
|
|
assert "not found" in result.error.lower()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_video_not_connected(self, adapter):
|
|
result = await adapter.send_video(
|
|
chat_id="12345",
|
|
video_path="/some/video.mp4",
|
|
)
|
|
|
|
assert result.success is False
|
|
assert "Not connected" in result.error
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_video_thread_id(self, connected_adapter, tmp_path):
|
|
"""metadata thread_id is forwarded as message_thread_id (required for Telegram forum groups)."""
|
|
test_file = tmp_path / "clip.mp4"
|
|
test_file.write_bytes(b"\x00\x00\x00\x1c" + b"ftyp" + b"\x00" * 100)
|
|
|
|
mock_msg = MagicMock()
|
|
mock_msg.message_id = 201
|
|
connected_adapter._bot.send_video = AsyncMock(return_value=mock_msg)
|
|
|
|
await connected_adapter.send_video(
|
|
chat_id="12345",
|
|
video_path=str(test_file),
|
|
metadata={"thread_id": "789"},
|
|
)
|
|
|
|
call_kwargs = connected_adapter._bot.send_video.call_args[1]
|
|
assert call_kwargs["message_thread_id"] == 789
|