Merge PR #288: feat(whatsapp): stream tool progress as a single live-updating message

Authored by satelerd. Adds edit_message() to BasePlatformAdapter and
implements it for WhatsApp via Baileys native editing. Progress messages
accumulate into a single live-updating message instead of N separate ones.

Cherry-picked from stale branch.
This commit is contained in:
teknium1
2026-03-05 03:44:13 -08:00
4 changed files with 109 additions and 10 deletions

View File

@@ -398,7 +398,20 @@ class BasePlatformAdapter(ABC):
SendResult with success status and message ID
"""
pass
async def edit_message(
self,
chat_id: str,
message_id: str,
content: str,
) -> SendResult:
"""
Edit a previously sent message. Optional — platforms that don't
support editing return success=False and callers fall back to
sending a new message.
"""
return SendResult(success=False, error="Not supported")
async def send_typing(self, chat_id: str) -> None:
"""
Send a typing indicator.

View File

@@ -351,7 +351,36 @@ class WhatsAppAdapter(BasePlatformAdapter):
)
except Exception as e:
return SendResult(success=False, error=str(e))
async def edit_message(
self,
chat_id: str,
message_id: str,
content: str,
) -> SendResult:
"""Edit a previously sent message via the WhatsApp bridge."""
if not self._running:
return SendResult(success=False, error="Not connected")
try:
import aiohttp
async with aiohttp.ClientSession() as session:
async with session.post(
f"http://localhost:{self._bridge_port}/edit",
json={
"chatId": chat_id,
"messageId": message_id,
"message": content,
},
timeout=aiohttp.ClientTimeout(total=15)
) as resp:
if resp.status == 200:
return SendResult(success=True, message_id=message_id)
else:
error = await resp.text()
return SendResult(success=False, error=error)
except Exception as e:
return SendResult(success=False, error=str(e))
async def send_typing(self, chat_id: str) -> None:
"""Send typing indicator via bridge."""
if not self._running:

View File

@@ -1939,32 +1939,67 @@ class GatewayRunner:
progress_queue.put(msg)
# Background task to send progress messages
# Accumulates tool lines into a single message that gets edited
async def send_progress_messages():
if not progress_queue:
return
adapter = self.adapters.get(source.platform)
if not adapter:
return
progress_lines = [] # Accumulated tool lines
progress_msg_id = None # ID of the progress message to edit
while True:
try:
# Non-blocking check with small timeout
msg = progress_queue.get_nowait()
await adapter.send(chat_id=source.chat_id, content=msg)
# Restore typing indicator after sending progress message
progress_lines.append(msg)
full_text = "\n".join(progress_lines)
if progress_msg_id is None:
# First tool: send as new message
result = await adapter.send(chat_id=source.chat_id, content=full_text)
if result.success and result.message_id:
progress_msg_id = result.message_id
else:
# Subsequent tools: try to edit, fall back to new message
result = await adapter.edit_message(
chat_id=source.chat_id,
message_id=progress_msg_id,
content=full_text,
)
if not result.success:
# Edit failed — send as new message and track it
result = await adapter.send(chat_id=source.chat_id, content=full_text)
if result.success and result.message_id:
progress_msg_id = result.message_id
# Restore typing indicator
await asyncio.sleep(0.3)
await adapter.send_typing(source.chat_id)
except queue.Empty:
await asyncio.sleep(0.3) # Check again soon
await asyncio.sleep(0.3)
except asyncio.CancelledError:
# Drain remaining messages
# Drain remaining queued messages
while not progress_queue.empty():
try:
msg = progress_queue.get_nowait()
await adapter.send(chat_id=source.chat_id, content=msg)
progress_lines.append(msg)
except Exception:
break
# Final edit with all remaining tools
if progress_lines and progress_msg_id:
full_text = "\n".join(progress_lines)
try:
await adapter.edit_message(
chat_id=source.chat_id,
message_id=progress_msg_id,
content=full_text,
)
except Exception:
pass
return
except Exception as e:
logger.error("Progress message error: %s", e)

View File

@@ -8,6 +8,7 @@
* Endpoints (matches gateway/platforms/whatsapp.py expectations):
* GET /messages - Long-poll for new incoming messages
* POST /send - Send a message { chatId, message, replyTo? }
* POST /edit - Edit a sent message { chatId, messageId, message }
* POST /typing - Send typing indicator { chatId }
* GET /chat/:id - Get chat info
* GET /health - Health check
@@ -216,6 +217,27 @@ app.post('/send', async (req, res) => {
}
});
// Edit a previously sent message
app.post('/edit', async (req, res) => {
if (!sock || connectionState !== 'connected') {
return res.status(503).json({ error: 'Not connected to WhatsApp' });
}
const { chatId, messageId, message } = req.body;
if (!chatId || !messageId || !message) {
return res.status(400).json({ error: 'chatId, messageId, and message are required' });
}
try {
const prefixed = `⚕ *Hermes Agent*\n────────────\n${message}`;
const key = { id: messageId, fromMe: true, remoteJid: chatId };
await sock.sendMessage(chatId, { text: prefixed, edit: key });
res.json({ success: true });
} catch (err) {
res.status(500).json({ error: err.message });
}
});
// Typing indicator
app.post('/typing', async (req, res) => {
if (!sock || connectionState !== 'connected') {