Implement continuous typing indicator in message handling

- Added a new private method `_keep_typing` to send a typing indicator continuously while processing messages, refreshing every 4 seconds to comply with Telegram/Discord limitations.
- Updated the `handle_message` method to initiate the typing indicator at the start of message processing and ensure it stops once processing is complete, improving user experience during message handling.
This commit is contained in:
teknium1
2026-02-03 14:51:31 -08:00
parent 7eac4ee9fe
commit a09b018bd5

View File

@@ -171,20 +171,34 @@ class BasePlatformAdapter(ABC):
"""
pass
async def _keep_typing(self, chat_id: str, interval: float = 4.0) -> None:
"""
Continuously send typing indicator until cancelled.
Telegram/Discord typing status expires after ~5 seconds, so we refresh every 4.
"""
try:
while True:
await self.send_typing(chat_id)
await asyncio.sleep(interval)
except asyncio.CancelledError:
pass # Normal cancellation when handler completes
async def handle_message(self, event: MessageEvent) -> None:
"""
Process an incoming message.
Calls the registered message handler and sends the response.
Keeps typing indicator active throughout processing.
"""
if not self._message_handler:
return
# Start continuous typing indicator (refreshes every 4 seconds)
typing_task = asyncio.create_task(self._keep_typing(event.source.chat_id))
try:
# Send typing indicator
await self.send_typing(event.source.chat_id)
# Call the handler
# Call the handler (this can take a while with tool calls)
response = await self._message_handler(event)
# Send response if any
@@ -196,6 +210,13 @@ class BasePlatformAdapter(ABC):
)
except Exception as e:
print(f"[{self.name}] Error handling message: {e}")
finally:
# Stop typing indicator
typing_task.cancel()
try:
await typing_task
except asyncio.CancelledError:
pass
def build_source(
self,