During rapid tool-calling, the model often emits 1-2 tokens before switching to tool calls. The stream consumer would create a new message with 'X ▉' (short text + cursor), and if the follow-up edit to strip the cursor was rate-limited by the platform, the cursor remained as a permanent standalone message — reported on Telegram as 'white box' artifacts. Add a minimum-content guard in _send_or_edit: when creating a new standalone message (no existing message_id), require at least 4 visible characters alongside the cursor before sending. Shorter text accumulates into the next streaming segment instead. This prevents cursor-only 'tofu' messages across all platforms without affecting normal streaming (edits to existing messages, final sends without cursor, and messages with substantial text are all unaffected). Reported by @michalkomar on X.
745 lines
33 KiB
Python
745 lines
33 KiB
Python
"""Gateway streaming consumer — bridges sync agent callbacks to async platform delivery.
|
|
|
|
The agent fires stream_delta_callback(text) synchronously from its worker thread.
|
|
GatewayStreamConsumer:
|
|
1. Receives deltas via on_delta() (thread-safe, sync)
|
|
2. Queues them to an asyncio task via queue.Queue
|
|
3. The async run() task buffers, rate-limits, and progressively edits
|
|
a single message on the target platform
|
|
|
|
Design: Uses the edit transport (send initial message, then editMessageText).
|
|
This is universally supported across Telegram, Discord, and Slack.
|
|
|
|
Credit: jobless0x (#774, #1312), OutThisLife (#798), clicksingh (#697).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import queue
|
|
import re
|
|
import time
|
|
from dataclasses import dataclass
|
|
from typing import Any, Optional
|
|
|
|
logger = logging.getLogger("gateway.stream_consumer")
|
|
|
|
# Sentinel to signal the stream is complete
|
|
_DONE = object()
|
|
|
|
# Sentinel to signal a tool boundary — finalize current message and start a
|
|
# new one so that subsequent text appears below tool progress messages.
|
|
_NEW_SEGMENT = object()
|
|
|
|
# Queue marker for a completed assistant commentary message emitted between
|
|
# API/tool iterations (for example: "I'll inspect the repo first.").
|
|
_COMMENTARY = object()
|
|
|
|
|
|
@dataclass
|
|
class StreamConsumerConfig:
|
|
"""Runtime config for a single stream consumer instance."""
|
|
edit_interval: float = 1.0
|
|
buffer_threshold: int = 40
|
|
cursor: str = " ▉"
|
|
|
|
|
|
class GatewayStreamConsumer:
|
|
"""Async consumer that progressively edits a platform message with streamed tokens.
|
|
|
|
Usage::
|
|
|
|
consumer = GatewayStreamConsumer(adapter, chat_id, config, metadata=metadata)
|
|
# Pass consumer.on_delta as stream_delta_callback to AIAgent
|
|
agent = AIAgent(..., stream_delta_callback=consumer.on_delta)
|
|
# Start the consumer as an asyncio task
|
|
task = asyncio.create_task(consumer.run())
|
|
# ... run agent in thread pool ...
|
|
consumer.finish() # signal completion
|
|
await task # wait for final edit
|
|
"""
|
|
|
|
# After this many consecutive flood-control failures, permanently disable
|
|
# progressive edits for the remainder of the stream.
|
|
_MAX_FLOOD_STRIKES = 3
|
|
|
|
# Reasoning/thinking tags that models emit inline in content.
|
|
# Must stay in sync with cli.py _OPEN_TAGS/_CLOSE_TAGS and
|
|
# run_agent.py _strip_think_blocks() tag variants.
|
|
_OPEN_THINK_TAGS = (
|
|
"<REASONING_SCRATCHPAD>", "<think>", "<reasoning>",
|
|
"<THINKING>", "<thinking>", "<thought>",
|
|
)
|
|
_CLOSE_THINK_TAGS = (
|
|
"</REASONING_SCRATCHPAD>", "</think>", "</reasoning>",
|
|
"</THINKING>", "</thinking>", "</thought>",
|
|
)
|
|
|
|
def __init__(
|
|
self,
|
|
adapter: Any,
|
|
chat_id: str,
|
|
config: Optional[StreamConsumerConfig] = None,
|
|
metadata: Optional[dict] = None,
|
|
):
|
|
self.adapter = adapter
|
|
self.chat_id = chat_id
|
|
self.cfg = config or StreamConsumerConfig()
|
|
self.metadata = metadata
|
|
self._queue: queue.Queue = queue.Queue()
|
|
self._accumulated = ""
|
|
self._message_id: Optional[str] = None
|
|
self._already_sent = False
|
|
self._edit_supported = True # Disabled when progressive edits are no longer usable
|
|
self._last_edit_time = 0.0
|
|
self._last_sent_text = "" # Track last-sent text to skip redundant edits
|
|
self._fallback_final_send = False
|
|
self._fallback_prefix = ""
|
|
self._flood_strikes = 0 # Consecutive flood-control edit failures
|
|
self._current_edit_interval = self.cfg.edit_interval # Adaptive backoff
|
|
self._final_response_sent = False
|
|
|
|
# Think-block filter state (mirrors CLI's _stream_delta tag suppression)
|
|
self._in_think_block = False
|
|
self._think_buffer = ""
|
|
|
|
@property
|
|
def already_sent(self) -> bool:
|
|
"""True if at least one message was sent or edited during the run."""
|
|
return self._already_sent
|
|
|
|
@property
|
|
def final_response_sent(self) -> bool:
|
|
"""True when the stream consumer delivered the final assistant reply."""
|
|
return self._final_response_sent
|
|
|
|
def on_segment_break(self) -> None:
|
|
"""Finalize the current stream segment and start a fresh message."""
|
|
self._queue.put(_NEW_SEGMENT)
|
|
|
|
def on_commentary(self, text: str) -> None:
|
|
"""Queue a completed interim assistant commentary message."""
|
|
if text:
|
|
self._queue.put((_COMMENTARY, text))
|
|
|
|
def _reset_segment_state(self, *, preserve_no_edit: bool = False) -> None:
|
|
if preserve_no_edit and self._message_id == "__no_edit__":
|
|
return
|
|
self._message_id = None
|
|
self._accumulated = ""
|
|
self._last_sent_text = ""
|
|
self._fallback_final_send = False
|
|
self._fallback_prefix = ""
|
|
|
|
def on_delta(self, text: str) -> None:
|
|
"""Thread-safe callback — called from the agent's worker thread.
|
|
|
|
When *text* is ``None``, signals a tool boundary: the current message
|
|
is finalized and subsequent text will be sent as a new message so it
|
|
appears below any tool-progress messages the gateway sent in between.
|
|
"""
|
|
if text:
|
|
self._queue.put(text)
|
|
elif text is None:
|
|
self.on_segment_break()
|
|
|
|
def finish(self) -> None:
|
|
"""Signal that the stream is complete."""
|
|
self._queue.put(_DONE)
|
|
|
|
# ── Think-block filtering ────────────────────────────────────────
|
|
# Models like MiniMax emit inline <think>...</think> blocks in their
|
|
# content. The CLI's _stream_delta suppresses these via a state
|
|
# machine; we do the same here so gateway users never see raw
|
|
# reasoning tags. The agent also strips them from the final
|
|
# response (run_agent.py _strip_think_blocks), but the stream
|
|
# consumer sends intermediate edits before that stripping happens.
|
|
|
|
def _filter_and_accumulate(self, text: str) -> None:
|
|
"""Add a text delta to the accumulated buffer, suppressing think blocks.
|
|
|
|
Uses a state machine that tracks whether we are inside a
|
|
reasoning/thinking block. Text inside such blocks is silently
|
|
discarded. Partial tags at buffer boundaries are held back in
|
|
``_think_buffer`` until enough characters arrive to decide.
|
|
"""
|
|
buf = self._think_buffer + text
|
|
self._think_buffer = ""
|
|
|
|
while buf:
|
|
if self._in_think_block:
|
|
# Look for the earliest closing tag
|
|
best_idx = -1
|
|
best_len = 0
|
|
for tag in self._CLOSE_THINK_TAGS:
|
|
idx = buf.find(tag)
|
|
if idx != -1 and (best_idx == -1 or idx < best_idx):
|
|
best_idx = idx
|
|
best_len = len(tag)
|
|
|
|
if best_len:
|
|
# Found closing tag — discard block, process remainder
|
|
self._in_think_block = False
|
|
buf = buf[best_idx + best_len:]
|
|
else:
|
|
# No closing tag yet — hold tail that could be a
|
|
# partial closing tag prefix, discard the rest.
|
|
max_tag = max(len(t) for t in self._CLOSE_THINK_TAGS)
|
|
self._think_buffer = buf[-max_tag:] if len(buf) > max_tag else buf
|
|
return
|
|
else:
|
|
# Look for earliest opening tag at a block boundary
|
|
# (start of text / preceded by newline + optional whitespace).
|
|
# This prevents false positives when models *mention* tags
|
|
# in prose (e.g. "the <think> tag is used for…").
|
|
best_idx = -1
|
|
best_len = 0
|
|
for tag in self._OPEN_THINK_TAGS:
|
|
search_start = 0
|
|
while True:
|
|
idx = buf.find(tag, search_start)
|
|
if idx == -1:
|
|
break
|
|
# Block-boundary check (mirrors cli.py logic)
|
|
if idx == 0:
|
|
is_boundary = (
|
|
not self._accumulated
|
|
or self._accumulated.endswith("\n")
|
|
)
|
|
else:
|
|
preceding = buf[:idx]
|
|
last_nl = preceding.rfind("\n")
|
|
if last_nl == -1:
|
|
is_boundary = (
|
|
(not self._accumulated
|
|
or self._accumulated.endswith("\n"))
|
|
and preceding.strip() == ""
|
|
)
|
|
else:
|
|
is_boundary = preceding[last_nl + 1:].strip() == ""
|
|
|
|
if is_boundary and (best_idx == -1 or idx < best_idx):
|
|
best_idx = idx
|
|
best_len = len(tag)
|
|
break # first boundary hit for this tag is enough
|
|
search_start = idx + 1
|
|
|
|
if best_len:
|
|
# Emit text before the tag, enter think block
|
|
self._accumulated += buf[:best_idx]
|
|
self._in_think_block = True
|
|
buf = buf[best_idx + best_len:]
|
|
else:
|
|
# No opening tag — check for a partial tag at the tail
|
|
held_back = 0
|
|
for tag in self._OPEN_THINK_TAGS:
|
|
for i in range(1, len(tag)):
|
|
if buf.endswith(tag[:i]) and i > held_back:
|
|
held_back = i
|
|
if held_back:
|
|
self._accumulated += buf[:-held_back]
|
|
self._think_buffer = buf[-held_back:]
|
|
else:
|
|
self._accumulated += buf
|
|
return
|
|
|
|
def _flush_think_buffer(self) -> None:
|
|
"""Flush any held-back partial-tag buffer into accumulated text.
|
|
|
|
Called when the stream ends (got_done) so that partial text that
|
|
was held back waiting for a possible opening tag is not lost.
|
|
"""
|
|
if self._think_buffer and not self._in_think_block:
|
|
self._accumulated += self._think_buffer
|
|
self._think_buffer = ""
|
|
|
|
async def run(self) -> None:
|
|
"""Async task that drains the queue and edits the platform message."""
|
|
# Platform message length limit — leave room for cursor + formatting
|
|
_raw_limit = getattr(self.adapter, "MAX_MESSAGE_LENGTH", 4096)
|
|
_safe_limit = max(500, _raw_limit - len(self.cfg.cursor) - 100)
|
|
|
|
try:
|
|
while True:
|
|
# Drain all available items from the queue
|
|
got_done = False
|
|
got_segment_break = False
|
|
commentary_text = None
|
|
while True:
|
|
try:
|
|
item = self._queue.get_nowait()
|
|
if item is _DONE:
|
|
got_done = True
|
|
break
|
|
if item is _NEW_SEGMENT:
|
|
got_segment_break = True
|
|
break
|
|
if isinstance(item, tuple) and len(item) == 2 and item[0] is _COMMENTARY:
|
|
commentary_text = item[1]
|
|
break
|
|
self._filter_and_accumulate(item)
|
|
except queue.Empty:
|
|
break
|
|
|
|
# Flush any held-back partial-tag buffer on stream end
|
|
# so trailing text that was waiting for a potential open
|
|
# tag is not lost.
|
|
if got_done:
|
|
self._flush_think_buffer()
|
|
|
|
# Decide whether to flush an edit
|
|
now = time.monotonic()
|
|
elapsed = now - self._last_edit_time
|
|
should_edit = (
|
|
got_done
|
|
or got_segment_break
|
|
or commentary_text is not None
|
|
or (elapsed >= self._current_edit_interval
|
|
and self._accumulated)
|
|
or len(self._accumulated) >= self.cfg.buffer_threshold
|
|
)
|
|
|
|
current_update_visible = False
|
|
if should_edit and self._accumulated:
|
|
# Split overflow: if accumulated text exceeds the platform
|
|
# limit, split into properly sized chunks.
|
|
if (
|
|
len(self._accumulated) > _safe_limit
|
|
and self._message_id is None
|
|
):
|
|
# No existing message to edit (first message or after a
|
|
# segment break). Use truncate_message — the same
|
|
# helper the non-streaming path uses — to split with
|
|
# proper word/code-fence boundaries and chunk
|
|
# indicators like "(1/2)".
|
|
chunks = self.adapter.truncate_message(
|
|
self._accumulated, _safe_limit
|
|
)
|
|
for chunk in chunks:
|
|
await self._send_new_chunk(chunk, self._message_id)
|
|
self._accumulated = ""
|
|
self._last_sent_text = ""
|
|
self._last_edit_time = time.monotonic()
|
|
if got_done:
|
|
self._final_response_sent = self._already_sent
|
|
return
|
|
if got_segment_break:
|
|
self._message_id = None
|
|
self._fallback_final_send = False
|
|
self._fallback_prefix = ""
|
|
continue
|
|
|
|
# Existing message: edit it with the first chunk, then
|
|
# start a new message for the overflow remainder.
|
|
while (
|
|
len(self._accumulated) > _safe_limit
|
|
and self._message_id is not None
|
|
and self._edit_supported
|
|
):
|
|
split_at = self._accumulated.rfind("\n", 0, _safe_limit)
|
|
if split_at < _safe_limit // 2:
|
|
split_at = _safe_limit
|
|
chunk = self._accumulated[:split_at]
|
|
ok = await self._send_or_edit(chunk)
|
|
if self._fallback_final_send or not ok:
|
|
# Edit failed (or backed off due to flood control)
|
|
# while attempting to split an oversized message.
|
|
# Keep the full accumulated text intact so the
|
|
# fallback final-send path can deliver the remaining
|
|
# continuation without dropping content.
|
|
break
|
|
self._accumulated = self._accumulated[split_at:].lstrip("\n")
|
|
self._message_id = None
|
|
self._last_sent_text = ""
|
|
|
|
display_text = self._accumulated
|
|
if not got_done and not got_segment_break and commentary_text is None:
|
|
display_text += self.cfg.cursor
|
|
|
|
current_update_visible = await self._send_or_edit(display_text)
|
|
self._last_edit_time = time.monotonic()
|
|
|
|
if got_done:
|
|
# Final edit without cursor. If progressive editing failed
|
|
# mid-stream, send a single continuation/fallback message
|
|
# here instead of letting the base gateway path send the
|
|
# full response again.
|
|
if self._accumulated:
|
|
if self._fallback_final_send:
|
|
await self._send_fallback_final(self._accumulated)
|
|
elif current_update_visible:
|
|
self._final_response_sent = True
|
|
elif self._message_id:
|
|
self._final_response_sent = await self._send_or_edit(self._accumulated)
|
|
elif not self._already_sent:
|
|
self._final_response_sent = await self._send_or_edit(self._accumulated)
|
|
return
|
|
|
|
if commentary_text is not None:
|
|
self._reset_segment_state()
|
|
await self._send_commentary(commentary_text)
|
|
self._last_edit_time = time.monotonic()
|
|
self._reset_segment_state()
|
|
|
|
# Tool boundary: reset message state so the next text chunk
|
|
# creates a fresh message below any tool-progress messages.
|
|
#
|
|
# Exception: when _message_id is "__no_edit__" the platform
|
|
# never returned a real message ID (e.g. Signal, webhook with
|
|
# github_comment delivery). Resetting to None would re-enter
|
|
# the "first send" path on every tool boundary and post one
|
|
# platform message per tool call — that is what caused 155
|
|
# comments under a single PR. Instead, preserve the sentinel
|
|
# so the full continuation is delivered once via
|
|
# _send_fallback_final.
|
|
# (When editing fails mid-stream due to flood control the id is
|
|
# a real string like "msg_1", not "__no_edit__", so that case
|
|
# still resets and creates a fresh segment as intended.)
|
|
if got_segment_break:
|
|
self._reset_segment_state(preserve_no_edit=True)
|
|
|
|
await asyncio.sleep(0.05) # Small yield to not busy-loop
|
|
|
|
except asyncio.CancelledError:
|
|
# Best-effort final edit on cancellation
|
|
if self._accumulated and self._message_id:
|
|
try:
|
|
await self._send_or_edit(self._accumulated)
|
|
except Exception:
|
|
pass
|
|
# If we delivered any content before being cancelled, mark the
|
|
# final response as sent so the gateway's already_sent check
|
|
# doesn't trigger a duplicate message. The 5-second
|
|
# stream_task timeout (gateway/run.py) can cancel us while
|
|
# waiting on a slow Telegram API call — without this flag the
|
|
# gateway falls through to the normal send path.
|
|
if self._already_sent:
|
|
self._final_response_sent = True
|
|
except Exception as e:
|
|
logger.error("Stream consumer error: %s", e)
|
|
|
|
# Pattern to strip MEDIA:<path> tags (including optional surrounding quotes).
|
|
# Matches the simple cleanup regex used by the non-streaming path in
|
|
# gateway/platforms/base.py for post-processing.
|
|
_MEDIA_RE = re.compile(r'''[`"']?MEDIA:\s*\S+[`"']?''')
|
|
|
|
@staticmethod
|
|
def _clean_for_display(text: str) -> str:
|
|
"""Strip MEDIA: directives and internal markers from text before display.
|
|
|
|
The streaming path delivers raw text chunks that may include
|
|
``MEDIA:<path>`` tags and ``[[audio_as_voice]]`` directives meant for
|
|
the platform adapter's post-processing. The actual media files are
|
|
delivered separately via ``_deliver_media_from_response()`` after the
|
|
stream finishes — we just need to hide the raw directives from the
|
|
user.
|
|
"""
|
|
if "MEDIA:" not in text and "[[audio_as_voice]]" not in text:
|
|
return text
|
|
cleaned = text.replace("[[audio_as_voice]]", "")
|
|
cleaned = GatewayStreamConsumer._MEDIA_RE.sub("", cleaned)
|
|
# Collapse excessive blank lines left behind by removed tags
|
|
cleaned = re.sub(r'\n{3,}', '\n\n', cleaned)
|
|
# Strip trailing whitespace/newlines but preserve leading content
|
|
return cleaned.rstrip()
|
|
|
|
async def _send_new_chunk(self, text: str, reply_to_id: Optional[str]) -> Optional[str]:
|
|
"""Send a new message chunk, optionally threaded to a previous message.
|
|
|
|
Returns the message_id so callers can thread subsequent chunks.
|
|
"""
|
|
text = self._clean_for_display(text)
|
|
if not text.strip():
|
|
return reply_to_id
|
|
try:
|
|
meta = dict(self.metadata) if self.metadata else {}
|
|
result = await self.adapter.send(
|
|
chat_id=self.chat_id,
|
|
content=text,
|
|
reply_to=reply_to_id,
|
|
metadata=meta,
|
|
)
|
|
if result.success and result.message_id:
|
|
self._message_id = str(result.message_id)
|
|
self._already_sent = True
|
|
self._last_sent_text = text
|
|
return str(result.message_id)
|
|
else:
|
|
self._edit_supported = False
|
|
return reply_to_id
|
|
except Exception as e:
|
|
logger.error("Stream send chunk error: %s", e)
|
|
return reply_to_id
|
|
|
|
def _visible_prefix(self) -> str:
|
|
"""Return the visible text already shown in the streamed message."""
|
|
prefix = self._last_sent_text or ""
|
|
if self.cfg.cursor and prefix.endswith(self.cfg.cursor):
|
|
prefix = prefix[:-len(self.cfg.cursor)]
|
|
return self._clean_for_display(prefix)
|
|
|
|
def _continuation_text(self, final_text: str) -> str:
|
|
"""Return only the part of final_text the user has not already seen."""
|
|
prefix = self._fallback_prefix or self._visible_prefix()
|
|
if prefix and final_text.startswith(prefix):
|
|
return final_text[len(prefix):].lstrip()
|
|
return final_text
|
|
|
|
@staticmethod
|
|
def _split_text_chunks(text: str, limit: int) -> list[str]:
|
|
"""Split text into reasonably sized chunks for fallback sends."""
|
|
if len(text) <= limit:
|
|
return [text]
|
|
chunks: list[str] = []
|
|
remaining = text
|
|
while len(remaining) > limit:
|
|
split_at = remaining.rfind("\n", 0, limit)
|
|
if split_at < limit // 2:
|
|
split_at = limit
|
|
chunks.append(remaining[:split_at])
|
|
remaining = remaining[split_at:].lstrip("\n")
|
|
if remaining:
|
|
chunks.append(remaining)
|
|
return chunks
|
|
|
|
async def _send_fallback_final(self, text: str) -> None:
|
|
"""Send the final continuation after streaming edits stop working.
|
|
|
|
Retries each chunk once on flood-control failures with a short delay.
|
|
"""
|
|
final_text = self._clean_for_display(text)
|
|
continuation = self._continuation_text(final_text)
|
|
self._fallback_final_send = False
|
|
if not continuation.strip():
|
|
# Nothing new to send — the visible partial already matches final text.
|
|
self._already_sent = True
|
|
self._final_response_sent = True
|
|
return
|
|
|
|
raw_limit = getattr(self.adapter, "MAX_MESSAGE_LENGTH", 4096)
|
|
safe_limit = max(500, raw_limit - 100)
|
|
chunks = self._split_text_chunks(continuation, safe_limit)
|
|
|
|
last_message_id: Optional[str] = None
|
|
last_successful_chunk = ""
|
|
sent_any_chunk = False
|
|
for chunk in chunks:
|
|
# Try sending with one retry on flood-control errors.
|
|
result = None
|
|
for attempt in range(2):
|
|
result = await self.adapter.send(
|
|
chat_id=self.chat_id,
|
|
content=chunk,
|
|
metadata=self.metadata,
|
|
)
|
|
if result.success:
|
|
break
|
|
if attempt == 0 and self._is_flood_error(result):
|
|
logger.debug(
|
|
"Flood control on fallback send, retrying in 3s"
|
|
)
|
|
await asyncio.sleep(3.0)
|
|
else:
|
|
break # non-flood error or second attempt failed
|
|
|
|
if not result or not result.success:
|
|
if sent_any_chunk:
|
|
# Some continuation text already reached the user. Suppress
|
|
# the base gateway final-send path so we don't resend the
|
|
# full response and create another duplicate.
|
|
self._already_sent = True
|
|
self._final_response_sent = True
|
|
self._message_id = last_message_id
|
|
self._last_sent_text = last_successful_chunk
|
|
self._fallback_prefix = ""
|
|
return
|
|
# No fallback chunk reached the user — allow the normal gateway
|
|
# final-send path to try one more time.
|
|
self._already_sent = False
|
|
self._message_id = None
|
|
self._last_sent_text = ""
|
|
self._fallback_prefix = ""
|
|
return
|
|
sent_any_chunk = True
|
|
last_successful_chunk = chunk
|
|
last_message_id = result.message_id or last_message_id
|
|
|
|
self._message_id = last_message_id
|
|
self._already_sent = True
|
|
self._final_response_sent = True
|
|
self._last_sent_text = chunks[-1]
|
|
self._fallback_prefix = ""
|
|
|
|
def _is_flood_error(self, result) -> bool:
|
|
"""Check if a SendResult failure is due to flood control / rate limiting."""
|
|
err = getattr(result, "error", "") or ""
|
|
err_lower = err.lower()
|
|
return "flood" in err_lower or "retry after" in err_lower or "rate" in err_lower
|
|
|
|
async def _try_strip_cursor(self) -> None:
|
|
"""Best-effort edit to remove the cursor from the last visible message.
|
|
|
|
Called when entering fallback mode so the user doesn't see a stuck
|
|
cursor (▉) in the partial message.
|
|
"""
|
|
if not self._message_id or self._message_id == "__no_edit__":
|
|
return
|
|
prefix = self._visible_prefix()
|
|
if not prefix or not prefix.strip():
|
|
return
|
|
try:
|
|
await self.adapter.edit_message(
|
|
chat_id=self.chat_id,
|
|
message_id=self._message_id,
|
|
content=prefix,
|
|
)
|
|
self._last_sent_text = prefix
|
|
except Exception:
|
|
pass # best-effort — don't let this block the fallback path
|
|
|
|
async def _send_commentary(self, text: str) -> bool:
|
|
"""Send a completed interim assistant commentary message."""
|
|
text = self._clean_for_display(text)
|
|
if not text.strip():
|
|
return False
|
|
try:
|
|
result = await self.adapter.send(
|
|
chat_id=self.chat_id,
|
|
content=text,
|
|
metadata=self.metadata,
|
|
)
|
|
if result.success:
|
|
self._already_sent = True
|
|
return True
|
|
except Exception as e:
|
|
logger.error("Commentary send error: %s", e)
|
|
return False
|
|
|
|
async def _send_or_edit(self, text: str) -> bool:
|
|
"""Send or edit the streaming message.
|
|
|
|
Returns True if the text was successfully delivered (sent or edited),
|
|
False otherwise. Callers like the overflow split loop use this to
|
|
decide whether to advance past the delivered chunk.
|
|
"""
|
|
# Strip MEDIA: directives so they don't appear as visible text.
|
|
# Media files are delivered as native attachments after the stream
|
|
# finishes (via _deliver_media_from_response in gateway/run.py).
|
|
text = self._clean_for_display(text)
|
|
# A bare streaming cursor is not meaningful user-visible content and
|
|
# can render as a stray tofu/white-box message on some clients.
|
|
visible_without_cursor = text
|
|
if self.cfg.cursor:
|
|
visible_without_cursor = visible_without_cursor.replace(self.cfg.cursor, "")
|
|
_visible_stripped = visible_without_cursor.strip()
|
|
if not _visible_stripped:
|
|
return True # cursor-only / whitespace-only update
|
|
if not text.strip():
|
|
return True # nothing to send is "success"
|
|
# Guard: do not create a brand-new standalone message when the only
|
|
# visible content is a handful of characters alongside the streaming
|
|
# cursor. During rapid tool-calling the model often emits 1-2 tokens
|
|
# before switching to tool calls; the resulting "X ▉" message risks
|
|
# leaving the cursor permanently visible if the follow-up edit (to
|
|
# strip the cursor on segment break) is rate-limited by the platform.
|
|
# This was reported on Telegram, Matrix, and other clients where the
|
|
# ▉ block character renders as a visible white box ("tofu").
|
|
# Existing messages (edits) are unaffected — only first sends gated.
|
|
_MIN_NEW_MSG_CHARS = 4
|
|
if (self._message_id is None
|
|
and self.cfg.cursor
|
|
and self.cfg.cursor in text
|
|
and len(_visible_stripped) < _MIN_NEW_MSG_CHARS):
|
|
return True # too short for a standalone message — accumulate more
|
|
try:
|
|
if self._message_id is not None:
|
|
if self._edit_supported:
|
|
# Skip if text is identical to what we last sent
|
|
if text == self._last_sent_text:
|
|
return True
|
|
# Edit existing message
|
|
result = await self.adapter.edit_message(
|
|
chat_id=self.chat_id,
|
|
message_id=self._message_id,
|
|
content=text,
|
|
)
|
|
if result.success:
|
|
self._already_sent = True
|
|
self._last_sent_text = text
|
|
# Successful edit — reset flood strike counter
|
|
self._flood_strikes = 0
|
|
return True
|
|
else:
|
|
# Edit failed. If this looks like flood control / rate
|
|
# limiting, use adaptive backoff: double the edit interval
|
|
# and retry on the next cycle. Only permanently disable
|
|
# edits after _MAX_FLOOD_STRIKES consecutive failures.
|
|
if self._is_flood_error(result):
|
|
self._flood_strikes += 1
|
|
self._current_edit_interval = min(
|
|
self._current_edit_interval * 2, 10.0,
|
|
)
|
|
logger.debug(
|
|
"Flood control on edit (strike %d/%d), "
|
|
"backoff interval → %.1fs",
|
|
self._flood_strikes,
|
|
self._MAX_FLOOD_STRIKES,
|
|
self._current_edit_interval,
|
|
)
|
|
if self._flood_strikes < self._MAX_FLOOD_STRIKES:
|
|
# Don't disable edits yet — just slow down.
|
|
# Update _last_edit_time so the next edit
|
|
# respects the new interval.
|
|
self._last_edit_time = time.monotonic()
|
|
return False
|
|
|
|
# Non-flood error OR flood strikes exhausted: enter
|
|
# fallback mode — send only the missing tail once the
|
|
# final response is available.
|
|
logger.debug(
|
|
"Edit failed (strikes=%d), entering fallback mode",
|
|
self._flood_strikes,
|
|
)
|
|
self._fallback_prefix = self._visible_prefix()
|
|
self._fallback_final_send = True
|
|
self._edit_supported = False
|
|
self._already_sent = True
|
|
# Best-effort: strip the cursor from the last visible
|
|
# message so the user doesn't see a stuck ▉.
|
|
await self._try_strip_cursor()
|
|
return False
|
|
else:
|
|
# Editing not supported — skip intermediate updates.
|
|
# The final response will be sent by the fallback path.
|
|
return False
|
|
else:
|
|
# First message — send new
|
|
result = await self.adapter.send(
|
|
chat_id=self.chat_id,
|
|
content=text,
|
|
metadata=self.metadata,
|
|
)
|
|
if result.success:
|
|
if result.message_id:
|
|
self._message_id = result.message_id
|
|
else:
|
|
self._edit_supported = False
|
|
self._already_sent = True
|
|
self._last_sent_text = text
|
|
if not result.message_id:
|
|
self._fallback_prefix = self._visible_prefix()
|
|
self._fallback_final_send = True
|
|
# Sentinel prevents re-entering the first-send path on
|
|
# every delta/tool boundary when platforms accept a
|
|
# message but do not return an editable message id.
|
|
self._message_id = "__no_edit__"
|
|
return True
|
|
else:
|
|
# Initial send failed — disable streaming for this session
|
|
self._edit_supported = False
|
|
return False
|
|
except Exception as e:
|
|
logger.error("Stream send/edit error: %s", e)
|
|
return False
|