Adds lifecycle hooks to the base platform adapter so Discord (and future platforms) can react to message processing events: 👀 when processing starts ✅ on successful completion (delivery confirmed) ❌ on failure, error, or cancellation Implementation: - base.py: on_processing_start/on_processing_complete hooks with _run_processing_hook error isolation wrapper; delivery tracking via _record_delivery closure for accurate success detection - discord.py: _add_reaction/_remove_reaction helpers + hook overrides - Tests for base hook lifecycle and Discord-specific reactions Co-authored-by: alanwilhelm <alanwilhelm@users.noreply.github.com>
223 lines
7.1 KiB
Python
223 lines
7.1 KiB
Python
"""Tests for BasePlatformAdapter topic-aware session handling."""
|
|
|
|
import asyncio
|
|
from types import SimpleNamespace
|
|
|
|
import pytest
|
|
|
|
from gateway.config import Platform, PlatformConfig
|
|
from gateway.platforms.base import BasePlatformAdapter, MessageEvent, SendResult
|
|
from gateway.session import SessionSource, build_session_key
|
|
|
|
|
|
class DummyTelegramAdapter(BasePlatformAdapter):
|
|
def __init__(self):
|
|
super().__init__(PlatformConfig(enabled=True, token="fake-token"), Platform.TELEGRAM)
|
|
self.sent = []
|
|
self.typing = []
|
|
self.processing_hooks = []
|
|
|
|
async def connect(self) -> bool:
|
|
return True
|
|
|
|
async def disconnect(self) -> None:
|
|
return None
|
|
|
|
async def send(self, chat_id, content, reply_to=None, metadata=None) -> SendResult:
|
|
self.sent.append(
|
|
{
|
|
"chat_id": chat_id,
|
|
"content": content,
|
|
"reply_to": reply_to,
|
|
"metadata": metadata,
|
|
}
|
|
)
|
|
return SendResult(success=True, message_id="1")
|
|
|
|
async def send_typing(self, chat_id: str, metadata=None) -> None:
|
|
self.typing.append({"chat_id": chat_id, "metadata": metadata})
|
|
return None
|
|
|
|
async def get_chat_info(self, chat_id: str):
|
|
return {"id": chat_id}
|
|
|
|
async def on_processing_start(self, event: MessageEvent) -> None:
|
|
self.processing_hooks.append(("start", event.message_id))
|
|
|
|
async def on_processing_complete(self, event: MessageEvent, success: bool) -> None:
|
|
self.processing_hooks.append(("complete", event.message_id, success))
|
|
|
|
|
|
def _make_event(chat_id: str, thread_id: str, message_id: str = "1") -> MessageEvent:
|
|
return MessageEvent(
|
|
text="hello",
|
|
source=SessionSource(
|
|
platform=Platform.TELEGRAM,
|
|
chat_id=chat_id,
|
|
chat_type="group",
|
|
thread_id=thread_id,
|
|
),
|
|
message_id=message_id,
|
|
)
|
|
|
|
|
|
class TestBasePlatformTopicSessions:
|
|
@pytest.mark.asyncio
|
|
async def test_handle_message_does_not_interrupt_different_topic(self, monkeypatch):
|
|
adapter = DummyTelegramAdapter()
|
|
adapter.set_message_handler(lambda event: asyncio.sleep(0, result=None))
|
|
|
|
active_event = _make_event("-1001", "10")
|
|
adapter._active_sessions[build_session_key(active_event.source)] = asyncio.Event()
|
|
|
|
scheduled = []
|
|
|
|
def fake_create_task(coro):
|
|
scheduled.append(coro)
|
|
coro.close()
|
|
return SimpleNamespace()
|
|
|
|
monkeypatch.setattr(asyncio, "create_task", fake_create_task)
|
|
|
|
await adapter.handle_message(_make_event("-1001", "11"))
|
|
|
|
assert len(scheduled) == 1
|
|
assert adapter._pending_messages == {}
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_handle_message_interrupts_same_topic(self, monkeypatch):
|
|
adapter = DummyTelegramAdapter()
|
|
adapter.set_message_handler(lambda event: asyncio.sleep(0, result=None))
|
|
|
|
active_event = _make_event("-1001", "10")
|
|
adapter._active_sessions[build_session_key(active_event.source)] = asyncio.Event()
|
|
|
|
scheduled = []
|
|
|
|
def fake_create_task(coro):
|
|
scheduled.append(coro)
|
|
coro.close()
|
|
return SimpleNamespace()
|
|
|
|
monkeypatch.setattr(asyncio, "create_task", fake_create_task)
|
|
|
|
pending_event = _make_event("-1001", "10", message_id="2")
|
|
await adapter.handle_message(pending_event)
|
|
|
|
assert scheduled == []
|
|
assert adapter.get_pending_message(build_session_key(pending_event.source)) == pending_event
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_process_message_background_replies_in_same_topic(self):
|
|
adapter = DummyTelegramAdapter()
|
|
typing_calls = []
|
|
|
|
async def handler(_event):
|
|
await asyncio.sleep(0)
|
|
return "ack"
|
|
|
|
async def hold_typing(_chat_id, interval=2.0, metadata=None):
|
|
typing_calls.append({"chat_id": _chat_id, "metadata": metadata})
|
|
await asyncio.Event().wait()
|
|
|
|
adapter.set_message_handler(handler)
|
|
adapter._keep_typing = hold_typing
|
|
|
|
event = _make_event("-1001", "17585")
|
|
await adapter._process_message_background(event, build_session_key(event.source))
|
|
|
|
assert adapter.sent == [
|
|
{
|
|
"chat_id": "-1001",
|
|
"content": "ack",
|
|
"reply_to": "1",
|
|
"metadata": {"thread_id": "17585"},
|
|
}
|
|
]
|
|
assert typing_calls == [
|
|
{
|
|
"chat_id": "-1001",
|
|
"metadata": {"thread_id": "17585"},
|
|
}
|
|
]
|
|
assert adapter.processing_hooks == [
|
|
("start", "1"),
|
|
("complete", "1", True),
|
|
]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_process_message_background_marks_total_send_failure_unsuccessful(self):
|
|
adapter = DummyTelegramAdapter()
|
|
|
|
async def handler(_event):
|
|
await asyncio.sleep(0)
|
|
return "ack"
|
|
|
|
async def failing_send(*_args, **_kwargs):
|
|
return SendResult(success=False, error="send failed")
|
|
|
|
async def hold_typing(_chat_id, interval=2.0, metadata=None):
|
|
await asyncio.Event().wait()
|
|
|
|
adapter.set_message_handler(handler)
|
|
adapter.send = failing_send
|
|
adapter._keep_typing = hold_typing
|
|
|
|
event = _make_event("-1001", "17585")
|
|
await adapter._process_message_background(event, build_session_key(event.source))
|
|
|
|
assert adapter.processing_hooks == [
|
|
("start", "1"),
|
|
("complete", "1", False),
|
|
]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_process_message_background_marks_exception_unsuccessful(self):
|
|
adapter = DummyTelegramAdapter()
|
|
|
|
async def handler(_event):
|
|
await asyncio.sleep(0)
|
|
raise RuntimeError("boom")
|
|
|
|
async def hold_typing(_chat_id, interval=2.0, metadata=None):
|
|
await asyncio.Event().wait()
|
|
|
|
adapter.set_message_handler(handler)
|
|
adapter._keep_typing = hold_typing
|
|
|
|
event = _make_event("-1001", "17585")
|
|
await adapter._process_message_background(event, build_session_key(event.source))
|
|
|
|
assert adapter.processing_hooks == [
|
|
("start", "1"),
|
|
("complete", "1", False),
|
|
]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_process_message_background_marks_cancellation_unsuccessful(self):
|
|
adapter = DummyTelegramAdapter()
|
|
release = asyncio.Event()
|
|
|
|
async def handler(_event):
|
|
await release.wait()
|
|
return "ack"
|
|
|
|
async def hold_typing(_chat_id, interval=2.0, metadata=None):
|
|
await asyncio.Event().wait()
|
|
|
|
adapter.set_message_handler(handler)
|
|
adapter._keep_typing = hold_typing
|
|
|
|
event = _make_event("-1001", "17585")
|
|
task = asyncio.create_task(adapter._process_message_background(event, build_session_key(event.source)))
|
|
await asyncio.sleep(0)
|
|
task.cancel()
|
|
|
|
with pytest.raises(asyncio.CancelledError):
|
|
await task
|
|
|
|
assert adapter.processing_hooks == [
|
|
("start", "1"),
|
|
("complete", "1", False),
|
|
]
|