Replace the per-request aiohttp.ClientSession() in every WhatsApp adapter method with a single persistent self._http_session, matching the pattern used by the Mattermost, HomeAssistant, and SMS adapters.

Changes:
- Create self._http_session in connect() and close it in disconnect()
- All bridge HTTP calls (send, edit, send-media, typing, get_chat_info, poll_messages) now use the shared session
- Explicitly cancel _poll_task in disconnect() instead of relying solely on self._running = False
- Health-check sessions in connect() remain ephemeral (the persistent session has not been created yet at that point)
- Remove the per-method ImportError guards for aiohttp (it is always available when the gateway runs via the [messaging] extras)

Salvaged from PR #1851 by Himess. The _poll_task storage was already on main from PR #3267; this adds the disconnect cancellation and the persistent session.

Tests: 4 new tests covering session close, already-closed skip, poll task cancellation, and done-task skip.
810 lines
34 KiB
Python
810 lines
34 KiB
Python
"""
|
|
WhatsApp platform adapter.
|
|
|
|
WhatsApp integration is more complex than Telegram/Discord because:
|
|
- No official bot API for personal accounts
|
|
- Business API requires Meta Business verification
|
|
- Most solutions use web-based automation
|
|
|
|
This adapter supports multiple backends:
|
|
1. WhatsApp Business API (requires Meta verification)
|
|
2. whatsapp-web.js (via Node.js subprocess) - for personal accounts
|
|
3. Baileys (via Node.js subprocess) - alternative for personal accounts
|
|
|
|
For simplicity, we'll implement a generic interface that can work
|
|
with different backends via a bridge pattern.
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
import platform
|
|
import subprocess
|
|
|
|
_IS_WINDOWS = platform.system() == "Windows"
|
|
from pathlib import Path
|
|
from typing import Dict, Optional, Any
|
|
|
|
from hermes_cli.config import get_hermes_home
|
|
from hermes_constants import get_hermes_dir
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _kill_port_process(port: int) -> None:
    """Best-effort kill of whatever process is listening on TCP ``port``.

    On Windows the listener's PID is looked up in ``netstat -ano -p TCP``
    output and passed to ``taskkill /F``.  Elsewhere ``fuser <port>/tcp`` is
    used to probe for a listener and ``fuser -k`` to kill it.

    Every failure (missing tool, timeout, unexpected output) is swallowed:
    the function never raises and always returns ``None``.
    """
    try:
        if not _IS_WINDOWS:
            target = f"{port}/tcp"
            probe = subprocess.run(
                ["fuser", target],
                capture_output=True, timeout=5,
            )
            # fuser exits 0 only when at least one process holds the port.
            if probe.returncode == 0:
                subprocess.run(
                    ["fuser", "-k", target],
                    capture_output=True, timeout=5,
                )
            return

        listing = subprocess.run(
            ["netstat", "-ano", "-p", "TCP"],
            capture_output=True, text=True, timeout=5,
        )
        port_suffix = f":{port}"
        for row in listing.stdout.splitlines():
            # Expected columns: proto, local address, foreign address, state, PID
            fields = row.split()
            if len(fields) < 5 or fields[3] != "LISTENING":
                continue
            if not fields[1].endswith(port_suffix):
                continue
            try:
                subprocess.run(
                    ["taskkill", "/PID", fields[4], "/F"],
                    capture_output=True, timeout=5,
                )
            except subprocess.SubprocessError:
                pass
    except Exception:
        pass
|
|
|
|
import sys
|
|
sys.path.insert(0, str(Path(__file__).resolve().parents[2]))
|
|
|
|
from gateway.config import Platform, PlatformConfig
|
|
from gateway.platforms.base import (
|
|
BasePlatformAdapter,
|
|
MessageEvent,
|
|
MessageType,
|
|
SendResult,
|
|
SUPPORTED_DOCUMENT_TYPES,
|
|
cache_image_from_url,
|
|
cache_audio_from_url,
|
|
)
|
|
|
|
|
|
def check_whatsapp_requirements() -> bool:
    """
    Report whether the Node.js runtime needed by the WhatsApp bridge is usable.

    Runs ``node --version`` with a 5 second timeout.

    Returns:
        True when the command exits with status 0; False when it exits
        non-zero or cannot be run at all (not installed, timeout, ...).
    """
    node_probe = ["node", "--version"]
    try:
        completed = subprocess.run(
            node_probe,
            capture_output=True,
            text=True,
            timeout=5,
        )
    except Exception:
        return False
    return completed.returncode == 0
|
|
|
|
|
|
class WhatsAppAdapter(BasePlatformAdapter):
|
|
"""
|
|
WhatsApp adapter.
|
|
|
|
This implementation uses a simple HTTP bridge pattern where:
|
|
1. A Node.js process runs the WhatsApp Web client
|
|
2. Messages are forwarded via HTTP/IPC to this Python adapter
|
|
3. Responses are sent back through the bridge
|
|
|
|
The actual Node.js bridge implementation can vary:
|
|
- whatsapp-web.js based
|
|
- Baileys based
|
|
- Business API based
|
|
|
|
Configuration:
|
|
- bridge_script: Path to the Node.js bridge script
|
|
- bridge_port: Port for HTTP communication (default: 3000)
|
|
- session_path: Path to store WhatsApp session data
|
|
"""
|
|
|
|
# WhatsApp message limits
|
|
MAX_MESSAGE_LENGTH = 65536 # WhatsApp allows longer messages
|
|
|
|
# Default bridge location relative to the hermes-agent install
|
|
_DEFAULT_BRIDGE_DIR = Path(__file__).resolve().parents[2] / "scripts" / "whatsapp-bridge"
|
|
|
|
def __init__(self, config: PlatformConfig):
    """
    Set up adapter state from ``config``; nothing is started here.

    Keys read from ``config.extra``:
      - ``bridge_port``   (default ``3000``)
      - ``bridge_script`` (default ``<_DEFAULT_BRIDGE_DIR>/bridge.js``)
      - ``session_path``  (default from ``get_hermes_dir``)
      - ``reply_prefix``  (default ``None``)
    """
    super().__init__(config, Platform.WHATSAPP)
    extra = config.extra

    # --- settings taken from the platform config ---
    default_script = str(self._DEFAULT_BRIDGE_DIR / "bridge.js")
    default_session = get_hermes_dir("platforms/whatsapp/session", "whatsapp/session")
    self._bridge_port: int = extra.get("bridge_port", 3000)
    self._bridge_script: Optional[str] = extra.get("bridge_script", default_script)
    self._session_path: Path = Path(extra.get("session_path", default_session))
    self._reply_prefix: Optional[str] = extra.get("reply_prefix")

    # --- runtime handles, populated by connect() ---
    self._bridge_process: Optional[subprocess.Popen] = None
    self._bridge_log: Optional[Path] = None
    self._bridge_log_fh = None
    self._poll_task: Optional[asyncio.Task] = None
    self._http_session: Optional["aiohttp.ClientSession"] = None
    self._session_lock_identity: Optional[str] = None

    self._message_queue: asyncio.Queue = asyncio.Queue()
|
|
|
|
async def connect(self) -> bool:
    """
    Start the WhatsApp bridge, or attach to one that is already running.

    Steps, in order:
      1. Verify Node.js is available and the bridge script exists.
      2. Acquire the scoped "whatsapp-session" lock for the session path.
         Failing to obtain it because another holder exists is fatal; an
         error while trying to acquire it is logged and ignored.
      3. Run ``npm install`` in the bridge directory if ``node_modules``
         is missing.
      4. If a bridge already answers ``/health`` with status "connected",
         reuse it (it is then not managed, i.e. not killed on disconnect).
      5. Otherwise kill whatever holds the port, spawn the Node bridge with
         output routed to ``bridge.log``, wait up to 15s for its HTTP server
         and up to 15s more for WhatsApp to report "connected".
      6. Create the persistent HTTP session and start the poll task.

    Every failure path after step 2 releases the session lock again so a
    later retry is not blocked by a lock left behind by this adapter.

    Returns:
        True when the adapter is attached to a bridge (even if WhatsApp is
        not yet connected after 30s - the bridge may reconnect on its own),
        False on any failure.
    """
    if not check_whatsapp_requirements():
        logger.warning("[%s] Node.js not found. WhatsApp requires Node.js.", self.name)
        return False

    bridge_path = Path(self._bridge_script)
    if not bridge_path.exists():
        logger.warning("[%s] Bridge script not found: %s", self.name, bridge_path)
        return False

    logger.info("[%s] Bridge found at %s", self.name, bridge_path)

    # Acquire scoped lock to prevent duplicate sessions
    try:
        from gateway.status import acquire_scoped_lock

        self._session_lock_identity = str(self._session_path)
        acquired, existing = acquire_scoped_lock(
            "whatsapp-session",
            self._session_lock_identity,
            metadata={"platform": self.platform.value},
        )
        if not acquired:
            owner_pid = existing.get("pid") if isinstance(existing, dict) else None
            message = (
                "Another local Hermes gateway is already using this WhatsApp session"
                + (f" (PID {owner_pid})." if owner_pid else ".")
                + " Stop the other gateway before starting a second WhatsApp bridge."
            )
            logger.error("[%s] %s", self.name, message)
            self._set_fatal_error("whatsapp_session_lock", message, retryable=False)
            # The lock belongs to someone else: forget the identity so a
            # later disconnect() does not try to release a lock we never held.
            self._session_lock_identity = None
            return False
    except Exception as e:
        logger.warning("[%s] Could not acquire session lock (non-fatal): %s", self.name, e)

    # Auto-install npm dependencies if node_modules doesn't exist
    bridge_dir = bridge_path.parent
    if not (bridge_dir / "node_modules").exists():
        print(f"[{self.name}] Installing WhatsApp bridge dependencies...")
        try:
            install_result = subprocess.run(
                ["npm", "install", "--silent"],
                cwd=str(bridge_dir),
                capture_output=True,
                text=True,
                timeout=60,
            )
        except Exception as e:
            print(f"[{self.name}] Failed to install dependencies: {e}")
            self._release_whatsapp_session_lock()
            return False
        if install_result.returncode != 0:
            print(f"[{self.name}] npm install failed: {install_result.stderr}")
            self._release_whatsapp_session_lock()
            return False
        print(f"[{self.name}] Dependencies installed")

    try:
        # Ensure session directory exists
        self._session_path.mkdir(parents=True, exist_ok=True)

        # Imported lazily; an ImportError is reported by the handler below.
        import aiohttp

        # Check if bridge is already running and connected
        reachable, health = await self._probe_bridge_health()
        if reachable and health is not None:
            bridge_status = health.get("status", "unknown")
            if bridge_status == "connected":
                print(f"[{self.name}] Using existing bridge (status: {bridge_status})")
                self._mark_connected()
                self._bridge_process = None  # Not managed by us
                self._http_session = aiohttp.ClientSession()
                self._poll_task = asyncio.create_task(self._poll_messages())
                return True
            print(f"[{self.name}] Bridge found but not connected (status: {bridge_status}), restarting")

        # Kill any orphaned bridge from a previous gateway run
        _kill_port_process(self._bridge_port)
        await asyncio.sleep(1)

        # Start the bridge process in its own process group.
        # Route output to a log file so QR codes, errors, and reconnection
        # messages are preserved for troubleshooting.
        whatsapp_mode = os.getenv("WHATSAPP_MODE", "self-chat")
        self._bridge_log = self._session_path.parent / "bridge.log"
        bridge_log_fh = open(self._bridge_log, "a")
        self._bridge_log_fh = bridge_log_fh

        # Build bridge subprocess environment.
        # Pass WHATSAPP_REPLY_PREFIX from config.yaml so the Node bridge
        # can use it without the user needing to set a separate env var.
        bridge_env = os.environ.copy()
        if self._reply_prefix is not None:
            bridge_env["WHATSAPP_REPLY_PREFIX"] = self._reply_prefix

        self._bridge_process = subprocess.Popen(
            [
                "node",
                str(bridge_path),
                "--port", str(self._bridge_port),
                "--session", str(self._session_path),
                "--mode", whatsapp_mode,
            ],
            stdout=bridge_log_fh,
            stderr=bridge_log_fh,
            preexec_fn=None if _IS_WINDOWS else os.setsid,
            env=bridge_env,
        )

        # Wait for the bridge to connect to WhatsApp.
        # Phase 1: wait for the HTTP server to come up (up to 15s).
        # Phase 2: wait for WhatsApp status: connected (up to 15s more).
        http_ready = False
        data: Dict[str, Any] = {}
        for _ in range(15):
            await asyncio.sleep(1)
            if self._bridge_process.poll() is not None:
                print(f"[{self.name}] Bridge process died (exit code {self._bridge_process.returncode})")
                print(f"[{self.name}] Check log: {self._bridge_log}")
                return self._abort_bridge_startup()
            reachable, health = await self._probe_bridge_health()
            if not reachable:
                continue
            http_ready = True
            if health is not None:
                data = health
                if data.get("status") == "connected":
                    print(f"[{self.name}] Bridge ready (status: connected)")
                    break

        if not http_ready:
            print(f"[{self.name}] Bridge HTTP server did not start in 15s")
            print(f"[{self.name}] Check log: {self._bridge_log}")
            return self._abort_bridge_startup()

        # Phase 2: HTTP is up but WhatsApp may still be connecting.
        # Give it more time to authenticate with saved credentials.
        if data.get("status") != "connected":
            print(f"[{self.name}] Bridge HTTP ready, waiting for WhatsApp connection...")
            for _ in range(15):
                await asyncio.sleep(1)
                if self._bridge_process.poll() is not None:
                    print(f"[{self.name}] Bridge process died during connection")
                    print(f"[{self.name}] Check log: {self._bridge_log}")
                    return self._abort_bridge_startup()
                _, health = await self._probe_bridge_health()
                if health is not None:
                    data = health
                    if data.get("status") == "connected":
                        print(f"[{self.name}] Bridge ready (status: connected)")
                        break
            else:
                # Still not connected — warn but proceed (bridge may
                # auto-reconnect later, e.g. after a code 515 restart).
                print(f"[{self.name}] ⚠ WhatsApp not connected after 30s")
                print(f"[{self.name}] Bridge log: {self._bridge_log}")
                print(f"[{self.name}] If session expired, re-pair: hermes whatsapp")

        # Create a persistent HTTP session for all bridge communication
        self._http_session = aiohttp.ClientSession()

        # Start message polling task
        self._poll_task = asyncio.create_task(self._poll_messages())

        self._mark_connected()
        print(f"[{self.name}] Bridge started on port {self._bridge_port}")
        return True

    except Exception as e:
        self._release_whatsapp_session_lock()
        logger.error("[%s] Failed to start bridge: %s", self.name, e, exc_info=True)
        self._close_bridge_log()
        return False

async def _probe_bridge_health(self) -> "tuple[bool, Optional[Dict[str, Any]]]":
    """Query the bridge ``/health`` endpoint once with a short-lived session.

    Returns ``(reachable, payload)``: ``reachable`` is True when the bridge
    answered with HTTP 200; ``payload`` is the decoded JSON object, or None
    when the request failed or the body was not a JSON object.  Never raises.
    """
    reachable = False
    try:
        import aiohttp

        # The persistent session does not exist yet while connecting, so
        # each probe uses its own throwaway session.
        async with aiohttp.ClientSession() as session:
            async with session.get(
                f"http://127.0.0.1:{self._bridge_port}/health",
                timeout=aiohttp.ClientTimeout(total=2),
            ) as resp:
                if resp.status == 200:
                    reachable = True
                    payload = await resp.json()
                    if isinstance(payload, dict):
                        return True, payload
    except Exception:
        pass  # Bridge not running / not ready yet
    return reachable, None

def _release_whatsapp_session_lock(self) -> None:
    """Release the scoped session lock recorded by connect(), if any, and forget it."""
    identity = self._session_lock_identity
    if not identity:
        return
    try:
        from gateway.status import release_scoped_lock

        release_scoped_lock("whatsapp-session", identity)
    except Exception:
        pass
    self._session_lock_identity = None

def _abort_bridge_startup(self) -> bool:
    """Clean up after a failed bridge start (log handle + session lock); returns False."""
    self._close_bridge_log()
    self._release_whatsapp_session_lock()
    return False
|
|
|
|
def _close_bridge_log(self) -> None:
|
|
"""Close the bridge log file handle if open."""
|
|
if self._bridge_log_fh:
|
|
try:
|
|
self._bridge_log_fh.close()
|
|
except Exception:
|
|
pass
|
|
self._bridge_log_fh = None
|
|
|
|
async def _check_managed_bridge_exit(self) -> Optional[str]:
|
|
"""Return a fatal error message if the managed bridge child exited."""
|
|
if self._bridge_process is None:
|
|
return None
|
|
|
|
returncode = self._bridge_process.poll()
|
|
if returncode is None:
|
|
return None
|
|
|
|
message = f"WhatsApp bridge process exited unexpectedly (code {returncode})."
|
|
if not self.has_fatal_error:
|
|
logger.error("[%s] %s", self.name, message)
|
|
self._set_fatal_error("whatsapp_bridge_exited", message, retryable=True)
|
|
self._close_bridge_log()
|
|
await self._notify_fatal_error()
|
|
return self.fatal_error_message or message
|
|
|
|
async def disconnect(self) -> None:
    """Stop polling, stop the managed bridge (if any) and release all resources.

    Order matters:
      1. The poll task is cancelled and the bridge process handle is detached
         *before* the process is signalled.  Otherwise the poll loop (or a
         concurrent send) would observe the exit through
         ``_check_managed_bridge_exit`` during the shutdown grace period and
         raise a bogus "exited unexpectedly" fatal error.
      2. A managed bridge gets SIGTERM (whole process group on POSIX), one
         second of grace, then SIGKILL.  A bridge that was not started by
         this adapter is left running.
      3. The persistent HTTP session is closed, the session lock is released
         and the adapter is marked disconnected.

    Cleanup errors are reported but never raised.
    """
    # Cancel the poll task explicitly, before touching the bridge process.
    poll_task = self._poll_task
    self._poll_task = None
    if poll_task and not poll_task.done():
        poll_task.cancel()
        try:
            await poll_task
        except (asyncio.CancelledError, Exception):
            pass

    # Detach the handle so exit checks treat the bridge as unmanaged from
    # here on; the local reference is used for the actual shutdown.
    bridge_process = self._bridge_process
    self._bridge_process = None

    if bridge_process:
        try:
            # Kill the entire process group so child node processes die too
            import signal
            try:
                if _IS_WINDOWS:
                    bridge_process.terminate()
                else:
                    os.killpg(os.getpgid(bridge_process.pid), signal.SIGTERM)
            except (ProcessLookupError, PermissionError):
                bridge_process.terminate()
            await asyncio.sleep(1)
            if bridge_process.poll() is None:
                try:
                    if _IS_WINDOWS:
                        bridge_process.kill()
                    else:
                        os.killpg(os.getpgid(bridge_process.pid), signal.SIGKILL)
                except (ProcessLookupError, PermissionError):
                    bridge_process.kill()
        except Exception as e:
            print(f"[{self.name}] Error stopping bridge: {e}")
    else:
        # Bridge was not started by us, don't kill it
        print(f"[{self.name}] Disconnecting (external bridge left running)")

    # Close the persistent HTTP session
    http_session = self._http_session
    self._http_session = None
    if http_session and not http_session.closed:
        try:
            await http_session.close()
        except Exception as e:
            # A failed close must not prevent the lock release below.
            logger.warning("[%s] Error closing WhatsApp HTTP session: %s", self.name, e)

    if self._session_lock_identity:
        try:
            from gateway.status import release_scoped_lock
            release_scoped_lock("whatsapp-session", self._session_lock_identity)
        except Exception as e:
            logger.warning("[%s] Error releasing WhatsApp session lock: %s", self.name, e, exc_info=True)

    self._mark_disconnected()
    self._close_bridge_log()
    self._session_lock_identity = None
    print(f"[{self.name}] Disconnected")
|
|
|
|
async def send(
    self,
    chat_id: str,
    content: str,
    reply_to: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None
) -> SendResult:
    """POST a text message to the bridge ``/send`` endpoint.

    ``reply_to`` is forwarded as ``replyTo`` when given; ``metadata`` is not
    used.  Returns a failed ``SendResult`` (never raises) when the adapter is
    not connected, the managed bridge has exited, the bridge answers with a
    non-200 status, or the request itself fails.
    """
    if not (self._running and self._http_session):
        return SendResult(success=False, error="Not connected")
    exit_reason = await self._check_managed_bridge_exit()
    if exit_reason:
        return SendResult(success=False, error=exit_reason)

    try:
        import aiohttp

        request_body = {"chatId": chat_id, "message": content}
        if reply_to:
            request_body["replyTo"] = reply_to

        async with self._http_session.post(
            f"http://127.0.0.1:{self._bridge_port}/send",
            json=request_body,
            timeout=aiohttp.ClientTimeout(total=30)
        ) as resp:
            if resp.status != 200:
                return SendResult(success=False, error=await resp.text())
            reply = await resp.json()
            return SendResult(
                success=True,
                message_id=reply.get("messageId"),
                raw_response=reply
            )
    except Exception as exc:
        return SendResult(success=False, error=str(exc))
|
|
|
|
async def edit_message(
    self,
    chat_id: str,
    message_id: str,
    content: str,
) -> SendResult:
    """Replace the text of an already-sent message via the bridge ``/edit`` endpoint.

    On HTTP 200 the result carries the same ``message_id`` that was passed
    in.  Any failure (not connected, bridge exited, non-200 answer, request
    error) is reported as a failed ``SendResult`` rather than raised.
    """
    if not (self._running and self._http_session):
        return SendResult(success=False, error="Not connected")
    exit_reason = await self._check_managed_bridge_exit()
    if exit_reason:
        return SendResult(success=False, error=exit_reason)
    try:
        import aiohttp

        edit_request = {
            "chatId": chat_id,
            "messageId": message_id,
            "message": content,
        }
        async with self._http_session.post(
            f"http://127.0.0.1:{self._bridge_port}/edit",
            json=edit_request,
            timeout=aiohttp.ClientTimeout(total=15)
        ) as resp:
            if resp.status != 200:
                return SendResult(success=False, error=await resp.text())
            return SendResult(success=True, message_id=message_id)
    except Exception as exc:
        return SendResult(success=False, error=str(exc))
|
|
|
|
async def _send_media_to_bridge(
    self,
    chat_id: str,
    file_path: str,
    media_type: str,
    caption: Optional[str] = None,
    file_name: Optional[str] = None,
) -> SendResult:
    """Hand a local file to the bridge ``/send-media`` endpoint.

    Only the *path* is sent (``filePath``); the bridge is expected to read
    the file itself.  ``caption`` and ``file_name`` are included only when
    truthy.  A missing file, a disconnected adapter, an exited bridge, a
    non-200 answer or a request error all yield a failed ``SendResult``.
    """
    if not (self._running and self._http_session):
        return SendResult(success=False, error="Not connected")
    exit_reason = await self._check_managed_bridge_exit()
    if exit_reason:
        return SendResult(success=False, error=exit_reason)
    try:
        import aiohttp

        if not os.path.exists(file_path):
            return SendResult(success=False, error=f"File not found: {file_path}")

        request_body: Dict[str, Any] = {
            "chatId": chat_id,
            "filePath": file_path,
            "mediaType": media_type,
        }
        # Optional fields are added only when they carry a value.
        for key, value in (("caption", caption), ("fileName", file_name)):
            if value:
                request_body[key] = value

        async with self._http_session.post(
            f"http://127.0.0.1:{self._bridge_port}/send-media",
            json=request_body,
            timeout=aiohttp.ClientTimeout(total=120),
        ) as resp:
            if resp.status != 200:
                return SendResult(success=False, error=await resp.text())
            reply = await resp.json()
            return SendResult(
                success=True,
                message_id=reply.get("messageId"),
                raw_response=reply,
            )

    except Exception as exc:
        return SendResult(success=False, error=str(exc))
|
|
|
|
async def send_image(
    self,
    chat_id: str,
    image_url: str,
    caption: Optional[str] = None,
    reply_to: Optional[str] = None,
) -> SendResult:
    """Send an image given by URL as native WhatsApp media.

    The URL is first downloaded into the local cache and the cached file is
    passed to the bridge.  If anything in that path raises, the base-class
    ``send_image`` implementation is used instead.
    """
    try:
        cached_file = await cache_image_from_url(image_url)
        outcome = await self._send_media_to_bridge(chat_id, cached_file, "image", caption)
    except Exception:
        outcome = await super().send_image(chat_id, image_url, caption, reply_to)
    return outcome
|
|
|
|
async def send_image_file(
    self,
    chat_id: str,
    image_path: str,
    caption: Optional[str] = None,
    reply_to: Optional[str] = None,
    **kwargs,
) -> SendResult:
    """Send an image that already exists on disk as native media.

    ``reply_to`` and any extra keyword arguments are accepted but not
    forwarded to the bridge.
    """
    outcome = await self._send_media_to_bridge(chat_id, image_path, "image", caption)
    return outcome
|
|
|
|
async def send_video(
    self,
    chat_id: str,
    video_path: str,
    caption: Optional[str] = None,
    reply_to: Optional[str] = None,
    **kwargs,
) -> SendResult:
    """Send a local video file as native media (media type "video").

    ``reply_to`` and any extra keyword arguments are accepted but not
    forwarded to the bridge.
    """
    outcome = await self._send_media_to_bridge(chat_id, video_path, "video", caption)
    return outcome
|
|
|
|
async def send_document(
    self,
    chat_id: str,
    file_path: str,
    caption: Optional[str] = None,
    file_name: Optional[str] = None,
    reply_to: Optional[str] = None,
    **kwargs,
) -> SendResult:
    """Send a local file as a document attachment.

    When ``file_name`` is not given, the base name of ``file_path`` is used
    as the attachment name.  ``reply_to`` and extra keyword arguments are
    accepted but not forwarded to the bridge.
    """
    attachment_name = file_name if file_name else os.path.basename(file_path)
    outcome = await self._send_media_to_bridge(
        chat_id, file_path, "document", caption, attachment_name,
    )
    return outcome
|
|
|
|
async def send_typing(self, chat_id: str, metadata=None) -> None:
    """Ask the bridge to show a typing indicator in ``chat_id``.

    Purely best-effort: nothing is sent when the adapter is not connected or
    the managed bridge has exited, and every request error is ignored.
    ``metadata`` is accepted for interface compatibility and not used.
    """
    if not self._running or not self._http_session:
        return
    if await self._check_managed_bridge_exit():
        return

    try:
        import aiohttp

        # Use the request as a context manager so the response is released
        # and its connection returns to the shared session's pool.  A bare
        # ``await session.post(...)`` leaves that to garbage collection,
        # which was harmless with per-request sessions but not with the
        # persistent one.
        async with self._http_session.post(
            f"http://127.0.0.1:{self._bridge_port}/typing",
            json={"chatId": chat_id},
            timeout=aiohttp.ClientTimeout(total=5)
        ):
            pass
    except Exception:
        pass  # Ignore typing indicator failures
|
|
|
|
async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
    """Look up a chat's name, kind and participants through the bridge.

    Returns:
        ``{"name", "type", "participants"}`` built from the bridge answer on
        HTTP 200 (``type`` is "group" when ``isGroup`` is truthy, else "dm").
        ``{"name": "Unknown", "type": "dm"}`` when the adapter is not
        connected, and ``{"name": chat_id, "type": "dm"}`` when the bridge
        has exited, answers with another status, or the request fails.
    """
    if not (self._running and self._http_session):
        return {"name": "Unknown", "type": "dm"}

    bridge_gone = await self._check_managed_bridge_exit()
    if not bridge_gone:
        try:
            import aiohttp

            async with self._http_session.get(
                f"http://127.0.0.1:{self._bridge_port}/chat/{chat_id}",
                timeout=aiohttp.ClientTimeout(total=10)
            ) as resp:
                if resp.status == 200:
                    info = await resp.json()
                    return {
                        "name": info.get("name", chat_id),
                        "type": "group" if info.get("isGroup") else "dm",
                        "participants": info.get("participants", []),
                    }
        except Exception as exc:
            logger.debug("Could not get WhatsApp chat info for %s: %s", chat_id, exc)

    # Generic fallback: treat the chat as a direct message named by its id.
    return {"name": chat_id, "type": "dm"}
|
|
|
|
async def _poll_messages(self) -> None:
    """Background loop that pulls incoming messages from the bridge.

    Each iteration GETs ``/messages`` (30s timeout), turns every returned
    item into a ``MessageEvent`` and passes it to ``handle_message``, then
    waits one second.  The loop ends when the adapter stops running, the
    HTTP session is gone, the managed bridge has exited, or the task is
    cancelled while a poll is in flight.  Other errors are printed and
    followed by an extra five second pause.
    """
    import aiohttp

    poll_timeout = aiohttp.ClientTimeout(total=30)

    while self._running and self._http_session:
        exit_reason = await self._check_managed_bridge_exit()
        if exit_reason:
            print(f"[{self.name}] {exit_reason}")
            break

        try:
            async with self._http_session.get(
                f"http://127.0.0.1:{self._bridge_port}/messages",
                timeout=poll_timeout
            ) as resp:
                if resp.status == 200:
                    for raw_message in await resp.json():
                        event = await self._build_message_event(raw_message)
                        if event:
                            await self.handle_message(event)
        except asyncio.CancelledError:
            break
        except Exception as poll_error:
            # A request failure may simply mean the bridge died; check that
            # first so the loop stops instead of retrying forever.
            exit_reason = await self._check_managed_bridge_exit()
            if exit_reason:
                print(f"[{self.name}] {exit_reason}")
                break
            print(f"[{self.name}] Poll error: {poll_error}")
            await asyncio.sleep(5)

        await asyncio.sleep(1)  # Poll interval
|
|
|
|
async def _build_message_event(self, data: Dict[str, Any]) -> Optional[MessageEvent]:
    """Build a MessageEvent from one bridge message dict.

    Keys read from ``data``: ``hasMedia``, ``mediaType``, ``isGroup``,
    ``chatId``, ``chatName``, ``senderId``, ``senderName``, ``mediaUrls``,
    ``body`` and ``messageId``.

    Media handling:
      - http(s) image / voice URLs are downloaded into the local cache; if
        the download fails the original URL is kept.
      - absolute local paths (already downloaded by the bridge) are used
        as-is.
      - small text-like documents (<= 100 KiB) have their content prepended
        to the message text so the agent can read them inline.

    ``mediaType``, ``mediaUrls`` and ``body`` may be missing *or* explicitly
    null in the bridge payload; both are treated as empty.

    Returns:
        The event, or None when building it fails (the error is printed).
    """
    try:
        # Determine message type
        msg_type = MessageType.TEXT
        if data.get("hasMedia"):
            # ``or ""`` also covers an explicit JSON null, which would
            # otherwise make the ``in`` checks below raise.
            media_type = data.get("mediaType") or ""
            if "image" in media_type:
                msg_type = MessageType.PHOTO
            elif "video" in media_type:
                msg_type = MessageType.VIDEO
            elif "audio" in media_type or "ptt" in media_type:  # ptt = voice note
                msg_type = MessageType.VOICE
            else:
                msg_type = MessageType.DOCUMENT

        # Determine chat type
        is_group = data.get("isGroup", False)
        chat_type = "group" if is_group else "dm"

        # Build source
        source = self.build_source(
            chat_id=data.get("chatId", ""),
            chat_name=data.get("chatName"),
            chat_type=chat_type,
            user_id=data.get("senderId"),
            user_name=data.get("senderName"),
        )

        # Download media URLs to the local cache so agent tools
        # can access them reliably regardless of URL expiration.
        raw_urls = data.get("mediaUrls") or []
        cached_urls = []
        media_types = []
        for url in raw_urls:
            if msg_type == MessageType.PHOTO and url.startswith(("http://", "https://")):
                try:
                    cached_path = await cache_image_from_url(url, ext=".jpg")
                    cached_urls.append(cached_path)
                    media_types.append("image/jpeg")
                    print(f"[{self.name}] Cached user image: {cached_path}", flush=True)
                except Exception as e:
                    print(f"[{self.name}] Failed to cache image: {e}", flush=True)
                    cached_urls.append(url)
                    media_types.append("image/jpeg")
            elif msg_type == MessageType.PHOTO and os.path.isabs(url):
                # Local file path — bridge already downloaded the image
                cached_urls.append(url)
                media_types.append("image/jpeg")
                print(f"[{self.name}] Using bridge-cached image: {url}", flush=True)
            elif msg_type == MessageType.VOICE and url.startswith(("http://", "https://")):
                try:
                    cached_path = await cache_audio_from_url(url, ext=".ogg")
                    cached_urls.append(cached_path)
                    media_types.append("audio/ogg")
                    print(f"[{self.name}] Cached user voice: {cached_path}", flush=True)
                except Exception as e:
                    print(f"[{self.name}] Failed to cache voice: {e}", flush=True)
                    cached_urls.append(url)
                    media_types.append("audio/ogg")
            elif msg_type == MessageType.VOICE and os.path.isabs(url):
                # Local file path — bridge already downloaded the audio
                cached_urls.append(url)
                media_types.append("audio/ogg")
                print(f"[{self.name}] Using bridge-cached audio: {url}", flush=True)
            elif msg_type == MessageType.DOCUMENT and os.path.isabs(url):
                # Local file path — bridge already downloaded the document
                cached_urls.append(url)
                ext = Path(url).suffix.lower()
                mime = SUPPORTED_DOCUMENT_TYPES.get(ext, "application/octet-stream")
                media_types.append(mime)
                print(f"[{self.name}] Using bridge-cached document: {url}", flush=True)
            elif msg_type == MessageType.VIDEO and os.path.isabs(url):
                cached_urls.append(url)
                media_types.append("video/mp4")
                print(f"[{self.name}] Using bridge-cached video: {url}", flush=True)
            else:
                cached_urls.append(url)
                media_types.append("unknown")

        # For text-readable documents, inject file content directly into
        # the message text so the agent can read it inline.
        # Cap at 100KB to match Telegram/Discord/Slack behaviour.
        body = data.get("body") or ""
        MAX_TEXT_INJECT_BYTES = 100 * 1024
        if msg_type == MessageType.DOCUMENT and cached_urls:
            for doc_path in cached_urls:
                ext = Path(doc_path).suffix.lower()
                if ext in (".txt", ".md", ".csv", ".json", ".xml", ".yaml", ".yml", ".log", ".py", ".js", ".ts", ".html", ".css"):
                    try:
                        file_size = Path(doc_path).stat().st_size
                        if file_size > MAX_TEXT_INJECT_BYTES:
                            print(f"[{self.name}] Skipping text injection for {doc_path} ({file_size} bytes > {MAX_TEXT_INJECT_BYTES})", flush=True)
                            continue
                        # Decode explicitly as UTF-8: the platform default
                        # (e.g. cp1252 on Windows) would garble non-ASCII
                        # text.  Undecodable bytes are replaced, not fatal.
                        content = Path(doc_path).read_text(encoding="utf-8", errors="replace")
                        fname = Path(doc_path).name
                        # Remove the doc_<hex>_ prefix for display
                        display_name = fname
                        if "_" in fname:
                            parts = fname.split("_", 2)
                            if len(parts) >= 3:
                                display_name = parts[2]
                        injection = f"[Content of {display_name}]:\n{content}"
                        if body:
                            body = f"{injection}\n\n{body}"
                        else:
                            body = injection
                        print(f"[{self.name}] Injected text content from: {doc_path}", flush=True)
                    except Exception as e:
                        print(f"[{self.name}] Failed to read document text: {e}", flush=True)

        return MessageEvent(
            text=body,
            message_type=msg_type,
            source=source,
            raw_message=data,
            message_id=data.get("messageId"),
            media_urls=cached_urls,
            media_types=media_types,
        )
    except Exception as e:
        print(f"[{self.name}] Error building event: {e}")
        return None
|