"""Gateway streaming consumer — bridges sync agent callbacks to async platform delivery. The agent fires stream_delta_callback(text) synchronously from its worker thread. GatewayStreamConsumer: 1. Receives deltas via on_delta() (thread-safe, sync) 2. Queues them to an asyncio task via queue.Queue 3. The async run() task buffers, rate-limits, and progressively edits a single message on the target platform Design: Uses the edit transport (send initial message, then editMessageText). This is universally supported across Telegram, Discord, and Slack. Credit: jobless0x (#774, #1312), OutThisLife (#798), clicksingh (#697). """ from __future__ import annotations import asyncio import logging import queue import re import time from dataclasses import dataclass from typing import Any, Optional logger = logging.getLogger("gateway.stream_consumer") # Sentinel to signal the stream is complete _DONE = object() # Sentinel to signal a tool boundary — finalize current message and start a # new one so that subsequent text appears below tool progress messages. _NEW_SEGMENT = object() # Queue marker for a completed assistant commentary message emitted between # API/tool iterations (for example: "I'll inspect the repo first."). _COMMENTARY = object() @dataclass class StreamConsumerConfig: """Runtime config for a single stream consumer instance.""" edit_interval: float = 1.0 buffer_threshold: int = 40 cursor: str = " ▉" class GatewayStreamConsumer: """Async consumer that progressively edits a platform message with streamed tokens. Usage:: consumer = GatewayStreamConsumer(adapter, chat_id, config, metadata=metadata) # Pass consumer.on_delta as stream_delta_callback to AIAgent agent = AIAgent(..., stream_delta_callback=consumer.on_delta) # Start the consumer as an asyncio task task = asyncio.create_task(consumer.run()) # ... run agent in thread pool ... consumer.finish() # signal completion await task # wait for final edit """ # After this many consecutive flood-control failures, permanently disable # progressive edits for the remainder of the stream. _MAX_FLOOD_STRIKES = 3 # Reasoning/thinking tags that models emit inline in content. # Must stay in sync with cli.py _OPEN_TAGS/_CLOSE_TAGS and # run_agent.py _strip_think_blocks() tag variants. _OPEN_THINK_TAGS = ( "", "", "", "", "", "", ) _CLOSE_THINK_TAGS = ( "", "", "", "", "", "", ) def __init__( self, adapter: Any, chat_id: str, config: Optional[StreamConsumerConfig] = None, metadata: Optional[dict] = None, ): self.adapter = adapter self.chat_id = chat_id self.cfg = config or StreamConsumerConfig() self.metadata = metadata self._queue: queue.Queue = queue.Queue() self._accumulated = "" self._message_id: Optional[str] = None self._already_sent = False self._edit_supported = True # Disabled when progressive edits are no longer usable self._last_edit_time = 0.0 self._last_sent_text = "" # Track last-sent text to skip redundant edits self._fallback_final_send = False self._fallback_prefix = "" self._flood_strikes = 0 # Consecutive flood-control edit failures self._current_edit_interval = self.cfg.edit_interval # Adaptive backoff self._final_response_sent = False # Think-block filter state (mirrors CLI's _stream_delta tag suppression) self._in_think_block = False self._think_buffer = "" @property def already_sent(self) -> bool: """True if at least one message was sent or edited during the run.""" return self._already_sent @property def final_response_sent(self) -> bool: """True when the stream consumer delivered the final assistant reply.""" return self._final_response_sent def on_segment_break(self) -> None: """Finalize the current stream segment and start a fresh message.""" self._queue.put(_NEW_SEGMENT) def on_commentary(self, text: str) -> None: """Queue a completed interim assistant commentary message.""" if text: self._queue.put((_COMMENTARY, text)) def _reset_segment_state(self, *, preserve_no_edit: bool = False) -> None: if preserve_no_edit and self._message_id == "__no_edit__": return self._message_id = None self._accumulated = "" self._last_sent_text = "" self._fallback_final_send = False self._fallback_prefix = "" def on_delta(self, text: str) -> None: """Thread-safe callback — called from the agent's worker thread. When *text* is ``None``, signals a tool boundary: the current message is finalized and subsequent text will be sent as a new message so it appears below any tool-progress messages the gateway sent in between. """ if text: self._queue.put(text) elif text is None: self.on_segment_break() def finish(self) -> None: """Signal that the stream is complete.""" self._queue.put(_DONE) # ── Think-block filtering ──────────────────────────────────────── # Models like MiniMax emit inline ... blocks in their # content. The CLI's _stream_delta suppresses these via a state # machine; we do the same here so gateway users never see raw # reasoning tags. The agent also strips them from the final # response (run_agent.py _strip_think_blocks), but the stream # consumer sends intermediate edits before that stripping happens. def _filter_and_accumulate(self, text: str) -> None: """Add a text delta to the accumulated buffer, suppressing think blocks. Uses a state machine that tracks whether we are inside a reasoning/thinking block. Text inside such blocks is silently discarded. Partial tags at buffer boundaries are held back in ``_think_buffer`` until enough characters arrive to decide. """ buf = self._think_buffer + text self._think_buffer = "" while buf: if self._in_think_block: # Look for the earliest closing tag best_idx = -1 best_len = 0 for tag in self._CLOSE_THINK_TAGS: idx = buf.find(tag) if idx != -1 and (best_idx == -1 or idx < best_idx): best_idx = idx best_len = len(tag) if best_len: # Found closing tag — discard block, process remainder self._in_think_block = False buf = buf[best_idx + best_len:] else: # No closing tag yet — hold tail that could be a # partial closing tag prefix, discard the rest. max_tag = max(len(t) for t in self._CLOSE_THINK_TAGS) self._think_buffer = buf[-max_tag:] if len(buf) > max_tag else buf return else: # Look for earliest opening tag at a block boundary # (start of text / preceded by newline + optional whitespace). # This prevents false positives when models *mention* tags # in prose (e.g. "the tag is used for…"). best_idx = -1 best_len = 0 for tag in self._OPEN_THINK_TAGS: search_start = 0 while True: idx = buf.find(tag, search_start) if idx == -1: break # Block-boundary check (mirrors cli.py logic) if idx == 0: is_boundary = ( not self._accumulated or self._accumulated.endswith("\n") ) else: preceding = buf[:idx] last_nl = preceding.rfind("\n") if last_nl == -1: is_boundary = ( (not self._accumulated or self._accumulated.endswith("\n")) and preceding.strip() == "" ) else: is_boundary = preceding[last_nl + 1:].strip() == "" if is_boundary and (best_idx == -1 or idx < best_idx): best_idx = idx best_len = len(tag) break # first boundary hit for this tag is enough search_start = idx + 1 if best_len: # Emit text before the tag, enter think block self._accumulated += buf[:best_idx] self._in_think_block = True buf = buf[best_idx + best_len:] else: # No opening tag — check for a partial tag at the tail held_back = 0 for tag in self._OPEN_THINK_TAGS: for i in range(1, len(tag)): if buf.endswith(tag[:i]) and i > held_back: held_back = i if held_back: self._accumulated += buf[:-held_back] self._think_buffer = buf[-held_back:] else: self._accumulated += buf return def _flush_think_buffer(self) -> None: """Flush any held-back partial-tag buffer into accumulated text. Called when the stream ends (got_done) so that partial text that was held back waiting for a possible opening tag is not lost. """ if self._think_buffer and not self._in_think_block: self._accumulated += self._think_buffer self._think_buffer = "" async def run(self) -> None: """Async task that drains the queue and edits the platform message.""" # Platform message length limit — leave room for cursor + formatting _raw_limit = getattr(self.adapter, "MAX_MESSAGE_LENGTH", 4096) _safe_limit = max(500, _raw_limit - len(self.cfg.cursor) - 100) try: while True: # Drain all available items from the queue got_done = False got_segment_break = False commentary_text = None while True: try: item = self._queue.get_nowait() if item is _DONE: got_done = True break if item is _NEW_SEGMENT: got_segment_break = True break if isinstance(item, tuple) and len(item) == 2 and item[0] is _COMMENTARY: commentary_text = item[1] break self._filter_and_accumulate(item) except queue.Empty: break # Flush any held-back partial-tag buffer on stream end # so trailing text that was waiting for a potential open # tag is not lost. if got_done: self._flush_think_buffer() # Decide whether to flush an edit now = time.monotonic() elapsed = now - self._last_edit_time should_edit = ( got_done or got_segment_break or commentary_text is not None or (elapsed >= self._current_edit_interval and self._accumulated) or len(self._accumulated) >= self.cfg.buffer_threshold ) current_update_visible = False if should_edit and self._accumulated: # Split overflow: if accumulated text exceeds the platform # limit, split into properly sized chunks. if ( len(self._accumulated) > _safe_limit and self._message_id is None ): # No existing message to edit (first message or after a # segment break). Use truncate_message — the same # helper the non-streaming path uses — to split with # proper word/code-fence boundaries and chunk # indicators like "(1/2)". chunks = self.adapter.truncate_message( self._accumulated, _safe_limit ) for chunk in chunks: await self._send_new_chunk(chunk, self._message_id) self._accumulated = "" self._last_sent_text = "" self._last_edit_time = time.monotonic() if got_done: self._final_response_sent = self._already_sent return if got_segment_break: self._message_id = None self._fallback_final_send = False self._fallback_prefix = "" continue # Existing message: edit it with the first chunk, then # start a new message for the overflow remainder. while ( len(self._accumulated) > _safe_limit and self._message_id is not None and self._edit_supported ): split_at = self._accumulated.rfind("\n", 0, _safe_limit) if split_at < _safe_limit // 2: split_at = _safe_limit chunk = self._accumulated[:split_at] ok = await self._send_or_edit(chunk) if self._fallback_final_send or not ok: # Edit failed (or backed off due to flood control) # while attempting to split an oversized message. # Keep the full accumulated text intact so the # fallback final-send path can deliver the remaining # continuation without dropping content. break self._accumulated = self._accumulated[split_at:].lstrip("\n") self._message_id = None self._last_sent_text = "" display_text = self._accumulated if not got_done and not got_segment_break and commentary_text is None: display_text += self.cfg.cursor current_update_visible = await self._send_or_edit(display_text) self._last_edit_time = time.monotonic() if got_done: # Final edit without cursor. If progressive editing failed # mid-stream, send a single continuation/fallback message # here instead of letting the base gateway path send the # full response again. if self._accumulated: if self._fallback_final_send: await self._send_fallback_final(self._accumulated) elif current_update_visible: self._final_response_sent = True elif self._message_id: self._final_response_sent = await self._send_or_edit(self._accumulated) elif not self._already_sent: self._final_response_sent = await self._send_or_edit(self._accumulated) return if commentary_text is not None: self._reset_segment_state() await self._send_commentary(commentary_text) self._last_edit_time = time.monotonic() self._reset_segment_state() # Tool boundary: reset message state so the next text chunk # creates a fresh message below any tool-progress messages. # # Exception: when _message_id is "__no_edit__" the platform # never returned a real message ID (e.g. Signal, webhook with # github_comment delivery). Resetting to None would re-enter # the "first send" path on every tool boundary and post one # platform message per tool call — that is what caused 155 # comments under a single PR. Instead, preserve the sentinel # so the full continuation is delivered once via # _send_fallback_final. # (When editing fails mid-stream due to flood control the id is # a real string like "msg_1", not "__no_edit__", so that case # still resets and creates a fresh segment as intended.) if got_segment_break: self._reset_segment_state(preserve_no_edit=True) await asyncio.sleep(0.05) # Small yield to not busy-loop except asyncio.CancelledError: # Best-effort final edit on cancellation if self._accumulated and self._message_id: try: await self._send_or_edit(self._accumulated) except Exception: pass # If we delivered any content before being cancelled, mark the # final response as sent so the gateway's already_sent check # doesn't trigger a duplicate message. The 5-second # stream_task timeout (gateway/run.py) can cancel us while # waiting on a slow Telegram API call — without this flag the # gateway falls through to the normal send path. if self._already_sent: self._final_response_sent = True except Exception as e: logger.error("Stream consumer error: %s", e) # Pattern to strip MEDIA: tags (including optional surrounding quotes). # Matches the simple cleanup regex used by the non-streaming path in # gateway/platforms/base.py for post-processing. _MEDIA_RE = re.compile(r'''[`"']?MEDIA:\s*\S+[`"']?''') @staticmethod def _clean_for_display(text: str) -> str: """Strip MEDIA: directives and internal markers from text before display. The streaming path delivers raw text chunks that may include ``MEDIA:`` tags and ``[[audio_as_voice]]`` directives meant for the platform adapter's post-processing. The actual media files are delivered separately via ``_deliver_media_from_response()`` after the stream finishes — we just need to hide the raw directives from the user. """ if "MEDIA:" not in text and "[[audio_as_voice]]" not in text: return text cleaned = text.replace("[[audio_as_voice]]", "") cleaned = GatewayStreamConsumer._MEDIA_RE.sub("", cleaned) # Collapse excessive blank lines left behind by removed tags cleaned = re.sub(r'\n{3,}', '\n\n', cleaned) # Strip trailing whitespace/newlines but preserve leading content return cleaned.rstrip() async def _send_new_chunk(self, text: str, reply_to_id: Optional[str]) -> Optional[str]: """Send a new message chunk, optionally threaded to a previous message. Returns the message_id so callers can thread subsequent chunks. """ text = self._clean_for_display(text) if not text.strip(): return reply_to_id try: meta = dict(self.metadata) if self.metadata else {} result = await self.adapter.send( chat_id=self.chat_id, content=text, reply_to=reply_to_id, metadata=meta, ) if result.success and result.message_id: self._message_id = str(result.message_id) self._already_sent = True self._last_sent_text = text return str(result.message_id) else: self._edit_supported = False return reply_to_id except Exception as e: logger.error("Stream send chunk error: %s", e) return reply_to_id def _visible_prefix(self) -> str: """Return the visible text already shown in the streamed message.""" prefix = self._last_sent_text or "" if self.cfg.cursor and prefix.endswith(self.cfg.cursor): prefix = prefix[:-len(self.cfg.cursor)] return self._clean_for_display(prefix) def _continuation_text(self, final_text: str) -> str: """Return only the part of final_text the user has not already seen.""" prefix = self._fallback_prefix or self._visible_prefix() if prefix and final_text.startswith(prefix): return final_text[len(prefix):].lstrip() return final_text @staticmethod def _split_text_chunks(text: str, limit: int) -> list[str]: """Split text into reasonably sized chunks for fallback sends.""" if len(text) <= limit: return [text] chunks: list[str] = [] remaining = text while len(remaining) > limit: split_at = remaining.rfind("\n", 0, limit) if split_at < limit // 2: split_at = limit chunks.append(remaining[:split_at]) remaining = remaining[split_at:].lstrip("\n") if remaining: chunks.append(remaining) return chunks async def _send_fallback_final(self, text: str) -> None: """Send the final continuation after streaming edits stop working. Retries each chunk once on flood-control failures with a short delay. """ final_text = self._clean_for_display(text) continuation = self._continuation_text(final_text) self._fallback_final_send = False if not continuation.strip(): # Nothing new to send — the visible partial already matches final text. self._already_sent = True self._final_response_sent = True return raw_limit = getattr(self.adapter, "MAX_MESSAGE_LENGTH", 4096) safe_limit = max(500, raw_limit - 100) chunks = self._split_text_chunks(continuation, safe_limit) last_message_id: Optional[str] = None last_successful_chunk = "" sent_any_chunk = False for chunk in chunks: # Try sending with one retry on flood-control errors. result = None for attempt in range(2): result = await self.adapter.send( chat_id=self.chat_id, content=chunk, metadata=self.metadata, ) if result.success: break if attempt == 0 and self._is_flood_error(result): logger.debug( "Flood control on fallback send, retrying in 3s" ) await asyncio.sleep(3.0) else: break # non-flood error or second attempt failed if not result or not result.success: if sent_any_chunk: # Some continuation text already reached the user. Suppress # the base gateway final-send path so we don't resend the # full response and create another duplicate. self._already_sent = True self._final_response_sent = True self._message_id = last_message_id self._last_sent_text = last_successful_chunk self._fallback_prefix = "" return # No fallback chunk reached the user — allow the normal gateway # final-send path to try one more time. self._already_sent = False self._message_id = None self._last_sent_text = "" self._fallback_prefix = "" return sent_any_chunk = True last_successful_chunk = chunk last_message_id = result.message_id or last_message_id self._message_id = last_message_id self._already_sent = True self._final_response_sent = True self._last_sent_text = chunks[-1] self._fallback_prefix = "" def _is_flood_error(self, result) -> bool: """Check if a SendResult failure is due to flood control / rate limiting.""" err = getattr(result, "error", "") or "" err_lower = err.lower() return "flood" in err_lower or "retry after" in err_lower or "rate" in err_lower async def _try_strip_cursor(self) -> None: """Best-effort edit to remove the cursor from the last visible message. Called when entering fallback mode so the user doesn't see a stuck cursor (▉) in the partial message. """ if not self._message_id or self._message_id == "__no_edit__": return prefix = self._visible_prefix() if not prefix or not prefix.strip(): return try: await self.adapter.edit_message( chat_id=self.chat_id, message_id=self._message_id, content=prefix, ) self._last_sent_text = prefix except Exception: pass # best-effort — don't let this block the fallback path async def _send_commentary(self, text: str) -> bool: """Send a completed interim assistant commentary message.""" text = self._clean_for_display(text) if not text.strip(): return False try: result = await self.adapter.send( chat_id=self.chat_id, content=text, metadata=self.metadata, ) if result.success: self._already_sent = True return True except Exception as e: logger.error("Commentary send error: %s", e) return False async def _send_or_edit(self, text: str) -> bool: """Send or edit the streaming message. Returns True if the text was successfully delivered (sent or edited), False otherwise. Callers like the overflow split loop use this to decide whether to advance past the delivered chunk. """ # Strip MEDIA: directives so they don't appear as visible text. # Media files are delivered as native attachments after the stream # finishes (via _deliver_media_from_response in gateway/run.py). text = self._clean_for_display(text) # A bare streaming cursor is not meaningful user-visible content and # can render as a stray tofu/white-box message on some clients. visible_without_cursor = text if self.cfg.cursor: visible_without_cursor = visible_without_cursor.replace(self.cfg.cursor, "") _visible_stripped = visible_without_cursor.strip() if not _visible_stripped: return True # cursor-only / whitespace-only update if not text.strip(): return True # nothing to send is "success" # Guard: do not create a brand-new standalone message when the only # visible content is a handful of characters alongside the streaming # cursor. During rapid tool-calling the model often emits 1-2 tokens # before switching to tool calls; the resulting "X ▉" message risks # leaving the cursor permanently visible if the follow-up edit (to # strip the cursor on segment break) is rate-limited by the platform. # This was reported on Telegram, Matrix, and other clients where the # ▉ block character renders as a visible white box ("tofu"). # Existing messages (edits) are unaffected — only first sends gated. _MIN_NEW_MSG_CHARS = 4 if (self._message_id is None and self.cfg.cursor and self.cfg.cursor in text and len(_visible_stripped) < _MIN_NEW_MSG_CHARS): return True # too short for a standalone message — accumulate more try: if self._message_id is not None: if self._edit_supported: # Skip if text is identical to what we last sent if text == self._last_sent_text: return True # Edit existing message result = await self.adapter.edit_message( chat_id=self.chat_id, message_id=self._message_id, content=text, ) if result.success: self._already_sent = True self._last_sent_text = text # Successful edit — reset flood strike counter self._flood_strikes = 0 return True else: # Edit failed. If this looks like flood control / rate # limiting, use adaptive backoff: double the edit interval # and retry on the next cycle. Only permanently disable # edits after _MAX_FLOOD_STRIKES consecutive failures. if self._is_flood_error(result): self._flood_strikes += 1 self._current_edit_interval = min( self._current_edit_interval * 2, 10.0, ) logger.debug( "Flood control on edit (strike %d/%d), " "backoff interval → %.1fs", self._flood_strikes, self._MAX_FLOOD_STRIKES, self._current_edit_interval, ) if self._flood_strikes < self._MAX_FLOOD_STRIKES: # Don't disable edits yet — just slow down. # Update _last_edit_time so the next edit # respects the new interval. self._last_edit_time = time.monotonic() return False # Non-flood error OR flood strikes exhausted: enter # fallback mode — send only the missing tail once the # final response is available. logger.debug( "Edit failed (strikes=%d), entering fallback mode", self._flood_strikes, ) self._fallback_prefix = self._visible_prefix() self._fallback_final_send = True self._edit_supported = False self._already_sent = True # Best-effort: strip the cursor from the last visible # message so the user doesn't see a stuck ▉. await self._try_strip_cursor() return False else: # Editing not supported — skip intermediate updates. # The final response will be sent by the fallback path. return False else: # First message — send new result = await self.adapter.send( chat_id=self.chat_id, content=text, metadata=self.metadata, ) if result.success: if result.message_id: self._message_id = result.message_id else: self._edit_supported = False self._already_sent = True self._last_sent_text = text if not result.message_id: self._fallback_prefix = self._visible_prefix() self._fallback_final_send = True # Sentinel prevents re-entering the first-send path on # every delta/tool boundary when platforms accept a # message but do not return an editable message id. self._message_id = "__no_edit__" return True else: # Initial send failed — disable streaming for this session self._edit_supported = False return False except Exception as e: logger.error("Stream send/edit error: %s", e) return False