From 69f85a4dce41e5ba53b2b5ea7f6fcb09d65263b7 Mon Sep 17 00:00:00 2001 From: kshitijk4poor <82637225+kshitijk4poor@users.noreply.github.com> Date: Thu, 2 Apr 2026 16:32:21 +0530 Subject: [PATCH] fix(gateway): race condition, photo media loss, and flood control in Telegram MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three bugs causing intermittent silent drops, partial responses, and flood control delays on the Telegram platform: 1. Race condition in handle_message() — _active_sessions was set inside the background task, not before create_task(). Two rapid messages could both pass the guard and spawn duplicate processing tasks. Fix: set _active_sessions synchronously before spawning the task (grammY sequentialize / aiogram EventIsolation pattern). 2. Photo media loss on dequeue — when a photo (no caption) was queued during active processing and later dequeued, only .text was extracted. Empty text → message silently dropped. Fix: _build_media_placeholder() creates text context for media-only events so they survive the dequeue path. 3. Progress message edits triggered Telegram flood control — rapid tool calls edited the progress message every 0.3s, hitting Telegram's rate limit (23s+ waits). This blocked progress updates and could cause stream consumer timeouts. Fix: throttle edits to 1.5s minimum interval, detect flood control errors and gracefully degrade to new messages. edit_message() now returns failure for flood waits >5s instead of blocking. --- gateway/platforms/base.py | 13 ++++++-- gateway/platforms/telegram.py | 12 +++++++- gateway/run.py | 58 ++++++++++++++++++++++++++++++++--- 3 files changed, 76 insertions(+), 7 deletions(-) diff --git a/gateway/platforms/base.py b/gateway/platforms/base.py index 9a821727e..6b9c97c3c 100644 --- a/gateway/platforms/base.py +++ b/gateway/platforms/base.py @@ -1046,6 +1046,13 @@ class BasePlatformAdapter(ABC): self._active_sessions[session_key].set() return # Don't process now - will be handled after current task finishes + # Mark session as active BEFORE spawning background task to close + # the race window where a second message arriving before the task + # starts would also pass the _active_sessions check and spawn a + # duplicate task. (grammY sequentialize / aiogram EventIsolation + # pattern — set the guard synchronously, not inside the task.) + self._active_sessions[session_key] = asyncio.Event() + # Spawn background task to process this message task = asyncio.create_task(self._process_message_background(event, session_key)) try: @@ -1092,8 +1099,10 @@ class BasePlatformAdapter(ABC): if getattr(result, "success", False): delivery_succeeded = True - # Create interrupt event for this session - interrupt_event = asyncio.Event() + # Reuse the interrupt event set by handle_message() (which marks + # the session active before spawning this task to prevent races). + # Fall back to a new Event only if the entry was removed externally. + interrupt_event = self._active_sessions.get(session_key) or asyncio.Event() self._active_sessions[session_key] = interrupt_event # Start continuous typing indicator (refreshes every 2 seconds) diff --git a/gateway/platforms/telegram.py b/gateway/platforms/telegram.py index e5e2885c7..304c5625d 100644 --- a/gateway/platforms/telegram.py +++ b/gateway/platforms/telegram.py @@ -900,7 +900,10 @@ class TelegramAdapter(BasePlatformAdapter): except Exception: pass # best-effort truncation return SendResult(success=True, message_id=message_id) - # Flood control / RetryAfter — back off and retry once + # Flood control / RetryAfter — short waits are retried inline, + # long waits (>5s) return a failure so the caller can decide + # whether to wait or degrade gracefully. (grammY auto-retry + # pattern: maxDelaySeconds threshold.) retry_after = getattr(e, "retry_after", None) if retry_after is not None or "retry after" in err_str: wait = retry_after if retry_after else 1.0 @@ -908,6 +911,13 @@ class TelegramAdapter(BasePlatformAdapter): "[%s] Telegram flood control, waiting %.1fs", self.name, wait, ) + if wait > 5.0: + # Long wait — return failure immediately so callers + # (progress edits, stream consumer) aren't blocked. + return SendResult( + success=False, + error=f"flood_control:{wait}", + ) await asyncio.sleep(wait) try: await self._bot.edit_message_text( diff --git a/gateway/run.py b/gateway/run.py index 7a750a2c8..7c711d39e 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -303,6 +303,28 @@ def _resolve_runtime_agent_kwargs() -> dict: } +def _build_media_placeholder(event) -> str: + """Build a text placeholder for media-only events so they aren't dropped. + + When a photo/document is queued during active processing and later + dequeued, only .text is extracted. If the event has no caption, + the media would be silently lost. This builds a placeholder that + the vision enrichment pipeline will replace with a real description. + """ + parts = [] + media_urls = getattr(event, "media_urls", None) or [] + media_types = getattr(event, "media_types", None) or [] + for i, url in enumerate(media_urls): + mtype = media_types[i] if i < len(media_types) else "" + if mtype.startswith("image/") or getattr(event, "message_type", None) == MessageType.PHOTO: + parts.append(f"[User sent an image: {url}]") + elif mtype.startswith("audio/"): + parts.append(f"[User sent audio: {url}]") + else: + parts.append(f"[User sent a file: {url}]") + return "\n".join(parts) + + def _check_unavailable_skill(command_name: str) -> str | None: """Check if a command matches a known-but-inactive skill. @@ -5384,11 +5406,13 @@ class GatewayRunner: progress_lines = [] # Accumulated tool lines progress_msg_id = None # ID of the progress message to edit can_edit = True # False once an edit fails (platform doesn't support it) + _last_edit_ts = 0.0 # Throttle edits to avoid Telegram flood control + _PROGRESS_EDIT_INTERVAL = 1.5 # Minimum seconds between edits while True: try: raw = progress_queue.get_nowait() - + # Handle dedup messages: update last line with repeat counter if isinstance(raw, tuple) and len(raw) == 3 and raw[0] == "__dedup__": _, base_msg, count = raw @@ -5399,6 +5423,15 @@ class GatewayRunner: msg = raw progress_lines.append(msg) + # Throttle edits: batch rapid tool updates into fewer + # API calls to avoid hitting Telegram flood control. + # (grammY auto-retry pattern: proactively rate-limit + # instead of reacting to 429s.) + _now = time.monotonic() + if _now - _last_edit_ts < _PROGRESS_EDIT_INTERVAL: + await asyncio.sleep(0.1) + continue + if can_edit and progress_msg_id is not None: # Try to edit the existing progress message full_text = "\n".join(progress_lines) @@ -5408,8 +5441,15 @@ class GatewayRunner: content=full_text, ) if not result.success: - # Platform doesn't support editing — stop trying, - # send just this new line as a separate message + _err = (getattr(result, "error", "") or "").lower() + if "flood" in _err or "retry after" in _err: + # Flood control hit — disable further edits, + # switch to sending new messages only for + # important updates. Don't block 23s. + logger.info( + "[%s] Progress edits disabled due to flood control", + adapter.name, + ) can_edit = False await adapter.send(chat_id=source.chat_id, content=msg, metadata=_progress_metadata) else: @@ -5423,6 +5463,8 @@ class GatewayRunner: if result.success and result.message_id: progress_msg_id = result.message_id + _last_edit_ts = time.monotonic() + # Restore typing indicator await asyncio.sleep(0.3) await adapter.send_typing(source.chat_id, metadata=_progress_metadata) @@ -5977,6 +6019,11 @@ class GatewayRunner: pending_event = adapter.get_pending_message(session_key) if pending_event: pending = pending_event.text + # Preserve media context for photo/document events + # whose text is empty (no caption). Without this, + # captionless photos are silently dropped. + if not pending and getattr(pending_event, "media_urls", None): + pending = _build_media_placeholder(pending_event) elif result.get("interrupt_message"): pending = result.get("interrupt_message") else: @@ -5985,7 +6032,10 @@ class GatewayRunner: pending_event = adapter.get_pending_message(session_key) if pending_event: pending = pending_event.text - logger.debug("Processing queued message after agent completion: '%s...'", pending[:40]) + if not pending and getattr(pending_event, "media_urls", None): + pending = _build_media_placeholder(pending_event) + if pending: + logger.debug("Processing queued message after agent completion: '%s...'", pending[:40]) if pending: logger.debug("Processing pending message: '%s...'", pending[:40])