Files
hermes-agent/tests/gateway/test_base_topic_sessions.py
Teknium 227601c200 feat(discord): add message processing reactions (salvage #1980) (#3871)
Adds lifecycle hooks to the base platform adapter so Discord (and future
platforms) can react to message processing events:

  👀  when processing starts
    on successful completion (delivery confirmed)
    on failure, error, or cancellation

Implementation:
- base.py: on_processing_start/on_processing_complete hooks with
  _run_processing_hook error isolation wrapper; delivery tracking
  via _record_delivery closure for accurate success detection
- discord.py: _add_reaction/_remove_reaction helpers + hook overrides
- Tests for base hook lifecycle and Discord-specific reactions

Co-authored-by: alanwilhelm <alanwilhelm@users.noreply.github.com>
2026-03-29 21:55:23 -07:00

223 lines
7.1 KiB
Python

"""Tests for BasePlatformAdapter topic-aware session handling."""
import asyncio
from types import SimpleNamespace
import pytest
from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import BasePlatformAdapter, MessageEvent, SendResult
from gateway.session import SessionSource, build_session_key
class DummyTelegramAdapter(BasePlatformAdapter):
def __init__(self):
super().__init__(PlatformConfig(enabled=True, token="fake-token"), Platform.TELEGRAM)
self.sent = []
self.typing = []
self.processing_hooks = []
async def connect(self) -> bool:
return True
async def disconnect(self) -> None:
return None
async def send(self, chat_id, content, reply_to=None, metadata=None) -> SendResult:
self.sent.append(
{
"chat_id": chat_id,
"content": content,
"reply_to": reply_to,
"metadata": metadata,
}
)
return SendResult(success=True, message_id="1")
async def send_typing(self, chat_id: str, metadata=None) -> None:
self.typing.append({"chat_id": chat_id, "metadata": metadata})
return None
async def get_chat_info(self, chat_id: str):
return {"id": chat_id}
async def on_processing_start(self, event: MessageEvent) -> None:
self.processing_hooks.append(("start", event.message_id))
async def on_processing_complete(self, event: MessageEvent, success: bool) -> None:
self.processing_hooks.append(("complete", event.message_id, success))
def _make_event(chat_id: str, thread_id: str, message_id: str = "1") -> MessageEvent:
return MessageEvent(
text="hello",
source=SessionSource(
platform=Platform.TELEGRAM,
chat_id=chat_id,
chat_type="group",
thread_id=thread_id,
),
message_id=message_id,
)
class TestBasePlatformTopicSessions:
@pytest.mark.asyncio
async def test_handle_message_does_not_interrupt_different_topic(self, monkeypatch):
adapter = DummyTelegramAdapter()
adapter.set_message_handler(lambda event: asyncio.sleep(0, result=None))
active_event = _make_event("-1001", "10")
adapter._active_sessions[build_session_key(active_event.source)] = asyncio.Event()
scheduled = []
def fake_create_task(coro):
scheduled.append(coro)
coro.close()
return SimpleNamespace()
monkeypatch.setattr(asyncio, "create_task", fake_create_task)
await adapter.handle_message(_make_event("-1001", "11"))
assert len(scheduled) == 1
assert adapter._pending_messages == {}
@pytest.mark.asyncio
async def test_handle_message_interrupts_same_topic(self, monkeypatch):
adapter = DummyTelegramAdapter()
adapter.set_message_handler(lambda event: asyncio.sleep(0, result=None))
active_event = _make_event("-1001", "10")
adapter._active_sessions[build_session_key(active_event.source)] = asyncio.Event()
scheduled = []
def fake_create_task(coro):
scheduled.append(coro)
coro.close()
return SimpleNamespace()
monkeypatch.setattr(asyncio, "create_task", fake_create_task)
pending_event = _make_event("-1001", "10", message_id="2")
await adapter.handle_message(pending_event)
assert scheduled == []
assert adapter.get_pending_message(build_session_key(pending_event.source)) == pending_event
@pytest.mark.asyncio
async def test_process_message_background_replies_in_same_topic(self):
adapter = DummyTelegramAdapter()
typing_calls = []
async def handler(_event):
await asyncio.sleep(0)
return "ack"
async def hold_typing(_chat_id, interval=2.0, metadata=None):
typing_calls.append({"chat_id": _chat_id, "metadata": metadata})
await asyncio.Event().wait()
adapter.set_message_handler(handler)
adapter._keep_typing = hold_typing
event = _make_event("-1001", "17585")
await adapter._process_message_background(event, build_session_key(event.source))
assert adapter.sent == [
{
"chat_id": "-1001",
"content": "ack",
"reply_to": "1",
"metadata": {"thread_id": "17585"},
}
]
assert typing_calls == [
{
"chat_id": "-1001",
"metadata": {"thread_id": "17585"},
}
]
assert adapter.processing_hooks == [
("start", "1"),
("complete", "1", True),
]
@pytest.mark.asyncio
async def test_process_message_background_marks_total_send_failure_unsuccessful(self):
adapter = DummyTelegramAdapter()
async def handler(_event):
await asyncio.sleep(0)
return "ack"
async def failing_send(*_args, **_kwargs):
return SendResult(success=False, error="send failed")
async def hold_typing(_chat_id, interval=2.0, metadata=None):
await asyncio.Event().wait()
adapter.set_message_handler(handler)
adapter.send = failing_send
adapter._keep_typing = hold_typing
event = _make_event("-1001", "17585")
await adapter._process_message_background(event, build_session_key(event.source))
assert adapter.processing_hooks == [
("start", "1"),
("complete", "1", False),
]
@pytest.mark.asyncio
async def test_process_message_background_marks_exception_unsuccessful(self):
adapter = DummyTelegramAdapter()
async def handler(_event):
await asyncio.sleep(0)
raise RuntimeError("boom")
async def hold_typing(_chat_id, interval=2.0, metadata=None):
await asyncio.Event().wait()
adapter.set_message_handler(handler)
adapter._keep_typing = hold_typing
event = _make_event("-1001", "17585")
await adapter._process_message_background(event, build_session_key(event.source))
assert adapter.processing_hooks == [
("start", "1"),
("complete", "1", False),
]
@pytest.mark.asyncio
async def test_process_message_background_marks_cancellation_unsuccessful(self):
adapter = DummyTelegramAdapter()
release = asyncio.Event()
async def handler(_event):
await release.wait()
return "ack"
async def hold_typing(_chat_id, interval=2.0, metadata=None):
await asyncio.Event().wait()
adapter.set_message_handler(handler)
adapter._keep_typing = hold_typing
event = _make_event("-1001", "17585")
task = asyncio.create_task(adapter._process_message_background(event, build_session_key(event.source)))
await asyncio.sleep(0)
task.cancel()
with pytest.raises(asyncio.CancelledError):
await task
assert adapter.processing_hooks == [
("start", "1"),
("complete", "1", False),
]