""" WhatsApp platform adapter. WhatsApp integration is more complex than Telegram/Discord because: - No official bot API for personal accounts - Business API requires Meta Business verification - Most solutions use web-based automation This adapter supports multiple backends: 1. WhatsApp Business API (requires Meta verification) 2. whatsapp-web.js (via Node.js subprocess) - for personal accounts 3. Baileys (via Node.js subprocess) - alternative for personal accounts For simplicity, we'll implement a generic interface that can work with different backends via a bridge pattern. """ import asyncio import logging import os import platform import subprocess _IS_WINDOWS = platform.system() == "Windows" from pathlib import Path from typing import Dict, Optional, Any from hermes_cli.config import get_hermes_home from hermes_constants import get_hermes_dir logger = logging.getLogger(__name__) def _kill_port_process(port: int) -> None: """Kill any process listening on the given TCP port.""" try: if _IS_WINDOWS: # Use netstat to find the PID bound to this port, then taskkill result = subprocess.run( ["netstat", "-ano", "-p", "TCP"], capture_output=True, text=True, timeout=5, ) for line in result.stdout.splitlines(): parts = line.split() if len(parts) >= 5 and parts[3] == "LISTENING": local_addr = parts[1] if local_addr.endswith(f":{port}"): try: subprocess.run( ["taskkill", "/PID", parts[4], "/F"], capture_output=True, timeout=5, ) except subprocess.SubprocessError: pass else: result = subprocess.run( ["fuser", f"{port}/tcp"], capture_output=True, timeout=5, ) if result.returncode == 0: subprocess.run( ["fuser", "-k", f"{port}/tcp"], capture_output=True, timeout=5, ) except Exception: pass import sys sys.path.insert(0, str(Path(__file__).resolve().parents[2])) from gateway.config import Platform, PlatformConfig from gateway.platforms.base import ( BasePlatformAdapter, MessageEvent, MessageType, SendResult, SUPPORTED_DOCUMENT_TYPES, cache_image_from_url, cache_audio_from_url, ) def check_whatsapp_requirements() -> bool: """ Check if WhatsApp dependencies are available. WhatsApp requires a Node.js bridge for most implementations. """ # Check for Node.js try: result = subprocess.run( ["node", "--version"], capture_output=True, text=True, timeout=5 ) return result.returncode == 0 except Exception: return False class WhatsAppAdapter(BasePlatformAdapter): """ WhatsApp adapter. This implementation uses a simple HTTP bridge pattern where: 1. A Node.js process runs the WhatsApp Web client 2. Messages are forwarded via HTTP/IPC to this Python adapter 3. Responses are sent back through the bridge The actual Node.js bridge implementation can vary: - whatsapp-web.js based - Baileys based - Business API based Configuration: - bridge_script: Path to the Node.js bridge script - bridge_port: Port for HTTP communication (default: 3000) - session_path: Path to store WhatsApp session data """ # WhatsApp message limits MAX_MESSAGE_LENGTH = 65536 # WhatsApp allows longer messages # Default bridge location relative to the hermes-agent install _DEFAULT_BRIDGE_DIR = Path(__file__).resolve().parents[2] / "scripts" / "whatsapp-bridge" def __init__(self, config: PlatformConfig): super().__init__(config, Platform.WHATSAPP) self._bridge_process: Optional[subprocess.Popen] = None self._bridge_port: int = config.extra.get("bridge_port", 3000) self._bridge_script: Optional[str] = config.extra.get( "bridge_script", str(self._DEFAULT_BRIDGE_DIR / "bridge.js"), ) self._session_path: Path = Path(config.extra.get( "session_path", get_hermes_dir("platforms/whatsapp/session", "whatsapp/session") )) self._reply_prefix: Optional[str] = config.extra.get("reply_prefix") self._message_queue: asyncio.Queue = asyncio.Queue() self._bridge_log_fh = None self._bridge_log: Optional[Path] = None self._poll_task: Optional[asyncio.Task] = None self._http_session: Optional["aiohttp.ClientSession"] = None self._session_lock_identity: Optional[str] = None async def connect(self) -> bool: """ Start the WhatsApp bridge. This launches the Node.js bridge process and waits for it to be ready. """ if not check_whatsapp_requirements(): logger.warning("[%s] Node.js not found. WhatsApp requires Node.js.", self.name) return False bridge_path = Path(self._bridge_script) if not bridge_path.exists(): logger.warning("[%s] Bridge script not found: %s", self.name, bridge_path) return False logger.info("[%s] Bridge found at %s", self.name, bridge_path) # Acquire scoped lock to prevent duplicate sessions try: from gateway.status import acquire_scoped_lock self._session_lock_identity = str(self._session_path) acquired, existing = acquire_scoped_lock( "whatsapp-session", self._session_lock_identity, metadata={"platform": self.platform.value}, ) if not acquired: owner_pid = existing.get("pid") if isinstance(existing, dict) else None message = ( "Another local Hermes gateway is already using this WhatsApp session" + (f" (PID {owner_pid})." if owner_pid else ".") + " Stop the other gateway before starting a second WhatsApp bridge." ) logger.error("[%s] %s", self.name, message) self._set_fatal_error("whatsapp_session_lock", message, retryable=False) return False except Exception as e: logger.warning("[%s] Could not acquire session lock (non-fatal): %s", self.name, e) # Auto-install npm dependencies if node_modules doesn't exist bridge_dir = bridge_path.parent if not (bridge_dir / "node_modules").exists(): print(f"[{self.name}] Installing WhatsApp bridge dependencies...") try: install_result = subprocess.run( ["npm", "install", "--silent"], cwd=str(bridge_dir), capture_output=True, text=True, timeout=60, ) if install_result.returncode != 0: print(f"[{self.name}] npm install failed: {install_result.stderr}") return False print(f"[{self.name}] Dependencies installed") except Exception as e: print(f"[{self.name}] Failed to install dependencies: {e}") return False try: # Ensure session directory exists self._session_path.mkdir(parents=True, exist_ok=True) # Check if bridge is already running and connected import aiohttp import asyncio try: async with aiohttp.ClientSession() as session: async with session.get( f"http://127.0.0.1:{self._bridge_port}/health", timeout=aiohttp.ClientTimeout(total=2) ) as resp: if resp.status == 200: data = await resp.json() bridge_status = data.get("status", "unknown") if bridge_status == "connected": print(f"[{self.name}] Using existing bridge (status: {bridge_status})") self._mark_connected() self._bridge_process = None # Not managed by us self._http_session = aiohttp.ClientSession() self._poll_task = asyncio.create_task(self._poll_messages()) return True else: print(f"[{self.name}] Bridge found but not connected (status: {bridge_status}), restarting") except Exception: pass # Bridge not running, start a new one # Kill any orphaned bridge from a previous gateway run _kill_port_process(self._bridge_port) await asyncio.sleep(1) # Start the bridge process in its own process group. # Route output to a log file so QR codes, errors, and reconnection # messages are preserved for troubleshooting. whatsapp_mode = os.getenv("WHATSAPP_MODE", "self-chat") self._bridge_log = self._session_path.parent / "bridge.log" bridge_log_fh = open(self._bridge_log, "a") self._bridge_log_fh = bridge_log_fh # Build bridge subprocess environment. # Pass WHATSAPP_REPLY_PREFIX from config.yaml so the Node bridge # can use it without the user needing to set a separate env var. bridge_env = os.environ.copy() if self._reply_prefix is not None: bridge_env["WHATSAPP_REPLY_PREFIX"] = self._reply_prefix self._bridge_process = subprocess.Popen( [ "node", str(bridge_path), "--port", str(self._bridge_port), "--session", str(self._session_path), "--mode", whatsapp_mode, ], stdout=bridge_log_fh, stderr=bridge_log_fh, preexec_fn=None if _IS_WINDOWS else os.setsid, env=bridge_env, ) # Wait for the bridge to connect to WhatsApp. # Phase 1: wait for the HTTP server to come up (up to 15s). # Phase 2: wait for WhatsApp status: connected (up to 15s more). import aiohttp http_ready = False data = {} for attempt in range(15): await asyncio.sleep(1) if self._bridge_process.poll() is not None: print(f"[{self.name}] Bridge process died (exit code {self._bridge_process.returncode})") print(f"[{self.name}] Check log: {self._bridge_log}") self._close_bridge_log() return False try: async with aiohttp.ClientSession() as session: async with session.get( f"http://127.0.0.1:{self._bridge_port}/health", timeout=aiohttp.ClientTimeout(total=2) ) as resp: if resp.status == 200: http_ready = True data = await resp.json() if data.get("status") == "connected": print(f"[{self.name}] Bridge ready (status: connected)") break except Exception: continue if not http_ready: print(f"[{self.name}] Bridge HTTP server did not start in 15s") print(f"[{self.name}] Check log: {self._bridge_log}") self._close_bridge_log() return False # Phase 2: HTTP is up but WhatsApp may still be connecting. # Give it more time to authenticate with saved credentials. if data.get("status") != "connected": print(f"[{self.name}] Bridge HTTP ready, waiting for WhatsApp connection...") for attempt in range(15): await asyncio.sleep(1) if self._bridge_process.poll() is not None: print(f"[{self.name}] Bridge process died during connection") print(f"[{self.name}] Check log: {self._bridge_log}") self._close_bridge_log() return False try: async with aiohttp.ClientSession() as session: async with session.get( f"http://127.0.0.1:{self._bridge_port}/health", timeout=aiohttp.ClientTimeout(total=2) ) as resp: if resp.status == 200: data = await resp.json() if data.get("status") == "connected": print(f"[{self.name}] Bridge ready (status: connected)") break except Exception: continue else: # Still not connected — warn but proceed (bridge may # auto-reconnect later, e.g. after a code 515 restart). print(f"[{self.name}] ⚠ WhatsApp not connected after 30s") print(f"[{self.name}] Bridge log: {self._bridge_log}") print(f"[{self.name}] If session expired, re-pair: hermes whatsapp") # Create a persistent HTTP session for all bridge communication self._http_session = aiohttp.ClientSession() # Start message polling task self._poll_task = asyncio.create_task(self._poll_messages()) self._mark_connected() print(f"[{self.name}] Bridge started on port {self._bridge_port}") return True except Exception as e: if self._session_lock_identity: try: from gateway.status import release_scoped_lock release_scoped_lock("whatsapp-session", self._session_lock_identity) except Exception: pass logger.error("[%s] Failed to start bridge: %s", self.name, e, exc_info=True) self._close_bridge_log() return False def _close_bridge_log(self) -> None: """Close the bridge log file handle if open.""" if self._bridge_log_fh: try: self._bridge_log_fh.close() except Exception: pass self._bridge_log_fh = None async def _check_managed_bridge_exit(self) -> Optional[str]: """Return a fatal error message if the managed bridge child exited.""" if self._bridge_process is None: return None returncode = self._bridge_process.poll() if returncode is None: return None message = f"WhatsApp bridge process exited unexpectedly (code {returncode})." if not self.has_fatal_error: logger.error("[%s] %s", self.name, message) self._set_fatal_error("whatsapp_bridge_exited", message, retryable=True) self._close_bridge_log() await self._notify_fatal_error() return self.fatal_error_message or message async def disconnect(self) -> None: """Stop the WhatsApp bridge and clean up any orphaned processes.""" if self._bridge_process: try: # Kill the entire process group so child node processes die too import signal try: if _IS_WINDOWS: self._bridge_process.terminate() else: os.killpg(os.getpgid(self._bridge_process.pid), signal.SIGTERM) except (ProcessLookupError, PermissionError): self._bridge_process.terminate() await asyncio.sleep(1) if self._bridge_process.poll() is None: try: if _IS_WINDOWS: self._bridge_process.kill() else: os.killpg(os.getpgid(self._bridge_process.pid), signal.SIGKILL) except (ProcessLookupError, PermissionError): self._bridge_process.kill() except Exception as e: print(f"[{self.name}] Error stopping bridge: {e}") else: # Bridge was not started by us, don't kill it print(f"[{self.name}] Disconnecting (external bridge left running)") # Cancel the poll task explicitly if self._poll_task and not self._poll_task.done(): self._poll_task.cancel() try: await self._poll_task except (asyncio.CancelledError, Exception): pass self._poll_task = None # Close the persistent HTTP session if self._http_session and not self._http_session.closed: await self._http_session.close() self._http_session = None if self._session_lock_identity: try: from gateway.status import release_scoped_lock release_scoped_lock("whatsapp-session", self._session_lock_identity) except Exception as e: logger.warning("[%s] Error releasing WhatsApp session lock: %s", self.name, e, exc_info=True) self._mark_disconnected() self._bridge_process = None self._close_bridge_log() self._session_lock_identity = None print(f"[{self.name}] Disconnected") async def send( self, chat_id: str, content: str, reply_to: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None ) -> SendResult: """Send a message via the WhatsApp bridge.""" if not self._running or not self._http_session: return SendResult(success=False, error="Not connected") bridge_exit = await self._check_managed_bridge_exit() if bridge_exit: return SendResult(success=False, error=bridge_exit) try: import aiohttp payload = { "chatId": chat_id, "message": content, } if reply_to: payload["replyTo"] = reply_to async with self._http_session.post( f"http://127.0.0.1:{self._bridge_port}/send", json=payload, timeout=aiohttp.ClientTimeout(total=30) ) as resp: if resp.status == 200: data = await resp.json() return SendResult( success=True, message_id=data.get("messageId"), raw_response=data ) else: error = await resp.text() return SendResult(success=False, error=error) except Exception as e: return SendResult(success=False, error=str(e)) async def edit_message( self, chat_id: str, message_id: str, content: str, ) -> SendResult: """Edit a previously sent message via the WhatsApp bridge.""" if not self._running or not self._http_session: return SendResult(success=False, error="Not connected") bridge_exit = await self._check_managed_bridge_exit() if bridge_exit: return SendResult(success=False, error=bridge_exit) try: import aiohttp async with self._http_session.post( f"http://127.0.0.1:{self._bridge_port}/edit", json={ "chatId": chat_id, "messageId": message_id, "message": content, }, timeout=aiohttp.ClientTimeout(total=15) ) as resp: if resp.status == 200: return SendResult(success=True, message_id=message_id) else: error = await resp.text() return SendResult(success=False, error=error) except Exception as e: return SendResult(success=False, error=str(e)) async def _send_media_to_bridge( self, chat_id: str, file_path: str, media_type: str, caption: Optional[str] = None, file_name: Optional[str] = None, ) -> SendResult: """Send any media file via bridge /send-media endpoint.""" if not self._running or not self._http_session: return SendResult(success=False, error="Not connected") bridge_exit = await self._check_managed_bridge_exit() if bridge_exit: return SendResult(success=False, error=bridge_exit) try: import aiohttp if not os.path.exists(file_path): return SendResult(success=False, error=f"File not found: {file_path}") payload: Dict[str, Any] = { "chatId": chat_id, "filePath": file_path, "mediaType": media_type, } if caption: payload["caption"] = caption if file_name: payload["fileName"] = file_name async with self._http_session.post( f"http://127.0.0.1:{self._bridge_port}/send-media", json=payload, timeout=aiohttp.ClientTimeout(total=120), ) as resp: if resp.status == 200: data = await resp.json() return SendResult( success=True, message_id=data.get("messageId"), raw_response=data, ) else: error = await resp.text() return SendResult(success=False, error=error) except Exception as e: return SendResult(success=False, error=str(e)) async def send_image( self, chat_id: str, image_url: str, caption: Optional[str] = None, reply_to: Optional[str] = None, ) -> SendResult: """Download image URL to cache, send natively via bridge.""" try: local_path = await cache_image_from_url(image_url) return await self._send_media_to_bridge(chat_id, local_path, "image", caption) except Exception: return await super().send_image(chat_id, image_url, caption, reply_to) async def send_image_file( self, chat_id: str, image_path: str, caption: Optional[str] = None, reply_to: Optional[str] = None, **kwargs, ) -> SendResult: """Send a local image file natively via bridge.""" return await self._send_media_to_bridge(chat_id, image_path, "image", caption) async def send_video( self, chat_id: str, video_path: str, caption: Optional[str] = None, reply_to: Optional[str] = None, **kwargs, ) -> SendResult: """Send a video natively via bridge — plays inline in WhatsApp.""" return await self._send_media_to_bridge(chat_id, video_path, "video", caption) async def send_document( self, chat_id: str, file_path: str, caption: Optional[str] = None, file_name: Optional[str] = None, reply_to: Optional[str] = None, **kwargs, ) -> SendResult: """Send a document/file as a downloadable attachment via bridge.""" return await self._send_media_to_bridge( chat_id, file_path, "document", caption, file_name or os.path.basename(file_path), ) async def send_typing(self, chat_id: str, metadata=None) -> None: """Send typing indicator via bridge.""" if not self._running or not self._http_session: return if await self._check_managed_bridge_exit(): return try: import aiohttp await self._http_session.post( f"http://127.0.0.1:{self._bridge_port}/typing", json={"chatId": chat_id}, timeout=aiohttp.ClientTimeout(total=5) ) except Exception: pass # Ignore typing indicator failures async def get_chat_info(self, chat_id: str) -> Dict[str, Any]: """Get information about a WhatsApp chat.""" if not self._running or not self._http_session: return {"name": "Unknown", "type": "dm"} if await self._check_managed_bridge_exit(): return {"name": chat_id, "type": "dm"} try: import aiohttp async with self._http_session.get( f"http://127.0.0.1:{self._bridge_port}/chat/{chat_id}", timeout=aiohttp.ClientTimeout(total=10) ) as resp: if resp.status == 200: data = await resp.json() return { "name": data.get("name", chat_id), "type": "group" if data.get("isGroup") else "dm", "participants": data.get("participants", []), } except Exception as e: logger.debug("Could not get WhatsApp chat info for %s: %s", chat_id, e) return {"name": chat_id, "type": "dm"} async def _poll_messages(self) -> None: """Poll the bridge for incoming messages.""" import aiohttp while self._running: if not self._http_session: break bridge_exit = await self._check_managed_bridge_exit() if bridge_exit: print(f"[{self.name}] {bridge_exit}") break try: async with self._http_session.get( f"http://127.0.0.1:{self._bridge_port}/messages", timeout=aiohttp.ClientTimeout(total=30) ) as resp: if resp.status == 200: messages = await resp.json() for msg_data in messages: event = await self._build_message_event(msg_data) if event: await self.handle_message(event) except asyncio.CancelledError: break except Exception as e: bridge_exit = await self._check_managed_bridge_exit() if bridge_exit: print(f"[{self.name}] {bridge_exit}") break print(f"[{self.name}] Poll error: {e}") await asyncio.sleep(5) await asyncio.sleep(1) # Poll interval async def _build_message_event(self, data: Dict[str, Any]) -> Optional[MessageEvent]: """Build a MessageEvent from bridge message data, downloading images to cache.""" try: # Determine message type msg_type = MessageType.TEXT if data.get("hasMedia"): media_type = data.get("mediaType", "") if "image" in media_type: msg_type = MessageType.PHOTO elif "video" in media_type: msg_type = MessageType.VIDEO elif "audio" in media_type or "ptt" in media_type: # ptt = voice note msg_type = MessageType.VOICE else: msg_type = MessageType.DOCUMENT # Determine chat type is_group = data.get("isGroup", False) chat_type = "group" if is_group else "dm" # Build source source = self.build_source( chat_id=data.get("chatId", ""), chat_name=data.get("chatName"), chat_type=chat_type, user_id=data.get("senderId"), user_name=data.get("senderName"), ) # Download media URLs to the local cache so agent tools # can access them reliably regardless of URL expiration. raw_urls = data.get("mediaUrls", []) cached_urls = [] media_types = [] for url in raw_urls: if msg_type == MessageType.PHOTO and url.startswith(("http://", "https://")): try: cached_path = await cache_image_from_url(url, ext=".jpg") cached_urls.append(cached_path) media_types.append("image/jpeg") print(f"[{self.name}] Cached user image: {cached_path}", flush=True) except Exception as e: print(f"[{self.name}] Failed to cache image: {e}", flush=True) cached_urls.append(url) media_types.append("image/jpeg") elif msg_type == MessageType.PHOTO and os.path.isabs(url): # Local file path — bridge already downloaded the image cached_urls.append(url) media_types.append("image/jpeg") print(f"[{self.name}] Using bridge-cached image: {url}", flush=True) elif msg_type == MessageType.VOICE and url.startswith(("http://", "https://")): try: cached_path = await cache_audio_from_url(url, ext=".ogg") cached_urls.append(cached_path) media_types.append("audio/ogg") print(f"[{self.name}] Cached user voice: {cached_path}", flush=True) except Exception as e: print(f"[{self.name}] Failed to cache voice: {e}", flush=True) cached_urls.append(url) media_types.append("audio/ogg") elif msg_type == MessageType.VOICE and os.path.isabs(url): # Local file path — bridge already downloaded the audio cached_urls.append(url) media_types.append("audio/ogg") print(f"[{self.name}] Using bridge-cached audio: {url}", flush=True) elif msg_type == MessageType.DOCUMENT and os.path.isabs(url): # Local file path — bridge already downloaded the document cached_urls.append(url) ext = Path(url).suffix.lower() mime = SUPPORTED_DOCUMENT_TYPES.get(ext, "application/octet-stream") media_types.append(mime) print(f"[{self.name}] Using bridge-cached document: {url}", flush=True) elif msg_type == MessageType.VIDEO and os.path.isabs(url): cached_urls.append(url) media_types.append("video/mp4") print(f"[{self.name}] Using bridge-cached video: {url}", flush=True) else: cached_urls.append(url) media_types.append("unknown") # For text-readable documents, inject file content directly into # the message text so the agent can read it inline. # Cap at 100KB to match Telegram/Discord/Slack behaviour. body = data.get("body", "") MAX_TEXT_INJECT_BYTES = 100 * 1024 if msg_type == MessageType.DOCUMENT and cached_urls: for doc_path in cached_urls: ext = Path(doc_path).suffix.lower() if ext in (".txt", ".md", ".csv", ".json", ".xml", ".yaml", ".yml", ".log", ".py", ".js", ".ts", ".html", ".css"): try: file_size = Path(doc_path).stat().st_size if file_size > MAX_TEXT_INJECT_BYTES: print(f"[{self.name}] Skipping text injection for {doc_path} ({file_size} bytes > {MAX_TEXT_INJECT_BYTES})", flush=True) continue content = Path(doc_path).read_text(errors="replace") fname = Path(doc_path).name # Remove the doc__ prefix for display display_name = fname if "_" in fname: parts = fname.split("_", 2) if len(parts) >= 3: display_name = parts[2] injection = f"[Content of {display_name}]:\n{content}" if body: body = f"{injection}\n\n{body}" else: body = injection print(f"[{self.name}] Injected text content from: {doc_path}", flush=True) except Exception as e: print(f"[{self.name}] Failed to read document text: {e}", flush=True) return MessageEvent( text=body, message_type=msg_type, source=source, raw_message=data, message_id=data.get("messageId"), media_urls=cached_urls, media_types=media_types, ) except Exception as e: print(f"[{self.name}] Error building event: {e}") return None