fix(gateway): race condition, photo media loss, and flood control in Telegram
Three bugs causing intermittent silent drops, partial responses, and flood control delays on the Telegram platform: 1. Race condition in handle_message() — _active_sessions was set inside the background task, not before create_task(). Two rapid messages could both pass the guard and spawn duplicate processing tasks. Fix: set _active_sessions synchronously before spawning the task (grammY sequentialize / aiogram EventIsolation pattern). 2. Photo media loss on dequeue — when a photo (no caption) was queued during active processing and later dequeued, only .text was extracted. Empty text → message silently dropped. Fix: _build_media_placeholder() creates text context for media-only events so they survive the dequeue path. 3. Progress message edits triggered Telegram flood control — rapid tool calls edited the progress message every 0.3s, hitting Telegram's rate limit (23s+ waits). This blocked progress updates and could cause stream consumer timeouts. Fix: throttle edits to 1.5s minimum interval, detect flood control errors and gracefully degrade to new messages. edit_message() now returns failure for flood waits >5s instead of blocking.
This commit is contained in:
@@ -1046,6 +1046,13 @@ class BasePlatformAdapter(ABC):
|
||||
self._active_sessions[session_key].set()
|
||||
return # Don't process now - will be handled after current task finishes
|
||||
|
||||
# Mark session as active BEFORE spawning background task to close
|
||||
# the race window where a second message arriving before the task
|
||||
# starts would also pass the _active_sessions check and spawn a
|
||||
# duplicate task. (grammY sequentialize / aiogram EventIsolation
|
||||
# pattern — set the guard synchronously, not inside the task.)
|
||||
self._active_sessions[session_key] = asyncio.Event()
|
||||
|
||||
# Spawn background task to process this message
|
||||
task = asyncio.create_task(self._process_message_background(event, session_key))
|
||||
try:
|
||||
@@ -1092,8 +1099,10 @@ class BasePlatformAdapter(ABC):
|
||||
if getattr(result, "success", False):
|
||||
delivery_succeeded = True
|
||||
|
||||
# Create interrupt event for this session
|
||||
interrupt_event = asyncio.Event()
|
||||
# Reuse the interrupt event set by handle_message() (which marks
|
||||
# the session active before spawning this task to prevent races).
|
||||
# Fall back to a new Event only if the entry was removed externally.
|
||||
interrupt_event = self._active_sessions.get(session_key) or asyncio.Event()
|
||||
self._active_sessions[session_key] = interrupt_event
|
||||
|
||||
# Start continuous typing indicator (refreshes every 2 seconds)
|
||||
|
||||
@@ -900,7 +900,10 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||
except Exception:
|
||||
pass # best-effort truncation
|
||||
return SendResult(success=True, message_id=message_id)
|
||||
# Flood control / RetryAfter — back off and retry once
|
||||
# Flood control / RetryAfter — short waits are retried inline,
|
||||
# long waits (>5s) return a failure so the caller can decide
|
||||
# whether to wait or degrade gracefully. (grammY auto-retry
|
||||
# pattern: maxDelaySeconds threshold.)
|
||||
retry_after = getattr(e, "retry_after", None)
|
||||
if retry_after is not None or "retry after" in err_str:
|
||||
wait = retry_after if retry_after else 1.0
|
||||
@@ -908,6 +911,13 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||
"[%s] Telegram flood control, waiting %.1fs",
|
||||
self.name, wait,
|
||||
)
|
||||
if wait > 5.0:
|
||||
# Long wait — return failure immediately so callers
|
||||
# (progress edits, stream consumer) aren't blocked.
|
||||
return SendResult(
|
||||
success=False,
|
||||
error=f"flood_control:{wait}",
|
||||
)
|
||||
await asyncio.sleep(wait)
|
||||
try:
|
||||
await self._bot.edit_message_text(
|
||||
|
||||
@@ -303,6 +303,28 @@ def _resolve_runtime_agent_kwargs() -> dict:
|
||||
}
|
||||
|
||||
|
||||
def _build_media_placeholder(event) -> str:
|
||||
"""Build a text placeholder for media-only events so they aren't dropped.
|
||||
|
||||
When a photo/document is queued during active processing and later
|
||||
dequeued, only .text is extracted. If the event has no caption,
|
||||
the media would be silently lost. This builds a placeholder that
|
||||
the vision enrichment pipeline will replace with a real description.
|
||||
"""
|
||||
parts = []
|
||||
media_urls = getattr(event, "media_urls", None) or []
|
||||
media_types = getattr(event, "media_types", None) or []
|
||||
for i, url in enumerate(media_urls):
|
||||
mtype = media_types[i] if i < len(media_types) else ""
|
||||
if mtype.startswith("image/") or getattr(event, "message_type", None) == MessageType.PHOTO:
|
||||
parts.append(f"[User sent an image: {url}]")
|
||||
elif mtype.startswith("audio/"):
|
||||
parts.append(f"[User sent audio: {url}]")
|
||||
else:
|
||||
parts.append(f"[User sent a file: {url}]")
|
||||
return "\n".join(parts)
|
||||
|
||||
|
||||
def _check_unavailable_skill(command_name: str) -> str | None:
|
||||
"""Check if a command matches a known-but-inactive skill.
|
||||
|
||||
@@ -5384,11 +5406,13 @@ class GatewayRunner:
|
||||
progress_lines = [] # Accumulated tool lines
|
||||
progress_msg_id = None # ID of the progress message to edit
|
||||
can_edit = True # False once an edit fails (platform doesn't support it)
|
||||
_last_edit_ts = 0.0 # Throttle edits to avoid Telegram flood control
|
||||
_PROGRESS_EDIT_INTERVAL = 1.5 # Minimum seconds between edits
|
||||
|
||||
while True:
|
||||
try:
|
||||
raw = progress_queue.get_nowait()
|
||||
|
||||
|
||||
# Handle dedup messages: update last line with repeat counter
|
||||
if isinstance(raw, tuple) and len(raw) == 3 and raw[0] == "__dedup__":
|
||||
_, base_msg, count = raw
|
||||
@@ -5399,6 +5423,15 @@ class GatewayRunner:
|
||||
msg = raw
|
||||
progress_lines.append(msg)
|
||||
|
||||
# Throttle edits: batch rapid tool updates into fewer
|
||||
# API calls to avoid hitting Telegram flood control.
|
||||
# (grammY auto-retry pattern: proactively rate-limit
|
||||
# instead of reacting to 429s.)
|
||||
_now = time.monotonic()
|
||||
if _now - _last_edit_ts < _PROGRESS_EDIT_INTERVAL:
|
||||
await asyncio.sleep(0.1)
|
||||
continue
|
||||
|
||||
if can_edit and progress_msg_id is not None:
|
||||
# Try to edit the existing progress message
|
||||
full_text = "\n".join(progress_lines)
|
||||
@@ -5408,8 +5441,15 @@ class GatewayRunner:
|
||||
content=full_text,
|
||||
)
|
||||
if not result.success:
|
||||
# Platform doesn't support editing — stop trying,
|
||||
# send just this new line as a separate message
|
||||
_err = (getattr(result, "error", "") or "").lower()
|
||||
if "flood" in _err or "retry after" in _err:
|
||||
# Flood control hit — disable further edits,
|
||||
# switch to sending new messages only for
|
||||
# important updates. Don't block 23s.
|
||||
logger.info(
|
||||
"[%s] Progress edits disabled due to flood control",
|
||||
adapter.name,
|
||||
)
|
||||
can_edit = False
|
||||
await adapter.send(chat_id=source.chat_id, content=msg, metadata=_progress_metadata)
|
||||
else:
|
||||
@@ -5423,6 +5463,8 @@ class GatewayRunner:
|
||||
if result.success and result.message_id:
|
||||
progress_msg_id = result.message_id
|
||||
|
||||
_last_edit_ts = time.monotonic()
|
||||
|
||||
# Restore typing indicator
|
||||
await asyncio.sleep(0.3)
|
||||
await adapter.send_typing(source.chat_id, metadata=_progress_metadata)
|
||||
@@ -5977,6 +6019,11 @@ class GatewayRunner:
|
||||
pending_event = adapter.get_pending_message(session_key)
|
||||
if pending_event:
|
||||
pending = pending_event.text
|
||||
# Preserve media context for photo/document events
|
||||
# whose text is empty (no caption). Without this,
|
||||
# captionless photos are silently dropped.
|
||||
if not pending and getattr(pending_event, "media_urls", None):
|
||||
pending = _build_media_placeholder(pending_event)
|
||||
elif result.get("interrupt_message"):
|
||||
pending = result.get("interrupt_message")
|
||||
else:
|
||||
@@ -5985,7 +6032,10 @@ class GatewayRunner:
|
||||
pending_event = adapter.get_pending_message(session_key)
|
||||
if pending_event:
|
||||
pending = pending_event.text
|
||||
logger.debug("Processing queued message after agent completion: '%s...'", pending[:40])
|
||||
if not pending and getattr(pending_event, "media_urls", None):
|
||||
pending = _build_media_placeholder(pending_event)
|
||||
if pending:
|
||||
logger.debug("Processing queued message after agent completion: '%s...'", pending[:40])
|
||||
|
||||
if pending:
|
||||
logger.debug("Processing pending message: '%s...'", pending[:40])
|
||||
|
||||
Reference in New Issue
Block a user