Stream consumer now splits messages that exceed the platform's MAX_MESSAGE_LENGTH. When accumulated text grows past the safe limit, the current message is finalized and a new message is started for the overflow — same as how normal sends chunk long responses. Split point prefers line boundaries (rfind newline) for clean breaks. Works for all platforms (Telegram 4096, Discord 2000, etc.) by reading the adapter's MAX_MESSAGE_LENGTH at runtime. Also added a safety net in the Telegram adapter: if edit_message_text still hits MESSAGE_TOO_LONG (e.g. markdown formatting expansion), it truncates and returns success so the stream consumer doesn't die. Co-authored-by: Test <test@test.com>
203 lines
7.9 KiB
Python
203 lines
7.9 KiB
Python
"""Gateway streaming consumer — bridges sync agent callbacks to async platform delivery.
|
|
|
|
The agent fires stream_delta_callback(text) synchronously from its worker thread.
|
|
GatewayStreamConsumer:
|
|
1. Receives deltas via on_delta() (thread-safe, sync)
|
|
2. Queues them to an asyncio task via queue.Queue
|
|
3. The async run() task buffers, rate-limits, and progressively edits
|
|
a single message on the target platform, starting a new message when the text outgrows the platform's length limit
|
|
|
|
Design: Uses the edit transport (send initial message, then editMessageText).
|
|
This is universally supported across Telegram, Discord, and Slack.
|
|
|
|
Credit: jobless0x (#774, #1312), OutThisLife (#798), clicksingh (#697).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import queue
|
|
import time
|
|
from dataclasses import dataclass
|
|
from typing import Any, Optional
|
|
|
|
# Module-level logger; the stream consumer reports send/edit failures through it.
logger = logging.getLogger("gateway.stream_consumer")

# Sentinel to signal the stream is complete.
# finish() enqueues it and run() detects it by identity (`item is _DONE`),
# so it can never be confused with a text delta.
_DONE = object()
|
|
|
|
|
|
@dataclass
class StreamConsumerConfig:
    """Runtime config for a single stream consumer instance."""

    # Seconds (measured with time.monotonic) that must elapse since the last
    # flush before buffered text triggers a time-based edit.
    edit_interval: float = 0.3
    # Buffered character count at which an edit is triggered regardless of
    # how much time has elapsed.
    buffer_threshold: int = 40
    # Suffix appended to the displayed text while the stream is still in
    # progress; it is left off once the stream has finished.
    cursor: str = " ▉"
|
|
|
|
|
|
class GatewayStreamConsumer:
    """Async consumer that progressively edits a platform message with streamed tokens.

    Deltas arrive through :meth:`on_delta` (called synchronously from the
    agent's worker thread) and are handed to the async :meth:`run` task via a
    ``queue.Queue``.  ``run`` buffers the text and sends it to the adapter:
    the first flush sends a new message, later flushes edit that message.
    When the buffered text outgrows the adapter's ``MAX_MESSAGE_LENGTH`` the
    current message is finalized and a new one is started for the overflow.

    Usage::

        consumer = GatewayStreamConsumer(adapter, chat_id, config, metadata=metadata)
        # Pass consumer.on_delta as stream_delta_callback to AIAgent
        agent = AIAgent(..., stream_delta_callback=consumer.on_delta)
        # Start the consumer as an asyncio task
        task = asyncio.create_task(consumer.run())
        # ... run agent in thread pool ...
        consumer.finish()  # signal completion
        await task         # wait for final edit
    """

    def __init__(
        self,
        adapter: Any,
        chat_id: str,
        config: Optional[StreamConsumerConfig] = None,
        metadata: Optional[dict] = None,
    ):
        """Bind the consumer to one adapter/chat pair.

        Args:
            adapter: Platform adapter; ``send`` and ``edit_message`` are
                awaited on it and ``MAX_MESSAGE_LENGTH`` is read if present.
            chat_id: Target chat, passed through to every adapter call.
            config: Tuning knobs; a default ``StreamConsumerConfig`` is used
                when omitted.
            metadata: Passed unchanged to ``adapter.send``.
        """
        self.adapter = adapter
        self.chat_id = chat_id
        self.cfg = config or StreamConsumerConfig()
        self.metadata = metadata
        self._queue: queue.Queue = queue.Queue()
        self._accumulated = ""  # Text of the message currently being streamed
        self._message_id: Optional[str] = None  # None until a send returns an id
        self._already_sent = False
        self._edit_supported = True  # Disabled on first edit failure (Signal/Email/HA)
        self._last_edit_time = 0.0
        self._last_sent_text = ""  # Track last-sent text to skip redundant edits

    @property
    def already_sent(self) -> bool:
        """True if at least one message was sent/edited — signals the base
        adapter to skip re-sending the final response."""
        return self._already_sent

    def on_delta(self, text: str) -> None:
        """Thread-safe callback — called from the agent's worker thread.

        Empty deltas are dropped; everything else is queued for ``run``.
        """
        if text:
            self._queue.put(text)

    def finish(self) -> None:
        """Signal that the stream is complete."""
        self._queue.put(_DONE)

    def _drain_queue(self) -> bool:
        """Append every queued delta to the buffer.

        Returns:
            True if the completion sentinel was reached, False if the queue
            simply ran empty.
        """
        while True:
            try:
                item = self._queue.get_nowait()
            except queue.Empty:
                return False
            if item is _DONE:
                return True
            self._accumulated += item

    async def _split_overflow(self, safe_limit: int) -> None:
        """Finalize the current message when the buffer exceeds ``safe_limit``.

        The head of the buffer (cut at a newline when one exists in the second
        half of the window, otherwise cut hard at ``safe_limit``) becomes the
        final content of the current message.  The remainder stays buffered
        and ``_message_id`` is cleared so the next flush sends a new message.
        Because ``_message_id`` is cleared, the loop body runs at most once
        per call.
        """
        while len(self._accumulated) > safe_limit and self._message_id is not None:
            split_at = self._accumulated.rfind("\n", 0, safe_limit)
            if split_at < safe_limit // 2:
                # No newline, or one so early that the chunk would be tiny.
                split_at = safe_limit
            await self._send_or_edit(self._accumulated[:split_at])
            self._accumulated = self._accumulated[split_at:].lstrip("\n")
            self._message_id = None
            self._last_sent_text = ""

    async def run(self) -> None:
        """Async task that drains the queue and edits the platform message.

        Returns once the completion sentinel has been processed.  Cancellation
        triggers a best-effort final edit and is not re-raised; any other
        exception is logged and ends the task.
        """
        # Platform message length limit — leave room for cursor + formatting
        raw_limit = getattr(self.adapter, "MAX_MESSAGE_LENGTH", 4096)
        safe_limit = max(500, raw_limit - len(self.cfg.cursor) - 100)

        try:
            while True:
                got_done = self._drain_queue()

                # Decide whether to flush an edit.
                # NOTE(review): once the buffer holds buffer_threshold
                # characters the last term stays true on every pass (the
                # buffer is only trimmed by an overflow split), so from then
                # on pacing comes from the poll sleep below and the
                # identical-text check in _send_or_edit — confirm intended.
                elapsed = time.monotonic() - self._last_edit_time
                should_edit = (
                    got_done
                    or (elapsed >= self.cfg.edit_interval and bool(self._accumulated))
                    or len(self._accumulated) >= self.cfg.buffer_threshold
                )

                if should_edit and self._accumulated:
                    # Split overflow: if accumulated text exceeds the platform
                    # limit, finalize the current message and start a new one.
                    await self._split_overflow(safe_limit)

                    # The remainder can be empty when everything after the
                    # split point was newlines; sending then would create a
                    # cursor-only (or empty) message that never gets cleaned up.
                    if self._accumulated:
                        display_text = self._accumulated
                        if not got_done:
                            display_text += self.cfg.cursor
                        await self._send_or_edit(display_text)
                    self._last_edit_time = time.monotonic()

                if got_done:
                    # Final edit without cursor
                    if self._accumulated and self._message_id:
                        await self._send_or_edit(self._accumulated)
                    return

                await asyncio.sleep(0.05)  # Small yield to not busy-loop

        except asyncio.CancelledError:
            # Best-effort final edit on cancellation (drops the cursor).
            # The cancellation itself is deliberately not propagated here.
            if self._accumulated and self._message_id:
                try:
                    await self._send_or_edit(self._accumulated)
                except Exception:
                    logger.debug("Final edit on cancellation failed", exc_info=True)
        except Exception as e:
            logger.error("Stream consumer error: %s", e)

    async def _send_or_edit(self, text: str) -> None:
        """Send or edit the streaming message.

        Sends a new message while no message id is known, otherwise edits the
        existing message (skipping edits whose text equals the last one sent).
        Any failure — a failed edit, a failed send, or a send that returns no
        message id — disables streaming for the rest of this consumer's life;
        after that the method is a no-op.  Exceptions raised by the adapter
        are logged and swallowed.
        """
        # Streaming disabled — skip intermediate updates.  The final response
        # will be sent by the normal path.  Checking this before the send
        # branch matters: when the first send fails or yields no message id,
        # _message_id stays None and, without this guard, every flush would
        # post another new message.
        if not self._edit_supported:
            return

        try:
            if self._message_id is None:
                # First message — send new
                result = await self.adapter.send(
                    chat_id=self.chat_id,
                    content=text,
                    metadata=self.metadata,
                )
                if result.success and result.message_id:
                    self._message_id = result.message_id
                    self._already_sent = True
                    self._last_sent_text = text
                else:
                    # Initial send failed — disable streaming for this session
                    logger.debug(
                        "Initial send failed or returned no message id, "
                        "disabling streaming for this adapter"
                    )
                    self._edit_supported = False
                return

            # Skip if text is identical to what we last sent
            if text == self._last_sent_text:
                return

            # Edit existing message
            result = await self.adapter.edit_message(
                chat_id=self.chat_id,
                message_id=self._message_id,
                content=text,
            )
            if result.success:
                self._already_sent = True
                self._last_sent_text = text
            else:
                # Edit not supported by this adapter — stop streaming,
                # let the normal send path handle the final response.
                # Without this guard, adapters like Signal/Email would
                # flood the chat with a new message every edit_interval.
                logger.debug("Edit failed, disabling streaming for this adapter")
                self._edit_supported = False
        except Exception as e:
            logger.error("Stream send/edit error: %s", e)
|