"""Gateway streaming consumer — bridges sync agent callbacks to async platform delivery. The agent fires stream_delta_callback(text) synchronously from its worker thread. GatewayStreamConsumer: 1. Receives deltas via on_delta() (thread-safe, sync) 2. Queues them to an asyncio task via queue.Queue 3. The async run() task buffers, rate-limits, and progressively edits a single message on the target platform Design: Uses the edit transport (send initial message, then editMessageText). This is universally supported across Telegram, Discord, and Slack. Credit: jobless0x (#774, #1312), OutThisLife (#798), clicksingh (#697). """ from __future__ import annotations import asyncio import logging import queue import threading import time from dataclasses import dataclass, field from typing import Any, Dict, Optional logger = logging.getLogger("gateway.stream_consumer") # Sentinel to signal the stream is complete _DONE = object() @dataclass class StreamConsumerConfig: """Runtime config for a single stream consumer instance.""" edit_interval: float = 0.3 buffer_threshold: int = 40 cursor: str = " ▉" # Adaptive back-off settings for high-throughput streaming min_poll_interval: float = 0.01 # 10ms when queue is busy (100 updates/sec) max_poll_interval: float = 0.05 # 50ms when queue is idle busy_queue_threshold: int = 5 # Queue depth considered "busy" enable_metrics: bool = True # Enable queue depth/processing metrics class GatewayStreamConsumer: """Async consumer that progressively edits a platform message with streamed tokens. Usage:: consumer = GatewayStreamConsumer(adapter, chat_id, config, metadata=metadata) # Pass consumer.on_delta as stream_delta_callback to AIAgent agent = AIAgent(..., stream_delta_callback=consumer.on_delta) # Start the consumer as an asyncio task task = asyncio.create_task(consumer.run()) # ... run agent in thread pool ... consumer.finish() # signal completion await task # wait for final edit """ def __init__( self, adapter: Any, chat_id: str, config: Optional[StreamConsumerConfig] = None, metadata: Optional[dict] = None, ): self.adapter = adapter self.chat_id = chat_id self.cfg = config or StreamConsumerConfig() self.metadata = metadata self._queue: queue.Queue = queue.Queue() self._accumulated = "" self._message_id: Optional[str] = None self._already_sent = False self._edit_supported = True # Disabled on first edit failure (Signal/Email/HA) self._last_edit_time = 0.0 self._last_sent_text = "" # Track last-sent text to skip redundant edits # Event-driven signaling: set when new items are available self._item_available = asyncio.Event() self._lock = threading.Lock() self._done_received = False # Metrics tracking self._metrics: Dict[str, Any] = { "items_received": 0, "items_processed": 0, "edits_sent": 0, "max_queue_depth": 0, "start_time": 0.0, "end_time": 0.0, } @property def already_sent(self) -> bool: """True if at least one message was sent/edited — signals the base adapter to skip re-sending the final response.""" return self._already_sent def on_delta(self, text: str) -> None: """Thread-safe callback — called from the agent's worker thread.""" if text: with self._lock: self._queue.put(text) self._metrics["items_received"] += 1 queue_size = self._queue.qsize() if queue_size > self._metrics["max_queue_depth"]: self._metrics["max_queue_depth"] = queue_size # Signal the async loop that new data is available try: self._item_available.set() except RuntimeError: # Event loop may not be running yet, that's ok pass def finish(self) -> None: """Signal that the stream is complete.""" with self._lock: self._queue.put(_DONE) self._done_received = True try: self._item_available.set() except RuntimeError: pass @property def metrics(self) -> Dict[str, Any]: """Return processing metrics for this stream.""" metrics = self._metrics.copy() if metrics["start_time"] > 0 and metrics["end_time"] > 0: duration = metrics["end_time"] - metrics["start_time"] if duration > 0: metrics["throughput"] = metrics["items_processed"] / duration metrics["duration_sec"] = duration return metrics async def run(self) -> None: """Async task that drains the queue and edits the platform message. Optimized with event-driven architecture and adaptive back-off: - Uses asyncio.Event for signaling instead of busy-wait - Adaptive poll intervals: 10ms when busy, 50ms when idle - Target throughput: 100+ updates/sec when queue is busy """ # Platform message length limit — leave room for cursor + formatting _raw_limit = getattr(self.adapter, "MAX_MESSAGE_LENGTH", 4096) _safe_limit = max(500, _raw_limit - len(self.cfg.cursor) - 100) self._metrics["start_time"] = time.monotonic() consecutive_empty_polls = 0 try: while True: # Wait for items to be available (event-driven) # Use timeout to also handle periodic edit intervals wait_timeout = self._calculate_wait_timeout() try: await asyncio.wait_for( self._item_available.wait(), timeout=wait_timeout ) except asyncio.TimeoutError: pass # Continue to process edits based on time interval # Clear the event - we'll process all available items self._item_available.clear() # Drain all available items from the queue got_done = False items_this_cycle = 0 while True: try: item = self._queue.get_nowait() if item is _DONE: got_done = True break self._accumulated += item items_this_cycle += 1 self._metrics["items_processed"] += 1 except queue.Empty: break # Adaptive back-off: adjust sleep based on queue depth queue_depth = self._queue.qsize() if queue_depth > 0 or items_this_cycle > 0: consecutive_empty_polls = 0 # Reset on activity else: consecutive_empty_polls += 1 # Decide whether to flush an edit now = time.monotonic() elapsed = now - self._last_edit_time should_edit = ( got_done or (elapsed >= self.cfg.edit_interval and len(self._accumulated) > 0) or len(self._accumulated) >= self.cfg.buffer_threshold ) if should_edit and self._accumulated: await self._process_edit(_safe_limit, got_done) self._last_edit_time = time.monotonic() if got_done: # Final edit without cursor if self._accumulated and self._message_id: await self._send_or_edit(self._accumulated) self._metrics["end_time"] = time.monotonic() self._log_metrics() return # Adaptive yield: shorter sleep when queue is busy sleep_interval = self._calculate_sleep_interval(queue_depth, consecutive_empty_polls) if sleep_interval > 0: await asyncio.sleep(sleep_interval) except asyncio.CancelledError: self._metrics["end_time"] = time.monotonic() # Best-effort final edit on cancellation if self._accumulated and self._message_id: try: await self._send_or_edit(self._accumulated) except Exception: pass raise except Exception as e: self._metrics["end_time"] = time.monotonic() logger.error("Stream consumer error: %s", e) raise def _calculate_wait_timeout(self) -> float: """Calculate timeout for waiting on new items.""" # If we have accumulated text and haven't edited recently, # wake up to check edit_interval if self._accumulated and self._last_edit_time > 0: time_since_edit = time.monotonic() - self._last_edit_time remaining = self.cfg.edit_interval - time_since_edit if remaining > 0: return min(remaining, self.cfg.max_poll_interval) return self.cfg.max_poll_interval def _calculate_sleep_interval(self, queue_depth: int, empty_polls: int) -> float: """Calculate adaptive sleep interval based on queue state.""" # If queue is busy, use minimum poll interval for high throughput if queue_depth >= self.cfg.busy_queue_threshold: return self.cfg.min_poll_interval # If we just processed items, check if more might be coming if queue_depth > 0: return self.cfg.min_poll_interval # Gradually increase sleep time when idle if empty_polls < 3: return self.cfg.min_poll_interval elif empty_polls < 10: return (self.cfg.min_poll_interval + self.cfg.max_poll_interval) / 2 else: return self.cfg.max_poll_interval async def _process_edit(self, safe_limit: int, got_done: bool) -> None: """Process accumulated text and send/edit message.""" # Split overflow: if accumulated text exceeds the platform # limit, finalize the current message and start a new one. while ( len(self._accumulated) > safe_limit and self._message_id is not None ): split_at = self._accumulated.rfind("\n", 0, safe_limit) if split_at < safe_limit // 2: split_at = safe_limit chunk = self._accumulated[:split_at] await self._send_or_edit(chunk) self._accumulated = self._accumulated[split_at:].lstrip("\n") self._message_id = None self._last_sent_text = "" display_text = self._accumulated if not got_done: display_text += self.cfg.cursor await self._send_or_edit(display_text) self._metrics["edits_sent"] += 1 def _log_metrics(self) -> None: """Log performance metrics if enabled.""" if not self.cfg.enable_metrics: return metrics = self.metrics logger.debug( "Stream metrics: items=%(items_processed)d, edits=%(edits_sent)d, " "max_queue=%(max_queue_depth)d, throughput=%(throughput).1f/sec, " "duration=%(duration_sec).3fs", metrics ) async def _send_or_edit(self, text: str) -> None: """Send or edit the streaming message.""" try: if self._message_id is not None: if self._edit_supported: # Skip if text is identical to what we last sent if text == self._last_sent_text: return # Edit existing message result = await self.adapter.edit_message( chat_id=self.chat_id, message_id=self._message_id, content=text, ) if result.success: self._already_sent = True self._last_sent_text = text else: # Edit not supported by this adapter — stop streaming, # let the normal send path handle the final response. # Without this guard, adapters like Signal/Email would # flood the chat with a new message every edit_interval. logger.debug("Edit failed, disabling streaming for this adapter") self._edit_supported = False else: # Editing not supported — skip intermediate updates. # The final response will be sent by the normal path. pass else: # First message — send new result = await self.adapter.send( chat_id=self.chat_id, content=text, metadata=self.metadata, ) if result.success and result.message_id: self._message_id = result.message_id self._already_sent = True self._last_sent_text = text else: # Initial send failed — disable streaming for this session self._edit_supported = False except Exception as e: logger.error("Stream send/edit error: %s", e)