fix(gateway): process /queue'd messages after agent completion (#2469)
* fix: respect DashScope v1 runtime mode for alibaba Remove the hardcoded Alibaba branch from resolve_runtime_provider() that forced api_mode='anthropic_messages' regardless of the base URL. Alibaba now goes through the generic API-key provider path, which auto-detects the protocol from the URL: - /apps/anthropic → anthropic_messages (via endswith check) - /v1 → chat_completions (default) This fixes Alibaba setup with OpenAI-compatible DashScope endpoints (e.g. coding-intl.dashscope.aliyuncs.com/v1) that were broken because runtime always forced Anthropic mode even when setup saved a /v1 URL. Based on PR #2024 by @kshitijk4poor. * docs(skill): add split, merge, search examples to ocr-and-documents skill Adds pymupdf examples for PDF splitting, merging, and text search to the existing ocr-and-documents skill. No new dependencies — pymupdf already covers all three operations natively. * fix: replace all production print() calls with logger in rl_training_tool Replace all bare print() calls in production code paths with proper logger calls. - Add `import logging` and module-level `logger = logging.getLogger(__name__)` - Replace print() in _start_training_run() with logger.info() - Replace print() in _stop_training_run() with logger.info() - Replace print(Warning/Note) calls with logger.warning() and logger.info() Using the logging framework allows log level filtering, proper formatting, and log routing instead of always printing to stdout. * fix(gateway): process /queue'd messages after agent completion /queue stored messages in adapter._pending_messages but never consumed them after normal (non-interrupted) completion. The consumption path at line 5219 only checked pending messages when result.get('interrupted') was True — since /queue deliberately doesn't interrupt, queued messages were silently dropped. Now checks adapter._pending_messages after both interrupted AND normal completion. 
For queued messages (non-interrupt), the first response is delivered before recursing to process the queued follow-up. The direct send is skipped when streaming has already delivered the response. Reported by GhostMode on Discord. --------- Co-authored-by: kshitijk4poor <kshitijk4poor@users.noreply.github.com> Co-authored-by: memosr.eth <96793918+memosr@users.noreply.github.com>
This commit is contained in:
@@ -5226,22 +5226,31 @@ class GatewayRunner:
|
||||
self._effective_model = None
|
||||
self._effective_provider = None
|
||||
|
||||
# Check if we were interrupted and have a pending message
|
||||
# Check if we were interrupted OR have a queued message (/queue).
|
||||
result = result_holder[0]
|
||||
adapter = self.adapters.get(source.platform)
|
||||
|
||||
# Get pending message from adapter if interrupted.
|
||||
# Get pending message from adapter.
|
||||
# Use session_key (not source.chat_id) to match adapter's storage keys.
|
||||
pending = None
|
||||
if result and result.get("interrupted") and adapter:
|
||||
pending_event = adapter.get_pending_message(session_key) if session_key else None
|
||||
if pending_event:
|
||||
pending = pending_event.text
|
||||
elif result.get("interrupt_message"):
|
||||
pending = result.get("interrupt_message")
|
||||
if result and adapter and session_key:
|
||||
if result.get("interrupted"):
|
||||
# Interrupted — consume the interrupt message
|
||||
pending_event = adapter.get_pending_message(session_key)
|
||||
if pending_event:
|
||||
pending = pending_event.text
|
||||
elif result.get("interrupt_message"):
|
||||
pending = result.get("interrupt_message")
|
||||
else:
|
||||
# Normal completion — check for /queue'd messages that were
|
||||
# stored without triggering an interrupt.
|
||||
pending_event = adapter.get_pending_message(session_key)
|
||||
if pending_event:
|
||||
pending = pending_event.text
|
||||
logger.debug("Processing queued message after agent completion: '%s...'", pending[:40])
|
||||
|
||||
if pending:
|
||||
logger.debug("Processing interrupted message: '%s...'", pending[:40])
|
||||
logger.debug("Processing pending message: '%s...'", pending[:40])
|
||||
|
||||
# Clear the adapter's interrupt event so the next _run_agent call
|
||||
# doesn't immediately re-trigger the interrupt before the new agent
|
||||
@@ -5263,11 +5272,25 @@ class GatewayRunner:
|
||||
adapter.queue_message(session_key, pending)
|
||||
return result_holder[0] or {"final_response": response, "messages": history}
|
||||
|
||||
# Don't send the interrupted response to the user — it's just noise
|
||||
# like "Operation interrupted." They already know they sent a new
|
||||
# message, so go straight to processing it.
|
||||
|
||||
# Now process the pending message with updated history
|
||||
was_interrupted = result.get("interrupted")
|
||||
if not was_interrupted:
|
||||
# Queued message after normal completion — deliver the first
|
||||
# response before processing the queued follow-up.
|
||||
# Skip if streaming already delivered it.
|
||||
_sc = stream_consumer_holder[0]
|
||||
_already_streamed = _sc and getattr(_sc, "already_sent", False)
|
||||
first_response = result.get("final_response", "")
|
||||
if first_response and not _already_streamed:
|
||||
try:
|
||||
await adapter.send(source.chat_id, first_response,
|
||||
metadata=getattr(event, "metadata", None))
|
||||
except Exception as e:
|
||||
logger.warning("Failed to send first response before queued message: %s", e)
|
||||
# else: interrupted — discard the interrupted response ("Operation
|
||||
# interrupted." is just noise; the user already knows they sent a
|
||||
# new message).
|
||||
|
||||
# Process the pending message with updated history
|
||||
updated_history = result.get("messages", history)
|
||||
return await self._run_agent(
|
||||
message=pending,
|
||||
|
||||
165
tests/gateway/test_queue_consumption.py
Normal file
165
tests/gateway/test_queue_consumption.py
Normal file
@@ -0,0 +1,165 @@
|
||||
"""Tests for /queue message consumption after normal agent completion.
|
||||
|
||||
Verifies that messages queued via /queue (which store in
|
||||
adapter._pending_messages WITHOUT triggering an interrupt) are consumed
|
||||
after the agent finishes its current task — not silently dropped.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from gateway.platforms.base import (
|
||||
BasePlatformAdapter,
|
||||
MessageEvent,
|
||||
MessageType,
|
||||
PlatformConfig,
|
||||
Platform,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Minimal adapter for testing pending message storage
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class _StubAdapter(BasePlatformAdapter):
    """Minimal concrete adapter for exercising pending-message storage.

    Implements just enough of the abstract interface to be instantiated;
    none of the methods touch a real platform.
    """

    def __init__(self):
        super().__init__(PlatformConfig(enabled=True, token="test"), Platform.TELEGRAM)

    async def connect(self) -> bool:
        # Connection always "succeeds" in tests.
        return True

    async def disconnect(self) -> None:
        # Flip the adapter into its disconnected state via the base helper.
        self._mark_disconnected()

    async def send(self, chat_id, content, reply_to=None, metadata=None):
        # Report a successful delivery without performing any I/O.
        from gateway.platforms.base import SendResult

        return SendResult(success=True, message_id="msg-1")

    async def get_chat_info(self, chat_id):
        # A canned direct-message chat description.
        return dict(id=chat_id, type="dm")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestQueueMessageStorage:
    """Verify /queue stores messages correctly in adapter._pending_messages."""

    # Every test in this class operates on the same session.
    SESSION_KEY = "telegram:user:123"

    def _make_event(self, text, message_id, **source_attrs):
        """Build a minimal text MessageEvent with a mock source."""
        return MessageEvent(
            text=text,
            message_type=MessageType.TEXT,
            source=MagicMock(**source_attrs),
            message_id=message_id,
        )

    def test_queue_stores_message_in_pending(self):
        adapter = _StubAdapter()
        queued = self._make_event(
            "do this next", "q1", chat_id="123", platform=Platform.TELEGRAM
        )
        adapter._pending_messages[self.SESSION_KEY] = queued

        assert self.SESSION_KEY in adapter._pending_messages
        assert adapter._pending_messages[self.SESSION_KEY].text == "do this next"

    def test_get_pending_message_consumes_and_clears(self):
        adapter = _StubAdapter()
        queued = self._make_event(
            "queued prompt", "q2", chat_id="123", platform=Platform.TELEGRAM
        )
        adapter._pending_messages[self.SESSION_KEY] = queued

        retrieved = adapter.get_pending_message(self.SESSION_KEY)
        assert retrieved is not None
        assert retrieved.text == "queued prompt"
        # Retrieval consumes the entry — a second fetch comes back empty.
        assert adapter.get_pending_message(self.SESSION_KEY) is None

    def test_queue_does_not_set_interrupt_event(self):
        """The whole point of /queue — no interrupt signal."""
        adapter = _StubAdapter()

        # An agent is "running" for this session.
        adapter._active_sessions[self.SESSION_KEY] = asyncio.Event()

        # /queue only stores the message; it never sets the session event.
        adapter._pending_messages[self.SESSION_KEY] = self._make_event("queued", "q3")

        assert not adapter._active_sessions[self.SESSION_KEY].is_set()
        assert not adapter.has_pending_interrupt(self.SESSION_KEY)

    def test_regular_message_sets_interrupt_event(self):
        """Contrast: regular messages DO trigger interrupt."""
        adapter = _StubAdapter()

        adapter._active_sessions[self.SESSION_KEY] = asyncio.Event()

        # handle_message stores the event AND fires the interrupt flag.
        adapter._pending_messages[self.SESSION_KEY] = self._make_event(
            "new message", "m1"
        )
        adapter._active_sessions[self.SESSION_KEY].set()

        assert adapter.has_pending_interrupt(self.SESSION_KEY)
|
||||
|
||||
|
||||
class TestQueueConsumptionAfterCompletion:
    """Verify that pending messages are consumed after normal completion."""

    def test_pending_message_available_after_normal_completion(self):
        """After agent finishes without interrupt, pending message should
        still be retrievable from adapter._pending_messages."""
        adapter = _StubAdapter()
        session_key = "telegram:user:123"

        # Agent starts; the user /queue's a follow-up while it runs.
        adapter._active_sessions[session_key] = asyncio.Event()
        adapter._pending_messages[session_key] = MessageEvent(
            text="process this after",
            message_type=MessageType.TEXT,
            source=MagicMock(),
            message_id="q4",
        )

        # Agent finishes normally — session removed, no interrupt fired.
        del adapter._active_sessions[session_key]

        # The queued message must survive the session teardown.
        retrieved = adapter.get_pending_message(session_key)
        assert retrieved is not None
        assert retrieved.text == "process this after"

    def test_multiple_queues_last_one_wins(self):
        """If user /queue's multiple times, last message overwrites."""
        adapter = _StubAdapter()
        session_key = "telegram:user:123"

        # Each store replaces the previous entry for the same session key.
        for text in ("first", "second", "third"):
            adapter._pending_messages[session_key] = MessageEvent(
                text=text,
                message_type=MessageType.TEXT,
                source=MagicMock(),
                message_id=f"q-{text}",
            )

        assert adapter.get_pending_message(session_key).text == "third"
|
||||
Reference in New Issue
Block a user