* fix: respect DashScope v1 runtime mode for alibaba Remove the hardcoded Alibaba branch from resolve_runtime_provider() that forced api_mode='anthropic_messages' regardless of the base URL. Alibaba now goes through the generic API-key provider path, which auto-detects the protocol from the URL: - /apps/anthropic → anthropic_messages (via endswith check) - /v1 → chat_completions (default) This fixes Alibaba setup with OpenAI-compatible DashScope endpoints (e.g. coding-intl.dashscope.aliyuncs.com/v1) that were broken because runtime always forced Anthropic mode even when setup saved a /v1 URL. Based on PR #2024 by @kshitijk4poor. * docs(skill): add split, merge, search examples to ocr-and-documents skill Adds pymupdf examples for PDF splitting, merging, and text search to the existing ocr-and-documents skill. No new dependencies — pymupdf already covers all three operations natively. * fix: replace all production print() calls with logger in rl_training_tool Replace all bare print() calls in production code paths with proper logger calls. - Add `import logging` and module-level `logger = logging.getLogger(__name__)` - Replace print() in _start_training_run() with logger.info() - Replace print() in _stop_training_run() with logger.info() - Replace print(Warning/Note) calls with logger.warning() and logger.info() Using the logging framework allows log level filtering, proper formatting, and log routing instead of always printing to stdout. * fix(gateway): process /queue'd messages after agent completion /queue stored messages in adapter._pending_messages but never consumed them after normal (non-interrupted) completion. The consumption path at line 5219 only checked pending messages when result.get('interrupted') was True — since /queue deliberately doesn't interrupt, queued messages were silently dropped. Now checks adapter._pending_messages after both interrupted AND normal completion. 
For queued messages (non-interrupt), the first response is delivered before recursing to process the queued follow-up. Skips the direct send when streaming already delivered the response. Reported by GhostMode on Discord. --------- Co-authored-by: kshitijk4poor <kshitijk4poor@users.noreply.github.com> Co-authored-by: memosr.eth <96793918+memosr@users.noreply.github.com>
166 lines
5.8 KiB
Python
166 lines
5.8 KiB
Python
"""Tests for /queue message consumption after normal agent completion.
|
|
|
|
Verifies that messages queued via /queue (which store in
|
|
adapter._pending_messages WITHOUT triggering an interrupt) are consumed
|
|
after the agent finishes its current task — not silently dropped.
|
|
"""
|
|
|
|
import asyncio
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from gateway.platforms.base import (
|
|
BasePlatformAdapter,
|
|
MessageEvent,
|
|
MessageType,
|
|
PlatformConfig,
|
|
Platform,
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Minimal adapter for testing pending message storage
# ---------------------------------------------------------------------------
|
|
|
|
class _StubAdapter(BasePlatformAdapter):
    """Bare-bones platform adapter used to exercise pending-message storage.

    None of the methods do real I/O; they return canned values so the tests
    can focus on the adapter's in-memory queue/interrupt bookkeeping.
    """

    def __init__(self):
        # A Telegram-flavoured config is enough — the tests never connect.
        config = PlatformConfig(enabled=True, token="test")
        super().__init__(config, Platform.TELEGRAM)

    async def connect(self) -> bool:
        """Pretend the connection always succeeds."""
        return True

    async def disconnect(self) -> None:
        """Mark the adapter as disconnected; no real teardown required."""
        self._mark_disconnected()

    async def send(self, chat_id, content, reply_to=None, metadata=None):
        """Report a successful send without delivering anything."""
        from gateway.platforms.base import SendResult

        return SendResult(success=True, message_id="msg-1")

    async def get_chat_info(self, chat_id):
        """Return a minimal DM-style chat descriptor."""
        return {"id": chat_id, "type": "dm"}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
|
|
|
|
class TestQueueMessageStorage:
    """Verify /queue stores messages correctly in adapter._pending_messages."""

    @staticmethod
    def _event(text, message_id, source):
        """Build a plain-text MessageEvent with the given payload and origin."""
        return MessageEvent(
            text=text,
            message_type=MessageType.TEXT,
            source=source,
            message_id=message_id,
        )

    def test_queue_stores_message_in_pending(self):
        stub = _StubAdapter()
        key = "telegram:user:123"
        origin = MagicMock(chat_id="123", platform=Platform.TELEGRAM)

        stub._pending_messages[key] = self._event("do this next", "q1", origin)

        assert key in stub._pending_messages
        assert stub._pending_messages[key].text == "do this next"

    def test_get_pending_message_consumes_and_clears(self):
        stub = _StubAdapter()
        key = "telegram:user:123"
        origin = MagicMock(chat_id="123", platform=Platform.TELEGRAM)
        stub._pending_messages[key] = self._event("queued prompt", "q2", origin)

        first_read = stub.get_pending_message(key)
        assert first_read is not None
        assert first_read.text == "queued prompt"
        # A second read must come back empty — retrieval consumes the entry.
        assert stub.get_pending_message(key) is None

    def test_queue_does_not_set_interrupt_event(self):
        """The whole point of /queue — no interrupt signal."""
        stub = _StubAdapter()
        key = "telegram:user:123"

        # Pretend the agent is mid-task for this session.
        stub._active_sessions[key] = asyncio.Event()

        # /queue only stashes the message; it never touches the event.
        stub._pending_messages[key] = self._event("queued", "q3", MagicMock())

        assert not stub._active_sessions[key].is_set()
        assert not stub.has_pending_interrupt(key)

    def test_regular_message_sets_interrupt_event(self):
        """Contrast: regular messages DO trigger interrupt."""
        stub = _StubAdapter()
        key = "telegram:user:123"

        stub._active_sessions[key] = asyncio.Event()

        # handle_message both stores the message and fires the interrupt.
        stub._pending_messages[key] = self._event("new message", "m1", MagicMock())
        stub._active_sessions[key].set()  # this is what handle_message does

        assert stub.has_pending_interrupt(key)
|
|
|
|
|
|
class TestQueueConsumptionAfterCompletion:
    """Verify that pending messages are consumed after normal completion."""

    def test_pending_message_available_after_normal_completion(self):
        """After agent finishes without interrupt, pending message should
        still be retrievable from adapter._pending_messages."""
        stub = _StubAdapter()
        key = "telegram:user:123"

        # Agent starts, then /queue parks a follow-up for later.
        stub._active_sessions[key] = asyncio.Event()
        stub._pending_messages[key] = MessageEvent(
            text="process this after",
            message_type=MessageType.TEXT,
            source=MagicMock(),
            message_id="q4",
        )

        # Normal completion: the session entry goes away, no interrupt fired.
        del stub._active_sessions[key]

        # The parked message must survive session teardown.
        follow_up = stub.get_pending_message(key)
        assert follow_up is not None
        assert follow_up.text == "process this after"

    def test_multiple_queues_last_one_wins(self):
        """If user /queue's multiple times, last message overwrites."""
        stub = _StubAdapter()
        key = "telegram:user:123"

        for label in ("first", "second", "third"):
            stub._pending_messages[key] = MessageEvent(
                text=label,
                message_type=MessageType.TEXT,
                source=MagicMock(),
                message_id=f"q-{label}",
            )

        assert stub.get_pending_message(key).text == "third"
|