diff --git a/cron/scheduler.py b/cron/scheduler.py index f73d6585..e4299836 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -163,6 +163,7 @@ def _deliver_result(job: dict, content: str) -> None: "homeassistant": Platform.HOMEASSISTANT, "dingtalk": Platform.DINGTALK, "feishu": Platform.FEISHU, + "wecom": Platform.WECOM, "email": Platform.EMAIL, "sms": Platform.SMS, } diff --git a/gateway/config.py b/gateway/config.py index 75db564a..a4ec65cc 100644 --- a/gateway/config.py +++ b/gateway/config.py @@ -58,6 +58,7 @@ class Platform(Enum): API_SERVER = "api_server" WEBHOOK = "webhook" FEISHU = "feishu" + WECOM = "wecom" @dataclass @@ -278,6 +279,9 @@ class GatewayConfig: # Feishu uses extra dict for app credentials elif platform == Platform.FEISHU and config.extra.get("app_id"): connected.append(platform) + # WeCom uses extra dict for bot credentials + elif platform == Platform.WECOM and config.extra.get("bot_id"): + connected.append(platform) return connected def get_home_channel(self, platform: Platform) -> Optional[HomeChannel]: @@ -841,6 +845,28 @@ def _apply_env_overrides(config: GatewayConfig) -> None: name=os.getenv("FEISHU_HOME_CHANNEL_NAME", "Home"), ) + # WeCom (Enterprise WeChat) + wecom_bot_id = os.getenv("WECOM_BOT_ID") + wecom_secret = os.getenv("WECOM_SECRET") + if wecom_bot_id and wecom_secret: + if Platform.WECOM not in config.platforms: + config.platforms[Platform.WECOM] = PlatformConfig() + config.platforms[Platform.WECOM].enabled = True + config.platforms[Platform.WECOM].extra.update({ + "bot_id": wecom_bot_id, + "secret": wecom_secret, + }) + wecom_ws_url = os.getenv("WECOM_WEBSOCKET_URL", "") + if wecom_ws_url: + config.platforms[Platform.WECOM].extra["websocket_url"] = wecom_ws_url + wecom_home = os.getenv("WECOM_HOME_CHANNEL") + if wecom_home: + config.platforms[Platform.WECOM].home_channel = HomeChannel( + platform=Platform.WECOM, + chat_id=wecom_home, + name=os.getenv("WECOM_HOME_CHANNEL_NAME", "Home"), + ) + # Session settings 
idle_minutes = os.getenv("SESSION_IDLE_MINUTES") if idle_minutes: diff --git a/gateway/platforms/wecom.py b/gateway/platforms/wecom.py new file mode 100644 index 00000000..d40b651c --- /dev/null +++ b/gateway/platforms/wecom.py @@ -0,0 +1,1338 @@ +""" +WeCom (Enterprise WeChat) platform adapter. + +Uses the WeCom AI Bot WebSocket gateway for inbound and outbound messages. +The adapter focuses on the core gateway path: + +- authenticate via ``aibot_subscribe`` +- receive inbound ``aibot_msg_callback`` events +- send outbound markdown messages via ``aibot_send_msg`` +- upload outbound media via ``aibot_upload_media_*`` and send native attachments +- best-effort download of inbound image/file attachments for agent context + +Configuration in config.yaml: + platforms: + wecom: + enabled: true + extra: + bot_id: "your-bot-id" # or WECOM_BOT_ID env var + secret: "your-secret" # or WECOM_SECRET env var + websocket_url: "wss://openws.work.weixin.qq.com" + dm_policy: "open" # open | allowlist | disabled | pairing + allow_from: ["user_id_1"] + group_policy: "open" # open | allowlist | disabled + group_allow_from: ["group_id_1"] + groups: + group_id_1: + allow_from: ["user_id_1"] +""" + +from __future__ import annotations + +import asyncio +import base64 +import hashlib +import json +import logging +import mimetypes +import os +import re +import time +import uuid +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple +from urllib.parse import unquote, urlparse + +try: + import aiohttp + AIOHTTP_AVAILABLE = True +except ImportError: + AIOHTTP_AVAILABLE = False + aiohttp = None # type: ignore[assignment] + +try: + import httpx + HTTPX_AVAILABLE = True +except ImportError: + HTTPX_AVAILABLE = False + httpx = None # type: ignore[assignment] + +from gateway.config import Platform, PlatformConfig +from gateway.platforms.base import ( + BasePlatformAdapter, + MessageEvent, + MessageType, + SendResult, + 
cache_document_from_bytes, + cache_image_from_bytes, +) + +logger = logging.getLogger(__name__) + +DEFAULT_WS_URL = "wss://openws.work.weixin.qq.com" + +APP_CMD_SUBSCRIBE = "aibot_subscribe" +APP_CMD_CALLBACK = "aibot_msg_callback" +APP_CMD_LEGACY_CALLBACK = "aibot_callback" +APP_CMD_EVENT_CALLBACK = "aibot_event_callback" +APP_CMD_SEND = "aibot_send_msg" +APP_CMD_RESPONSE = "aibot_respond_msg" +APP_CMD_PING = "ping" +APP_CMD_UPLOAD_MEDIA_INIT = "aibot_upload_media_init" +APP_CMD_UPLOAD_MEDIA_CHUNK = "aibot_upload_media_chunk" +APP_CMD_UPLOAD_MEDIA_FINISH = "aibot_upload_media_finish" + +CALLBACK_COMMANDS = {APP_CMD_CALLBACK, APP_CMD_LEGACY_CALLBACK} +NON_RESPONSE_COMMANDS = CALLBACK_COMMANDS | {APP_CMD_EVENT_CALLBACK} + +MAX_MESSAGE_LENGTH = 4000 +CONNECT_TIMEOUT_SECONDS = 20.0 +REQUEST_TIMEOUT_SECONDS = 15.0 +HEARTBEAT_INTERVAL_SECONDS = 30.0 +RECONNECT_BACKOFF = [2, 5, 10, 30, 60] + +DEDUP_WINDOW_SECONDS = 300 +DEDUP_MAX_SIZE = 1000 + +IMAGE_MAX_BYTES = 10 * 1024 * 1024 +VIDEO_MAX_BYTES = 10 * 1024 * 1024 +VOICE_MAX_BYTES = 2 * 1024 * 1024 +FILE_MAX_BYTES = 20 * 1024 * 1024 +ABSOLUTE_MAX_BYTES = FILE_MAX_BYTES +UPLOAD_CHUNK_SIZE = 512 * 1024 +MAX_UPLOAD_CHUNKS = 100 +VOICE_SUPPORTED_MIMES = {"audio/amr"} + + +def check_wecom_requirements() -> bool: + """Check if WeCom runtime dependencies are available.""" + return AIOHTTP_AVAILABLE and HTTPX_AVAILABLE + + +def _coerce_list(value: Any) -> List[str]: + """Coerce config values into a trimmed string list.""" + if value is None: + return [] + if isinstance(value, str): + return [item.strip() for item in value.split(",") if item.strip()] + if isinstance(value, (list, tuple, set)): + return [str(item).strip() for item in value if str(item).strip()] + return [str(value).strip()] if str(value).strip() else [] + + +def _normalize_entry(raw: str) -> str: + """Normalize allowlist entries such as ``wecom:user:foo``.""" + value = str(raw).strip() + value = re.sub(r"^wecom:", "", value, flags=re.IGNORECASE) + value = 
re.sub(r"^(user|group):", "", value, flags=re.IGNORECASE) + return value.strip() + + +def _entry_matches(entries: List[str], target: str) -> bool: + """Case-insensitive allowlist match with ``*`` support.""" + normalized_target = str(target).strip().lower() + for entry in entries: + normalized = _normalize_entry(entry).lower() + if normalized == "*" or normalized == normalized_target: + return True + return False + + +class WeComAdapter(BasePlatformAdapter): + """WeCom AI Bot adapter backed by a persistent WebSocket connection.""" + + MAX_MESSAGE_LENGTH = MAX_MESSAGE_LENGTH + + def __init__(self, config: PlatformConfig): + super().__init__(config, Platform.WECOM) + + extra = config.extra or {} + self._bot_id = str(extra.get("bot_id") or os.getenv("WECOM_BOT_ID", "")).strip() + self._secret = str(extra.get("secret") or os.getenv("WECOM_SECRET", "")).strip() + self._ws_url = str( + extra.get("websocket_url") + or extra.get("websocketUrl") + or os.getenv("WECOM_WEBSOCKET_URL", DEFAULT_WS_URL) + ).strip() or DEFAULT_WS_URL + + self._dm_policy = str(extra.get("dm_policy") or os.getenv("WECOM_DM_POLICY", "open")).strip().lower() + self._allow_from = _coerce_list(extra.get("allow_from") or extra.get("allowFrom")) + + self._group_policy = str(extra.get("group_policy") or os.getenv("WECOM_GROUP_POLICY", "open")).strip().lower() + self._group_allow_from = _coerce_list(extra.get("group_allow_from") or extra.get("groupAllowFrom")) + self._groups = extra.get("groups") if isinstance(extra.get("groups"), dict) else {} + + self._session: Optional["aiohttp.ClientSession"] = None + self._ws: Optional["aiohttp.ClientWebSocketResponse"] = None + self._http_client: Optional["httpx.AsyncClient"] = None + self._listen_task: Optional[asyncio.Task] = None + self._heartbeat_task: Optional[asyncio.Task] = None + self._pending_responses: Dict[str, asyncio.Future] = {} + self._seen_messages: Dict[str, float] = {} + self._reply_req_ids: Dict[str, str] = {} + + # 
------------------------------------------------------------------ + # Connection lifecycle + # ------------------------------------------------------------------ + + async def connect(self) -> bool: + """Connect to the WeCom AI Bot gateway.""" + if not AIOHTTP_AVAILABLE: + message = "WeCom startup failed: aiohttp not installed" + self._set_fatal_error("wecom_missing_dependency", message, retryable=True) + logger.warning("[%s] %s. Run: pip install aiohttp", self.name, message) + return False + if not HTTPX_AVAILABLE: + message = "WeCom startup failed: httpx not installed" + self._set_fatal_error("wecom_missing_dependency", message, retryable=True) + logger.warning("[%s] %s. Run: pip install httpx", self.name, message) + return False + if not self._bot_id or not self._secret: + message = "WeCom startup failed: WECOM_BOT_ID and WECOM_SECRET are required" + self._set_fatal_error("wecom_missing_credentials", message, retryable=True) + logger.warning("[%s] %s", self.name, message) + return False + + try: + self._http_client = httpx.AsyncClient(timeout=30.0, follow_redirects=True) + await self._open_connection() + self._mark_connected() + self._listen_task = asyncio.create_task(self._listen_loop()) + self._heartbeat_task = asyncio.create_task(self._heartbeat_loop()) + logger.info("[%s] Connected to %s", self.name, self._ws_url) + return True + except Exception as exc: + message = f"WeCom startup failed: {exc}" + self._set_fatal_error("wecom_connect_error", message, retryable=True) + logger.error("[%s] Failed to connect: %s", self.name, exc, exc_info=True) + await self._cleanup_ws() + if self._http_client: + await self._http_client.aclose() + self._http_client = None + return False + + async def disconnect(self) -> None: + """Disconnect from WeCom.""" + self._running = False + self._mark_disconnected() + + if self._listen_task: + self._listen_task.cancel() + try: + await self._listen_task + except asyncio.CancelledError: + pass + self._listen_task = None + + if 
self._heartbeat_task: + self._heartbeat_task.cancel() + try: + await self._heartbeat_task + except asyncio.CancelledError: + pass + self._heartbeat_task = None + + self._fail_pending_responses(RuntimeError("WeCom adapter disconnected")) + await self._cleanup_ws() + + if self._http_client: + await self._http_client.aclose() + self._http_client = None + + self._seen_messages.clear() + logger.info("[%s] Disconnected", self.name) + + async def _cleanup_ws(self) -> None: + """Close the live websocket/session, if any.""" + if self._ws and not self._ws.closed: + await self._ws.close() + self._ws = None + + if self._session and not self._session.closed: + await self._session.close() + self._session = None + + async def _open_connection(self) -> None: + """Open and authenticate a websocket connection.""" + await self._cleanup_ws() + self._session = aiohttp.ClientSession() + self._ws = await self._session.ws_connect( + self._ws_url, + heartbeat=HEARTBEAT_INTERVAL_SECONDS * 2, + timeout=CONNECT_TIMEOUT_SECONDS, + ) + + req_id = self._new_req_id("subscribe") + await self._send_json( + { + "cmd": APP_CMD_SUBSCRIBE, + "headers": {"req_id": req_id}, + "body": {"bot_id": self._bot_id, "secret": self._secret}, + } + ) + + auth_payload = await self._wait_for_handshake(req_id) + errcode = auth_payload.get("errcode", 0) + if errcode not in (0, None): + errmsg = auth_payload.get("errmsg", "authentication failed") + raise RuntimeError(f"{errmsg} (errcode={errcode})") + + async def _wait_for_handshake(self, req_id: str) -> Dict[str, Any]: + """Wait for the subscribe acknowledgement.""" + if not self._ws: + raise RuntimeError("WebSocket not initialized") + + deadline = asyncio.get_running_loop().time() + CONNECT_TIMEOUT_SECONDS + while True: + remaining = deadline - asyncio.get_running_loop().time() + if remaining <= 0: + raise TimeoutError("Timed out waiting for WeCom subscribe acknowledgement") + + msg = await asyncio.wait_for(self._ws.receive(), timeout=remaining) + if msg.type == 
aiohttp.WSMsgType.TEXT: + payload = self._parse_json(msg.data) + if not payload: + continue + if payload.get("cmd") == APP_CMD_PING: + continue + if self._payload_req_id(payload) == req_id: + return payload + logger.debug("[%s] Ignoring pre-auth payload: %s", self.name, payload.get("cmd")) + elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE, aiohttp.WSMsgType.ERROR): + raise RuntimeError("WeCom websocket closed during authentication") + + async def _listen_loop(self) -> None: + """Read websocket events forever, reconnecting on errors.""" + backoff_idx = 0 + while self._running: + try: + await self._read_events() + backoff_idx = 0 + except asyncio.CancelledError: + return + except Exception as exc: + if not self._running: + return + logger.warning("[%s] WebSocket error: %s", self.name, exc) + self._fail_pending_responses(RuntimeError("WeCom connection interrupted")) + + delay = RECONNECT_BACKOFF[min(backoff_idx, len(RECONNECT_BACKOFF) - 1)] + backoff_idx += 1 + await asyncio.sleep(delay) + + try: + await self._open_connection() + backoff_idx = 0 + logger.info("[%s] Reconnected", self.name) + except Exception as reconnect_exc: + logger.warning("[%s] Reconnect failed: %s", self.name, reconnect_exc) + + async def _read_events(self) -> None: + """Read websocket frames until the connection closes.""" + if not self._ws: + raise RuntimeError("WebSocket not connected") + + while self._running and self._ws and not self._ws.closed: + msg = await self._ws.receive() + if msg.type == aiohttp.WSMsgType.TEXT: + payload = self._parse_json(msg.data) + if payload: + await self._dispatch_payload(payload) + elif msg.type in (aiohttp.WSMsgType.CLOSE, aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR): + raise RuntimeError("WeCom websocket closed") + + async def _heartbeat_loop(self) -> None: + """Send lightweight application-level pings.""" + try: + while self._running: + await asyncio.sleep(HEARTBEAT_INTERVAL_SECONDS) + if not self._ws or self._ws.closed: + continue + 
try: + await self._send_json( + { + "cmd": APP_CMD_PING, + "headers": {"req_id": self._new_req_id("ping")}, + "body": {}, + } + ) + except Exception as exc: + logger.debug("[%s] Heartbeat send failed: %s", self.name, exc) + except asyncio.CancelledError: + pass + + async def _dispatch_payload(self, payload: Dict[str, Any]) -> None: + """Route inbound websocket payloads.""" + req_id = self._payload_req_id(payload) + cmd = str(payload.get("cmd") or "") + + if req_id and req_id in self._pending_responses and cmd not in NON_RESPONSE_COMMANDS: + future = self._pending_responses.get(req_id) + if future and not future.done(): + future.set_result(payload) + return + + if cmd in CALLBACK_COMMANDS: + await self._on_message(payload) + return + if cmd in {APP_CMD_PING, APP_CMD_EVENT_CALLBACK}: + return + + logger.debug("[%s] Ignoring websocket payload: %s", self.name, cmd or payload) + + def _fail_pending_responses(self, exc: Exception) -> None: + """Fail all outstanding request futures.""" + for req_id, future in list(self._pending_responses.items()): + if not future.done(): + future.set_exception(exc) + self._pending_responses.pop(req_id, None) + + async def _send_json(self, payload: Dict[str, Any]) -> None: + """Send a raw JSON frame over the active websocket.""" + if not self._ws or self._ws.closed: + raise RuntimeError("WeCom websocket is not connected") + await self._ws.send_json(payload) + + async def _send_request(self, cmd: str, body: Dict[str, Any], timeout: float = REQUEST_TIMEOUT_SECONDS) -> Dict[str, Any]: + """Send a JSON request and await the correlated response.""" + if not self._ws or self._ws.closed: + raise RuntimeError("WeCom websocket is not connected") + + req_id = self._new_req_id(cmd) + future = asyncio.get_running_loop().create_future() + self._pending_responses[req_id] = future + try: + await self._send_json({"cmd": cmd, "headers": {"req_id": req_id}, "body": body}) + response = await asyncio.wait_for(future, timeout=timeout) + return response + 
finally: + self._pending_responses.pop(req_id, None) + + async def _send_reply_request( + self, + reply_req_id: str, + body: Dict[str, Any], + cmd: str = APP_CMD_RESPONSE, + timeout: float = REQUEST_TIMEOUT_SECONDS, + ) -> Dict[str, Any]: + """Send a reply frame correlated to an inbound callback req_id.""" + if not self._ws or self._ws.closed: + raise RuntimeError("WeCom websocket is not connected") + + normalized_req_id = str(reply_req_id or "").strip() + if not normalized_req_id: + raise ValueError("reply_req_id is required") + + future = asyncio.get_running_loop().create_future() + self._pending_responses[normalized_req_id] = future + try: + await self._send_json( + {"cmd": cmd, "headers": {"req_id": normalized_req_id}, "body": body} + ) + response = await asyncio.wait_for(future, timeout=timeout) + return response + finally: + self._pending_responses.pop(normalized_req_id, None) + + @staticmethod + def _new_req_id(prefix: str) -> str: + return f"{prefix}-{uuid.uuid4().hex}" + + @staticmethod + def _payload_req_id(payload: Dict[str, Any]) -> str: + headers = payload.get("headers") + if isinstance(headers, dict): + return str(headers.get("req_id") or "") + return "" + + @staticmethod + def _parse_json(raw: Any) -> Optional[Dict[str, Any]]: + try: + payload = json.loads(raw) + except Exception: + logger.debug("Failed to parse WeCom payload: %r", raw) + return None + return payload if isinstance(payload, dict) else None + + # ------------------------------------------------------------------ + # Inbound message parsing + # ------------------------------------------------------------------ + + async def _on_message(self, payload: Dict[str, Any]) -> None: + """Process an inbound WeCom message callback event.""" + body = payload.get("body") + if not isinstance(body, dict): + return + + msg_id = str(body.get("msgid") or self._payload_req_id(payload) or uuid.uuid4().hex) + if self._is_duplicate(msg_id): + logger.debug("[%s] Duplicate message %s ignored", self.name, 
msg_id) + return + self._remember_reply_req_id(msg_id, self._payload_req_id(payload)) + + sender = body.get("from") if isinstance(body.get("from"), dict) else {} + sender_id = str(sender.get("userid") or "").strip() + chat_id = str(body.get("chatid") or sender_id).strip() + if not chat_id: + logger.debug("[%s] Missing chat id, skipping message", self.name) + return + + is_group = str(body.get("chattype") or "").lower() == "group" + if is_group: + if not self._is_group_allowed(chat_id, sender_id): + logger.debug("[%s] Group %s / sender %s blocked by policy", self.name, chat_id, sender_id) + return + elif not self._is_dm_allowed(sender_id): + logger.debug("[%s] DM sender %s blocked by policy", self.name, sender_id) + return + + text, reply_text = self._extract_text(body) + media_urls, media_types = await self._extract_media(body) + message_type = self._derive_message_type(body, text, media_types) + has_reply_context = bool(reply_text and (text or media_urls)) + + if not text and reply_text and not media_urls: + text = reply_text + + if not text and not media_urls: + logger.debug("[%s] Empty WeCom message skipped", self.name) + return + + source = self.build_source( + chat_id=chat_id, + chat_type="group" if is_group else "dm", + user_id=sender_id or None, + user_name=sender_id or None, + ) + + event = MessageEvent( + text=text, + message_type=message_type, + source=source, + raw_message=payload, + message_id=msg_id, + media_urls=media_urls, + media_types=media_types, + reply_to_message_id=f"quote:{msg_id}" if has_reply_context else None, + reply_to_text=reply_text if has_reply_context else None, + timestamp=datetime.now(tz=timezone.utc), + ) + + await self.handle_message(event) + + @staticmethod + def _extract_text(body: Dict[str, Any]) -> Tuple[str, Optional[str]]: + """Extract plain text and quoted text from a callback payload.""" + text_parts: List[str] = [] + reply_text: Optional[str] = None + msgtype = str(body.get("msgtype") or "").lower() + + if msgtype == 
"mixed": + mixed = body.get("mixed") if isinstance(body.get("mixed"), dict) else {} + items = mixed.get("msg_item") if isinstance(mixed.get("msg_item"), list) else [] + for item in items: + if not isinstance(item, dict): + continue + if str(item.get("msgtype") or "").lower() == "text": + text_block = item.get("text") if isinstance(item.get("text"), dict) else {} + content = str(text_block.get("content") or "").strip() + if content: + text_parts.append(content) + else: + text_block = body.get("text") if isinstance(body.get("text"), dict) else {} + content = str(text_block.get("content") or "").strip() + if content: + text_parts.append(content) + + if msgtype == "voice": + voice_block = body.get("voice") if isinstance(body.get("voice"), dict) else {} + voice_text = str(voice_block.get("content") or "").strip() + if voice_text: + text_parts.append(voice_text) + + quote = body.get("quote") if isinstance(body.get("quote"), dict) else {} + quote_type = str(quote.get("msgtype") or "").lower() + if quote_type == "text": + quote_text = quote.get("text") if isinstance(quote.get("text"), dict) else {} + reply_text = str(quote_text.get("content") or "").strip() or None + elif quote_type == "voice": + quote_voice = quote.get("voice") if isinstance(quote.get("voice"), dict) else {} + reply_text = str(quote_voice.get("content") or "").strip() or None + + return "\n".join(part for part in text_parts if part).strip(), reply_text + + async def _extract_media(self, body: Dict[str, Any]) -> Tuple[List[str], List[str]]: + """Best-effort extraction of inbound media to local cache paths.""" + media_paths: List[str] = [] + media_types: List[str] = [] + refs: List[Tuple[str, Dict[str, Any]]] = [] + msgtype = str(body.get("msgtype") or "").lower() + + if msgtype == "mixed": + mixed = body.get("mixed") if isinstance(body.get("mixed"), dict) else {} + items = mixed.get("msg_item") if isinstance(mixed.get("msg_item"), list) else [] + for item in items: + if not isinstance(item, dict): + 
continue + item_type = str(item.get("msgtype") or "").lower() + if item_type == "image" and isinstance(item.get("image"), dict): + refs.append(("image", item["image"])) + else: + if isinstance(body.get("image"), dict): + refs.append(("image", body["image"])) + if msgtype == "file" and isinstance(body.get("file"), dict): + refs.append(("file", body["file"])) + + quote = body.get("quote") if isinstance(body.get("quote"), dict) else {} + quote_type = str(quote.get("msgtype") or "").lower() + if quote_type == "image" and isinstance(quote.get("image"), dict): + refs.append(("image", quote["image"])) + elif quote_type == "file" and isinstance(quote.get("file"), dict): + refs.append(("file", quote["file"])) + + for kind, ref in refs: + cached = await self._cache_media(kind, ref) + if cached: + path, content_type = cached + media_paths.append(path) + media_types.append(content_type) + + return media_paths, media_types + + async def _cache_media(self, kind: str, media: Dict[str, Any]) -> Optional[Tuple[str, str]]: + """Cache an inbound image/file/media reference to local storage.""" + if "base64" in media and media.get("base64"): + try: + raw = self._decode_base64(media["base64"]) + except Exception as exc: + logger.debug("[%s] Failed to decode %s base64 media: %s", self.name, kind, exc) + return None + + if kind == "image": + ext = self._detect_image_ext(raw) + return cache_image_from_bytes(raw, ext), self._mime_for_ext(ext, fallback="image/jpeg") + + filename = str(media.get("filename") or media.get("name") or "wecom_file") + return cache_document_from_bytes(raw, filename), mimetypes.guess_type(filename)[0] or "application/octet-stream" + + url = str(media.get("url") or "").strip() + if not url: + return None + + try: + raw, headers = await self._download_remote_bytes(url, max_bytes=ABSOLUTE_MAX_BYTES) + except Exception as exc: + logger.debug("[%s] Failed to download %s from %s: %s", self.name, kind, url, exc) + return None + + aes_key = str(media.get("aeskey") or 
"").strip() + if aes_key: + try: + raw = self._decrypt_file_bytes(raw, aes_key) + except Exception as exc: + logger.debug("[%s] Failed to decrypt %s from %s: %s", self.name, kind, url, exc) + return None + + content_type = str(headers.get("content-type") or "").split(";", 1)[0].strip() or "application/octet-stream" + if kind == "image": + ext = self._guess_extension(url, content_type, fallback=self._detect_image_ext(raw)) + return cache_image_from_bytes(raw, ext), content_type or self._mime_for_ext(ext, fallback="image/jpeg") + + filename = self._guess_filename(url, headers.get("content-disposition"), content_type) + return cache_document_from_bytes(raw, filename), content_type + + @staticmethod + def _decode_base64(data: str) -> bytes: + payload = data.split(",", 1)[-1].strip() + return base64.b64decode(payload) + + @staticmethod + def _detect_image_ext(data: bytes) -> str: + if data.startswith(b"\x89PNG\r\n\x1a\n"): + return ".png" + if data.startswith(b"\xff\xd8\xff"): + return ".jpg" + if data.startswith(b"GIF87a") or data.startswith(b"GIF89a"): + return ".gif" + if data.startswith(b"RIFF") and data[8:12] == b"WEBP": + return ".webp" + return ".jpg" + + @staticmethod + def _mime_for_ext(ext: str, fallback: str = "application/octet-stream") -> str: + return mimetypes.types_map.get(ext.lower(), fallback) + + @staticmethod + def _guess_extension(url: str, content_type: str, fallback: str) -> str: + ext = mimetypes.guess_extension(content_type) if content_type else None + if ext: + return ext + path_ext = Path(urlparse(url).path).suffix + if path_ext: + return path_ext + return fallback + + @staticmethod + def _guess_filename(url: str, content_disposition: Optional[str], content_type: str) -> str: + if content_disposition: + match = re.search(r'filename="?([^";]+)"?', content_disposition) + if match: + return match.group(1) + + name = Path(urlparse(url).path).name or "document" + if "." 
not in name: + ext = mimetypes.guess_extension(content_type) or ".bin" + name = f"{name}{ext}" + return name + + @staticmethod + def _derive_message_type(body: Dict[str, Any], text: str, media_types: List[str]) -> MessageType: + """Choose the normalized inbound message type.""" + if any(mtype.startswith("application/") or mtype.startswith("text/") for mtype in media_types): + return MessageType.DOCUMENT + if any(mtype.startswith("image/") for mtype in media_types): + return MessageType.TEXT if text else MessageType.PHOTO + if str(body.get("msgtype") or "").lower() == "voice": + return MessageType.VOICE + return MessageType.TEXT + + # ------------------------------------------------------------------ + # Policy helpers + # ------------------------------------------------------------------ + + def _is_dm_allowed(self, sender_id: str) -> bool: + if self._dm_policy == "disabled": + return False + if self._dm_policy == "allowlist": + return _entry_matches(self._allow_from, sender_id) + return True + + def _is_group_allowed(self, chat_id: str, sender_id: str) -> bool: + if self._group_policy == "disabled": + return False + if self._group_policy == "allowlist" and not _entry_matches(self._group_allow_from, chat_id): + return False + + group_cfg = self._resolve_group_cfg(chat_id) + sender_allow = _coerce_list(group_cfg.get("allow_from") or group_cfg.get("allowFrom")) + if sender_allow: + return _entry_matches(sender_allow, sender_id) + return True + + def _resolve_group_cfg(self, chat_id: str) -> Dict[str, Any]: + if not isinstance(self._groups, dict): + return {} + if chat_id in self._groups and isinstance(self._groups[chat_id], dict): + return self._groups[chat_id] + lowered = chat_id.lower() + for key, value in self._groups.items(): + if isinstance(key, str) and key.lower() == lowered and isinstance(value, dict): + return value + wildcard = self._groups.get("*") + return wildcard if isinstance(wildcard, dict) else {} + + def _is_duplicate(self, msg_id: str) -> bool: + 
now = time.time() + if len(self._seen_messages) > DEDUP_MAX_SIZE: + cutoff = now - DEDUP_WINDOW_SECONDS + self._seen_messages = { + key: ts for key, ts in self._seen_messages.items() if ts > cutoff + } + if self._reply_req_ids: + self._reply_req_ids = { + key: value for key, value in self._reply_req_ids.items() if key in self._seen_messages + } + + if msg_id in self._seen_messages: + return True + + self._seen_messages[msg_id] = now + return False + + def _remember_reply_req_id(self, message_id: str, req_id: str) -> None: + normalized_message_id = str(message_id or "").strip() + normalized_req_id = str(req_id or "").strip() + if not normalized_message_id or not normalized_req_id: + return + self._reply_req_ids[normalized_message_id] = normalized_req_id + while len(self._reply_req_ids) > DEDUP_MAX_SIZE: + self._reply_req_ids.pop(next(iter(self._reply_req_ids))) + + def _reply_req_id_for_message(self, reply_to: Optional[str]) -> Optional[str]: + normalized = str(reply_to or "").strip() + if not normalized or normalized.startswith("quote:"): + return None + return self._reply_req_ids.get(normalized) + + # ------------------------------------------------------------------ + # Outbound messaging + # ------------------------------------------------------------------ + + @staticmethod + def _guess_mime_type(filename: str) -> str: + mime_type = mimetypes.guess_type(filename)[0] + if mime_type: + return mime_type + if Path(filename).suffix.lower() == ".amr": + return "audio/amr" + return "application/octet-stream" + + @staticmethod + def _normalize_content_type(content_type: str, filename: str) -> str: + normalized = str(content_type or "").split(";", 1)[0].strip().lower() + guessed = WeComAdapter._guess_mime_type(filename) + if not normalized: + return guessed + if normalized in {"application/octet-stream", "text/plain"}: + return guessed + return normalized + + @staticmethod + def _detect_wecom_media_type(content_type: str) -> str: + mime_type = str(content_type or 
"").strip().lower() + if mime_type.startswith("image/"): + return "image" + if mime_type.startswith("video/"): + return "video" + if mime_type.startswith("audio/") or mime_type == "application/ogg": + return "voice" + return "file" + + @staticmethod + def _apply_file_size_limits(file_size: int, detected_type: str, content_type: Optional[str] = None) -> Dict[str, Any]: + file_size_mb = file_size / (1024 * 1024) + normalized_type = str(detected_type or "file").lower() + normalized_content_type = str(content_type or "").strip().lower() + + if file_size > ABSOLUTE_MAX_BYTES: + return { + "final_type": normalized_type, + "rejected": True, + "reject_reason": ( + f"文件大小 {file_size_mb:.2f}MB 超过了企业微信允许的最大限制 20MB,无法发送。" + "请尝试压缩文件或减小文件大小。" + ), + "downgraded": False, + "downgrade_note": None, + } + + if normalized_type == "image" and file_size > IMAGE_MAX_BYTES: + return { + "final_type": "file", + "rejected": False, + "reject_reason": None, + "downgraded": True, + "downgrade_note": f"图片大小 {file_size_mb:.2f}MB 超过 10MB 限制,已转为文件格式发送", + } + + if normalized_type == "video" and file_size > VIDEO_MAX_BYTES: + return { + "final_type": "file", + "rejected": False, + "reject_reason": None, + "downgraded": True, + "downgrade_note": f"视频大小 {file_size_mb:.2f}MB 超过 10MB 限制,已转为文件格式发送", + } + + if normalized_type == "voice": + if normalized_content_type and normalized_content_type not in VOICE_SUPPORTED_MIMES: + return { + "final_type": "file", + "rejected": False, + "reject_reason": None, + "downgraded": True, + "downgrade_note": ( + f"语音格式 {normalized_content_type} 不支持,企微仅支持 AMR 格式,已转为文件格式发送" + ), + } + if file_size > VOICE_MAX_BYTES: + return { + "final_type": "file", + "rejected": False, + "reject_reason": None, + "downgraded": True, + "downgrade_note": f"语音大小 {file_size_mb:.2f}MB 超过 2MB 限制,已转为文件格式发送", + } + + return { + "final_type": normalized_type, + "rejected": False, + "reject_reason": None, + "downgraded": False, + "downgrade_note": None, + } + + @staticmethod + def 
_response_error(response: Dict[str, Any]) -> Optional[str]: + errcode = response.get("errcode", 0) + if errcode in (0, None): + return None + errmsg = str(response.get("errmsg") or "unknown error") + return f"WeCom errcode {errcode}: {errmsg}" + + @classmethod + def _raise_for_wecom_error(cls, response: Dict[str, Any], operation: str) -> None: + error = cls._response_error(response) + if error: + raise RuntimeError(f"{operation} failed: {error}") + + @staticmethod + def _decrypt_file_bytes(encrypted_data: bytes, aes_key: str) -> bytes: + if not encrypted_data: + raise ValueError("encrypted_data is empty") + if not aes_key: + raise ValueError("aes_key is required") + + key = base64.b64decode(aes_key) + if len(key) != 32: + raise ValueError(f"Invalid WeCom AES key length: expected 32 bytes, got {len(key)}") + + try: + from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes + except ImportError as exc: # pragma: no cover - dependency is environment-specific + raise RuntimeError("cryptography is required for WeCom media decryption") from exc + + cipher = Cipher(algorithms.AES(key), modes.CBC(key[:16])) + decryptor = cipher.decryptor() + decrypted = decryptor.update(encrypted_data) + decryptor.finalize() + + pad_len = decrypted[-1] + if pad_len < 1 or pad_len > 32 or pad_len > len(decrypted): + raise ValueError(f"Invalid PKCS#7 padding value: {pad_len}") + if any(byte != pad_len for byte in decrypted[-pad_len:]): + raise ValueError("Invalid PKCS#7 padding: padding bytes mismatch") + + return decrypted[:-pad_len] + + async def _download_remote_bytes( + self, + url: str, + max_bytes: int, + ) -> Tuple[bytes, Dict[str, str]]: + if not HTTPX_AVAILABLE: + raise RuntimeError("httpx is required for WeCom media download") + + client = self._http_client or httpx.AsyncClient(timeout=30.0, follow_redirects=True) + created_client = client is not self._http_client + try: + async with client.stream( + "GET", + url, + headers={ + "User-Agent": "HermesAgent/1.0", + 
"Accept": "*/*", + }, + ) as response: + response.raise_for_status() + headers = {key.lower(): value for key, value in response.headers.items()} + content_length = headers.get("content-length") + if content_length and content_length.isdigit() and int(content_length) > max_bytes: + raise ValueError( + f"Remote media exceeds WeCom limit: {int(content_length)} bytes > {max_bytes} bytes" + ) + + data = bytearray() + async for chunk in response.aiter_bytes(): + data.extend(chunk) + if len(data) > max_bytes: + raise ValueError( + f"Remote media exceeds WeCom limit while downloading: {len(data)} bytes > {max_bytes} bytes" + ) + + return bytes(data), headers + finally: + if created_client: + await client.aclose() + + @staticmethod + def _looks_like_url(media_source: str) -> bool: + parsed = urlparse(str(media_source or "")) + return parsed.scheme in {"http", "https"} + + async def _load_outbound_media( + self, + media_source: str, + file_name: Optional[str] = None, + ) -> Tuple[bytes, str, str]: + source = str(media_source or "").strip() + if not source: + raise ValueError("media source is required") + if re.fullmatch(r"<[^>\n]+>", source): + raise ValueError(f"Media placeholder was not replaced with a real file path: {source}") + + parsed = urlparse(source) + if parsed.scheme in {"http", "https"}: + data, headers = await self._download_remote_bytes(source, max_bytes=ABSOLUTE_MAX_BYTES) + content_disposition = headers.get("content-disposition") + resolved_name = file_name or self._guess_filename(source, content_disposition, headers.get("content-type", "")) + content_type = self._normalize_content_type(headers.get("content-type", ""), resolved_name) + return data, content_type, resolved_name + + if parsed.scheme == "file": + local_path = Path(unquote(parsed.path)).expanduser() + else: + local_path = Path(source).expanduser() + + if not local_path.is_absolute(): + local_path = (Path.cwd() / local_path).resolve() + + if not local_path.exists() or not local_path.is_file(): + 
raise FileNotFoundError(f"Media file not found: {local_path}") + + data = local_path.read_bytes() + resolved_name = file_name or local_path.name + content_type = self._normalize_content_type("", resolved_name) + return data, content_type, resolved_name + + async def _prepare_outbound_media( + self, + media_source: str, + file_name: Optional[str] = None, + ) -> Dict[str, Any]: + data, content_type, resolved_name = await self._load_outbound_media(media_source, file_name=file_name) + detected_type = self._detect_wecom_media_type(content_type) + size_check = self._apply_file_size_limits(len(data), detected_type, content_type) + return { + "data": data, + "content_type": content_type, + "file_name": resolved_name, + "detected_type": detected_type, + **size_check, + } + + async def _upload_media_bytes(self, data: bytes, media_type: str, filename: str) -> Dict[str, Any]: + if not data: + raise ValueError("Cannot upload empty media") + + total_size = len(data) + total_chunks = (total_size + UPLOAD_CHUNK_SIZE - 1) // UPLOAD_CHUNK_SIZE + if total_chunks > MAX_UPLOAD_CHUNKS: + raise ValueError( + f"File too large: {total_chunks} chunks exceeds maximum of {MAX_UPLOAD_CHUNKS} chunks" + ) + + init_response = await self._send_request( + APP_CMD_UPLOAD_MEDIA_INIT, + { + "type": media_type, + "filename": filename, + "total_size": total_size, + "total_chunks": total_chunks, + "md5": hashlib.md5(data).hexdigest(), + }, + ) + self._raise_for_wecom_error(init_response, "media upload init") + + init_body = init_response.get("body") if isinstance(init_response.get("body"), dict) else {} + upload_id = str(init_body.get("upload_id") or "").strip() + if not upload_id: + raise RuntimeError(f"media upload init failed: missing upload_id in response {init_response}") + + for chunk_index, start in enumerate(range(0, total_size, UPLOAD_CHUNK_SIZE)): + chunk = data[start : start + UPLOAD_CHUNK_SIZE] + chunk_response = await self._send_request( + APP_CMD_UPLOAD_MEDIA_CHUNK, + { + "upload_id": 
upload_id, + # Match the official SDK implementation, which currently uses 0-based chunk indexes. + "chunk_index": chunk_index, + "base64_data": base64.b64encode(chunk).decode("ascii"), + }, + ) + self._raise_for_wecom_error(chunk_response, f"media upload chunk {chunk_index}") + + finish_response = await self._send_request( + APP_CMD_UPLOAD_MEDIA_FINISH, + {"upload_id": upload_id}, + ) + self._raise_for_wecom_error(finish_response, "media upload finish") + + finish_body = finish_response.get("body") if isinstance(finish_response.get("body"), dict) else {} + media_id = str(finish_body.get("media_id") or "").strip() + if not media_id: + raise RuntimeError(f"media upload finish failed: missing media_id in response {finish_response}") + + return { + "type": str(finish_body.get("type") or media_type), + "media_id": media_id, + "created_at": finish_body.get("created_at"), + } + + async def _send_media_message(self, chat_id: str, media_type: str, media_id: str) -> Dict[str, Any]: + response = await self._send_request( + APP_CMD_SEND, + { + "chatid": chat_id, + "msgtype": media_type, + media_type: {"media_id": media_id}, + }, + ) + self._raise_for_wecom_error(response, "send media message") + return response + + async def _send_reply_stream(self, reply_req_id: str, content: str) -> Dict[str, Any]: + response = await self._send_reply_request( + reply_req_id, + { + "msgtype": "stream", + "stream": { + "id": self._new_req_id("stream"), + "finish": True, + "content": content[:self.MAX_MESSAGE_LENGTH], + }, + }, + ) + self._raise_for_wecom_error(response, "send reply stream") + return response + + async def _send_reply_media_message( + self, + reply_req_id: str, + media_type: str, + media_id: str, + ) -> Dict[str, Any]: + response = await self._send_reply_request( + reply_req_id, + { + "msgtype": media_type, + media_type: {"media_id": media_id}, + }, + ) + self._raise_for_wecom_error(response, "send reply media message") + return response + + async def _send_followup_markdown( 
+ self, + chat_id: str, + content: str, + reply_to: Optional[str] = None, + ) -> Optional[SendResult]: + if not content: + return None + result = await self.send(chat_id=chat_id, content=content, reply_to=reply_to) + if not result.success: + logger.warning("[%s] Follow-up markdown send failed: %s", self.name, result.error) + return result + + async def _send_media_source( + self, + chat_id: str, + media_source: str, + caption: Optional[str] = None, + file_name: Optional[str] = None, + reply_to: Optional[str] = None, + ) -> SendResult: + if not chat_id: + return SendResult(success=False, error="chat_id is required") + + try: + prepared = await self._prepare_outbound_media(media_source, file_name=file_name) + except FileNotFoundError as exc: + return SendResult(success=False, error=str(exc)) + except Exception as exc: + logger.error("[%s] Failed to prepare outbound media %s: %s", self.name, media_source, exc) + return SendResult(success=False, error=str(exc)) + + if prepared["rejected"]: + await self._send_followup_markdown( + chat_id, + f"⚠️ {prepared['reject_reason']}", + reply_to=reply_to, + ) + return SendResult(success=False, error=prepared["reject_reason"]) + + reply_req_id = self._reply_req_id_for_message(reply_to) + try: + upload_result = await self._upload_media_bytes( + prepared["data"], + prepared["final_type"], + prepared["file_name"], + ) + if reply_req_id: + media_response = await self._send_reply_media_message( + reply_req_id, + prepared["final_type"], + upload_result["media_id"], + ) + else: + media_response = await self._send_media_message( + chat_id, + prepared["final_type"], + upload_result["media_id"], + ) + except asyncio.TimeoutError: + return SendResult(success=False, error="Timeout sending media to WeCom") + except Exception as exc: + logger.error("[%s] Failed to send media %s: %s", self.name, media_source, exc) + return SendResult(success=False, error=str(exc)) + + caption_result = None + downgrade_result = None + if caption: + caption_result 
async def send(
    self,
    chat_id: str,
    content: str,
    reply_to: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None,
) -> SendResult:
    """Send markdown text to a WeCom chat.

    When ``reply_to`` resolves to a stored reply request id the text goes out
    as a passive stream reply; otherwise it is sent proactively as a markdown
    message. The text is cut to ``MAX_MESSAGE_LENGTH`` characters either way.
    ``metadata`` is accepted for interface compatibility and ignored.

    Returns:
        A ``SendResult``; failures (missing chat id, timeout, transport or
        WeCom errors) are reported through ``success=False`` rather than
        raised.
    """
    del metadata

    if not chat_id:
        return SendResult(success=False, error="chat_id is required")

    try:
        passive_req_id = self._reply_req_id_for_message(reply_to)
        if not passive_req_id:
            payload = {
                "chatid": chat_id,
                "msgtype": "markdown",
                "markdown": {"content": content[: self.MAX_MESSAGE_LENGTH]},
            }
            response = await self._send_request(APP_CMD_SEND, payload)
        else:
            response = await self._send_reply_stream(passive_req_id, content)
    except asyncio.TimeoutError:
        return SendResult(success=False, error="Timeout sending message to WeCom")
    except Exception as exc:
        logger.error("[%s] Send failed: %s", self.name, exc)
        return SendResult(success=False, error=str(exc))

    failure = self._response_error(response)
    if failure:
        return SendResult(success=False, error=failure)

    # Fall back to a locally generated id when the response carries none.
    message_id = self._payload_req_id(response)
    if not message_id:
        message_id = uuid.uuid4().hex[:12]
    return SendResult(success=True, message_id=message_id, raw_response=response)
self, + chat_id: str, + image_url: str, + caption: Optional[str] = None, + reply_to: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + ) -> SendResult: + del metadata + + result = await self._send_media_source( + chat_id=chat_id, + media_source=image_url, + caption=caption, + reply_to=reply_to, + ) + if result.success or not self._looks_like_url(image_url): + return result + + logger.warning("[%s] Falling back to text send for image URL %s: %s", self.name, image_url, result.error) + fallback_text = f"{caption}\n{image_url}" if caption else image_url + return await self.send(chat_id=chat_id, content=fallback_text, reply_to=reply_to) + + async def send_image_file( + self, + chat_id: str, + image_path: str, + caption: Optional[str] = None, + reply_to: Optional[str] = None, + **kwargs, + ) -> SendResult: + del kwargs + return await self._send_media_source( + chat_id=chat_id, + media_source=image_path, + caption=caption, + reply_to=reply_to, + ) + + async def send_document( + self, + chat_id: str, + file_path: str, + caption: Optional[str] = None, + file_name: Optional[str] = None, + reply_to: Optional[str] = None, + **kwargs, + ) -> SendResult: + del kwargs + return await self._send_media_source( + chat_id=chat_id, + media_source=file_path, + caption=caption, + file_name=file_name, + reply_to=reply_to, + ) + + async def send_voice( + self, + chat_id: str, + audio_path: str, + caption: Optional[str] = None, + reply_to: Optional[str] = None, + **kwargs, + ) -> SendResult: + del kwargs + return await self._send_media_source( + chat_id=chat_id, + media_source=audio_path, + caption=caption, + reply_to=reply_to, + ) + + async def send_video( + self, + chat_id: str, + video_path: str, + caption: Optional[str] = None, + reply_to: Optional[str] = None, + **kwargs, + ) -> SendResult: + del kwargs + return await self._send_media_source( + chat_id=chat_id, + media_source=video_path, + caption=caption, + reply_to=reply_to, + ) + + async def send_typing(self, 
async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
    """Return minimal chat info derived from the chat id alone.

    The chat is reported as ``"group"`` when the id starts with ``group``
    (case-insensitive) and as ``"dm"`` otherwise, including for empty ids.
    """
    is_group = bool(chat_id) and chat_id.lower().startswith("group")
    chat_type = "group" if is_group else "dm"
    return {"name": chat_id, "type": chat_type}
"MATRIX_ALLOW_ALL_USERS", Platform.DINGTALK: "DINGTALK_ALLOW_ALL_USERS", Platform.FEISHU: "FEISHU_ALLOW_ALL_USERS", + Platform.WECOM: "WECOM_ALLOW_ALL_USERS", } # Per-platform allow-all flag (e.g., DISCORD_ALLOW_ALL_USERS=true) diff --git a/hermes_cli/config.py b/hermes_cli/config.py index c2fc7a3b..4cdbb0af 100644 --- a/hermes_cli/config.py +++ b/hermes_cli/config.py @@ -35,6 +35,7 @@ _EXTRA_ENV_KEYS = frozenset({ "SIGNAL_ALLOWED_USERS", "SIGNAL_GROUP_ALLOWED_USERS", "DINGTALK_CLIENT_ID", "DINGTALK_CLIENT_SECRET", "FEISHU_APP_ID", "FEISHU_APP_SECRET", "FEISHU_ENCRYPT_KEY", "FEISHU_VERIFICATION_TOKEN", + "WECOM_BOT_ID", "WECOM_SECRET", "TERMINAL_ENV", "TERMINAL_SSH_KEY", "TERMINAL_SSH_PORT", "WHATSAPP_MODE", "WHATSAPP_ENABLED", "MATTERMOST_HOME_CHANNEL", "MATTERMOST_REPLY_MODE", diff --git a/hermes_cli/gateway.py b/hermes_cli/gateway.py index db15fcc7..ba292277 100644 --- a/hermes_cli/gateway.py +++ b/hermes_cli/gateway.py @@ -1351,6 +1351,30 @@ _PLATFORMS = [ "help": "Chat ID for scheduled results and notifications."}, ], }, + { + "key": "wecom", + "label": "WeCom (Enterprise WeChat)", + "emoji": "💬", + "token_var": "WECOM_BOT_ID", + "setup_instructions": [ + "1. Go to WeCom Admin Console → Applications → Create AI Bot", + "2. Copy the Bot ID and Secret from the bot's credentials page", + "3. The bot connects via WebSocket — no public endpoint needed", + "4. Add the bot to a group chat or message it directly in WeCom", + "5. 
Restrict access with WECOM_ALLOWED_USERS for production use", + ], + "vars": [ + {"name": "WECOM_BOT_ID", "prompt": "Bot ID", "password": False, + "help": "The Bot ID from your WeCom AI Bot."}, + {"name": "WECOM_SECRET", "prompt": "Secret", "password": True, + "help": "The secret from your WeCom AI Bot."}, + {"name": "WECOM_ALLOWED_USERS", "prompt": "Allowed user IDs (comma-separated, or empty)", "password": False, + "is_allowlist": True, + "help": "Restrict which WeCom users can interact with the bot."}, + {"name": "WECOM_HOME_CHANNEL", "prompt": "Home chat ID (optional, for cron/notifications)", "password": False, + "help": "Chat ID for scheduled results and notifications."}, + ], + }, ] diff --git a/hermes_cli/skills_config.py b/hermes_cli/skills_config.py index 2fd0227a..07ccd0af 100644 --- a/hermes_cli/skills_config.py +++ b/hermes_cli/skills_config.py @@ -29,6 +29,7 @@ PLATFORMS = { "matrix": "💬 Matrix", "dingtalk": "💬 DingTalk", "feishu": "🪽 Feishu", + "wecom": "💬 WeCom", } # ─── Config Helpers ─────────────────────────────────────────────────────────── diff --git a/hermes_cli/status.py b/hermes_cli/status.py index c622fca7..3a03aabb 100644 --- a/hermes_cli/status.py +++ b/hermes_cli/status.py @@ -254,6 +254,9 @@ def show_status(args): "Slack": ("SLACK_BOT_TOKEN", None), "Email": ("EMAIL_ADDRESS", "EMAIL_HOME_ADDRESS"), "SMS": ("TWILIO_ACCOUNT_SID", "SMS_HOME_CHANNEL"), + "DingTalk": ("DINGTALK_CLIENT_ID", None), + "Feishu": ("FEISHU_APP_ID", "FEISHU_HOME_CHANNEL"), + "WeCom": ("WECOM_BOT_ID", "WECOM_HOME_CHANNEL"), } for name, (token_var, home_var) in platforms.items(): diff --git a/hermes_cli/tools_config.py b/hermes_cli/tools_config.py index d8fa47ce..91496d45 100644 --- a/hermes_cli/tools_config.py +++ b/hermes_cli/tools_config.py @@ -142,6 +142,7 @@ PLATFORMS = { "matrix": {"label": "💬 Matrix", "default_toolset": "hermes-matrix"}, "dingtalk": {"label": "💬 DingTalk", "default_toolset": "hermes-dingtalk"}, "feishu": {"label": "🪽 Feishu", 
"default_toolset": "hermes-feishu"}, + "wecom": {"label": "💬 WeCom", "default_toolset": "hermes-wecom"}, "api_server": {"label": "🌐 API Server", "default_toolset": "hermes-api-server"}, "mattermost": {"label": "💬 Mattermost", "default_toolset": "hermes-mattermost"}, } diff --git a/tests/gateway/test_allowlist_startup_check.py b/tests/gateway/test_allowlist_startup_check.py index 24df941a..96441c05 100644 --- a/tests/gateway/test_allowlist_startup_check.py +++ b/tests/gateway/test_allowlist_startup_check.py @@ -13,7 +13,7 @@ def _would_warn(): "SIGNAL_ALLOWED_USERS", "SIGNAL_GROUP_ALLOWED_USERS", "EMAIL_ALLOWED_USERS", "SMS_ALLOWED_USERS", "MATTERMOST_ALLOWED_USERS", - "MATRIX_ALLOWED_USERS", "DINGTALK_ALLOWED_USERS", "FEISHU_ALLOWED_USERS", + "MATRIX_ALLOWED_USERS", "DINGTALK_ALLOWED_USERS", "FEISHU_ALLOWED_USERS", "WECOM_ALLOWED_USERS", "GATEWAY_ALLOWED_USERS") ) _allow_all = os.getenv("GATEWAY_ALLOW_ALL_USERS", "").lower() in ("true", "1", "yes") or any( @@ -22,7 +22,7 @@ def _would_warn(): "WHATSAPP_ALLOW_ALL_USERS", "SLACK_ALLOW_ALL_USERS", "SIGNAL_ALLOW_ALL_USERS", "EMAIL_ALLOW_ALL_USERS", "SMS_ALLOW_ALL_USERS", "MATTERMOST_ALLOW_ALL_USERS", - "MATRIX_ALLOW_ALL_USERS", "DINGTALK_ALLOW_ALL_USERS", "FEISHU_ALLOW_ALL_USERS") + "MATRIX_ALLOW_ALL_USERS", "DINGTALK_ALLOW_ALL_USERS", "FEISHU_ALLOW_ALL_USERS", "WECOM_ALLOW_ALL_USERS") ) return not _any_allowlist and not _allow_all diff --git a/tests/gateway/test_unauthorized_dm_behavior.py b/tests/gateway/test_unauthorized_dm_behavior.py index 6f4a9ff0..02aae301 100644 --- a/tests/gateway/test_unauthorized_dm_behavior.py +++ b/tests/gateway/test_unauthorized_dm_behavior.py @@ -20,7 +20,7 @@ def _clear_auth_env(monkeypatch) -> None: "SMS_ALLOWED_USERS", "MATTERMOST_ALLOWED_USERS", "MATRIX_ALLOWED_USERS", - "DINGTALK_ALLOWED_USERS", "FEISHU_ALLOWED_USERS", + "DINGTALK_ALLOWED_USERS", "FEISHU_ALLOWED_USERS", "WECOM_ALLOWED_USERS", "GATEWAY_ALLOWED_USERS", "TELEGRAM_ALLOW_ALL_USERS", "DISCORD_ALLOW_ALL_USERS", @@ -31,7 
+31,7 @@ def _clear_auth_env(monkeypatch) -> None: "SMS_ALLOW_ALL_USERS", "MATTERMOST_ALLOW_ALL_USERS", "MATRIX_ALLOW_ALL_USERS", - "DINGTALK_ALLOW_ALL_USERS", "FEISHU_ALLOW_ALL_USERS", + "DINGTALK_ALLOW_ALL_USERS", "FEISHU_ALLOW_ALL_USERS", "WECOM_ALLOW_ALL_USERS", "GATEWAY_ALLOW_ALL_USERS", ): monkeypatch.delenv(key, raising=False) diff --git a/tests/gateway/test_wecom.py b/tests/gateway/test_wecom.py new file mode 100644 index 00000000..a7101c69 --- /dev/null +++ b/tests/gateway/test_wecom.py @@ -0,0 +1,596 @@ +"""Tests for the WeCom platform adapter.""" + +import base64 +import os +from pathlib import Path +from types import SimpleNamespace +from unittest.mock import AsyncMock + +import pytest + +from gateway.config import Platform, PlatformConfig +from gateway.platforms.base import SendResult + + +class TestWeComRequirements: + def test_returns_false_without_aiohttp(self, monkeypatch): + monkeypatch.setattr("gateway.platforms.wecom.AIOHTTP_AVAILABLE", False) + monkeypatch.setattr("gateway.platforms.wecom.HTTPX_AVAILABLE", True) + from gateway.platforms.wecom import check_wecom_requirements + + assert check_wecom_requirements() is False + + def test_returns_false_without_httpx(self, monkeypatch): + monkeypatch.setattr("gateway.platforms.wecom.AIOHTTP_AVAILABLE", True) + monkeypatch.setattr("gateway.platforms.wecom.HTTPX_AVAILABLE", False) + from gateway.platforms.wecom import check_wecom_requirements + + assert check_wecom_requirements() is False + + def test_returns_true_when_available(self, monkeypatch): + monkeypatch.setattr("gateway.platforms.wecom.AIOHTTP_AVAILABLE", True) + monkeypatch.setattr("gateway.platforms.wecom.HTTPX_AVAILABLE", True) + from gateway.platforms.wecom import check_wecom_requirements + + assert check_wecom_requirements() is True + + +class TestWeComAdapterInit: + def test_reads_config_from_extra(self): + from gateway.platforms.wecom import WeComAdapter + + config = PlatformConfig( + enabled=True, + extra={ + "bot_id": "cfg-bot", + 
"secret": "cfg-secret", + "websocket_url": "wss://custom.wecom.example/ws", + "group_policy": "allowlist", + "group_allow_from": ["group-1"], + }, + ) + adapter = WeComAdapter(config) + + assert adapter._bot_id == "cfg-bot" + assert adapter._secret == "cfg-secret" + assert adapter._ws_url == "wss://custom.wecom.example/ws" + assert adapter._group_policy == "allowlist" + assert adapter._group_allow_from == ["group-1"] + + def test_falls_back_to_env_vars(self, monkeypatch): + monkeypatch.setenv("WECOM_BOT_ID", "env-bot") + monkeypatch.setenv("WECOM_SECRET", "env-secret") + monkeypatch.setenv("WECOM_WEBSOCKET_URL", "wss://env.example/ws") + from gateway.platforms.wecom import WeComAdapter + + adapter = WeComAdapter(PlatformConfig(enabled=True)) + assert adapter._bot_id == "env-bot" + assert adapter._secret == "env-secret" + assert adapter._ws_url == "wss://env.example/ws" + + +class TestWeComConnect: + @pytest.mark.asyncio + async def test_connect_records_missing_credentials(self, monkeypatch): + import gateway.platforms.wecom as wecom_module + from gateway.platforms.wecom import WeComAdapter + + monkeypatch.setattr(wecom_module, "AIOHTTP_AVAILABLE", True) + monkeypatch.setattr(wecom_module, "HTTPX_AVAILABLE", True) + + adapter = WeComAdapter(PlatformConfig(enabled=True)) + + success = await adapter.connect() + + assert success is False + assert adapter.has_fatal_error is True + assert adapter.fatal_error_code == "wecom_missing_credentials" + assert "WECOM_BOT_ID" in (adapter.fatal_error_message or "") + + @pytest.mark.asyncio + async def test_connect_records_handshake_failure_details(self, monkeypatch): + import gateway.platforms.wecom as wecom_module + from gateway.platforms.wecom import WeComAdapter + + class DummyClient: + async def aclose(self): + return None + + monkeypatch.setattr(wecom_module, "AIOHTTP_AVAILABLE", True) + monkeypatch.setattr(wecom_module, "HTTPX_AVAILABLE", True) + monkeypatch.setattr( + wecom_module, + "httpx", + 
SimpleNamespace(AsyncClient=lambda **kwargs: DummyClient()), + ) + + adapter = WeComAdapter( + PlatformConfig(enabled=True, extra={"bot_id": "bot-1", "secret": "secret-1"}) + ) + adapter._open_connection = AsyncMock(side_effect=RuntimeError("invalid secret (errcode=40013)")) + + success = await adapter.connect() + + assert success is False + assert adapter.has_fatal_error is True + assert adapter.fatal_error_code == "wecom_connect_error" + assert "invalid secret" in (adapter.fatal_error_message or "") + + +class TestWeComReplyMode: + @pytest.mark.asyncio + async def test_send_uses_passive_reply_stream_when_reply_context_exists(self): + from gateway.platforms.wecom import WeComAdapter + + adapter = WeComAdapter(PlatformConfig(enabled=True)) + adapter._reply_req_ids["msg-1"] = "req-1" + adapter._send_reply_request = AsyncMock( + return_value={"headers": {"req_id": "req-1"}, "errcode": 0} + ) + + result = await adapter.send("chat-123", "hello from reply", reply_to="msg-1") + + assert result.success is True + adapter._send_reply_request.assert_awaited_once() + args = adapter._send_reply_request.await_args.args + assert args[0] == "req-1" + assert args[1]["msgtype"] == "stream" + assert args[1]["stream"]["finish"] is True + assert args[1]["stream"]["content"] == "hello from reply" + + @pytest.mark.asyncio + async def test_send_image_file_uses_passive_reply_media_when_reply_context_exists(self): + from gateway.platforms.wecom import WeComAdapter + + adapter = WeComAdapter(PlatformConfig(enabled=True)) + adapter._reply_req_ids["msg-1"] = "req-1" + adapter._prepare_outbound_media = AsyncMock( + return_value={ + "data": b"image-bytes", + "content_type": "image/png", + "file_name": "demo.png", + "detected_type": "image", + "final_type": "image", + "rejected": False, + "reject_reason": None, + "downgraded": False, + "downgrade_note": None, + } + ) + adapter._upload_media_bytes = AsyncMock(return_value={"media_id": "media-1", "type": "image"}) + adapter._send_reply_request = 
AsyncMock( + return_value={"headers": {"req_id": "req-1"}, "errcode": 0} + ) + + result = await adapter.send_image_file("chat-123", "/tmp/demo.png", reply_to="msg-1") + + assert result.success is True + adapter._send_reply_request.assert_awaited_once() + args = adapter._send_reply_request.await_args.args + assert args[0] == "req-1" + assert args[1] == {"msgtype": "image", "image": {"media_id": "media-1"}} + + +class TestExtractText: + def test_extracts_plain_text(self): + from gateway.platforms.wecom import WeComAdapter + + body = { + "msgtype": "text", + "text": {"content": " hello world "}, + } + text, reply_text = WeComAdapter._extract_text(body) + assert text == "hello world" + assert reply_text is None + + def test_extracts_mixed_text(self): + from gateway.platforms.wecom import WeComAdapter + + body = { + "msgtype": "mixed", + "mixed": { + "msg_item": [ + {"msgtype": "text", "text": {"content": "part1"}}, + {"msgtype": "image", "image": {"url": "https://example.com/x.png"}}, + {"msgtype": "text", "text": {"content": "part2"}}, + ] + }, + } + text, _reply_text = WeComAdapter._extract_text(body) + assert text == "part1\npart2" + + def test_extracts_voice_and_quote(self): + from gateway.platforms.wecom import WeComAdapter + + body = { + "msgtype": "voice", + "voice": {"content": "spoken text"}, + "quote": {"msgtype": "text", "text": {"content": "quoted"}}, + } + text, reply_text = WeComAdapter._extract_text(body) + assert text == "spoken text" + assert reply_text == "quoted" + + +class TestCallbackDispatch: + @pytest.mark.asyncio + @pytest.mark.parametrize("cmd", ["aibot_msg_callback", "aibot_callback"]) + async def test_dispatch_accepts_new_and_legacy_callback_cmds(self, cmd): + from gateway.platforms.wecom import WeComAdapter + + adapter = WeComAdapter(PlatformConfig(enabled=True)) + adapter._on_message = AsyncMock() + + await adapter._dispatch_payload({"cmd": cmd, "headers": {"req_id": "req-1"}, "body": {}}) + + adapter._on_message.assert_awaited_once() + + 
+class TestPolicyHelpers: + def test_dm_allowlist(self): + from gateway.platforms.wecom import WeComAdapter + + adapter = WeComAdapter( + PlatformConfig(enabled=True, extra={"dm_policy": "allowlist", "allow_from": ["user-1"]}) + ) + assert adapter._is_dm_allowed("user-1") is True + assert adapter._is_dm_allowed("user-2") is False + + def test_group_allowlist_and_per_group_sender_allowlist(self): + from gateway.platforms.wecom import WeComAdapter + + adapter = WeComAdapter( + PlatformConfig( + enabled=True, + extra={ + "group_policy": "allowlist", + "group_allow_from": ["group-1"], + "groups": {"group-1": {"allow_from": ["user-1"]}}, + }, + ) + ) + + assert adapter._is_group_allowed("group-1", "user-1") is True + assert adapter._is_group_allowed("group-1", "user-2") is False + assert adapter._is_group_allowed("group-2", "user-1") is False + + +class TestMediaHelpers: + def test_detect_wecom_media_type(self): + from gateway.platforms.wecom import WeComAdapter + + assert WeComAdapter._detect_wecom_media_type("image/png") == "image" + assert WeComAdapter._detect_wecom_media_type("video/mp4") == "video" + assert WeComAdapter._detect_wecom_media_type("audio/amr") == "voice" + assert WeComAdapter._detect_wecom_media_type("application/pdf") == "file" + + def test_voice_non_amr_downgrades_to_file(self): + from gateway.platforms.wecom import WeComAdapter + + result = WeComAdapter._apply_file_size_limits(128, "voice", "audio/mpeg") + + assert result["final_type"] == "file" + assert result["downgraded"] is True + assert "AMR" in (result["downgrade_note"] or "") + + def test_oversized_file_is_rejected(self): + from gateway.platforms.wecom import ABSOLUTE_MAX_BYTES, WeComAdapter + + result = WeComAdapter._apply_file_size_limits(ABSOLUTE_MAX_BYTES + 1, "file", "application/pdf") + + assert result["rejected"] is True + assert "20MB" in (result["reject_reason"] or "") + + def test_decrypt_file_bytes_round_trip(self): + from cryptography.hazmat.primitives.ciphers import Cipher, 
@pytest.mark.asyncio
async def test_load_outbound_media_rejects_placeholder_path(self):
    """An unreplaced ``<...>`` template placeholder is rejected as a media source."""
    from gateway.platforms.wecom import WeComAdapter

    adapter = WeComAdapter(PlatformConfig(enabled=True))

    # The source must be a literal "<...>" placeholder: an empty string is
    # caught earlier by the "media source is required" guard and would never
    # produce the placeholder message asserted here.
    with pytest.raises(ValueError, match="placeholder was not replaced"):
        await adapter._load_outbound_media("<path/to/file.png>")
APP_CMD_UPLOAD_MEDIA_CHUNK, + APP_CMD_UPLOAD_MEDIA_CHUNK, + APP_CMD_UPLOAD_MEDIA_FINISH, + ] + assert calls[1][1]["chunk_index"] == 0 + assert calls[2][1]["chunk_index"] == 1 + assert calls[3][1]["chunk_index"] == 2 + + @pytest.mark.asyncio + async def test_download_remote_bytes_rejects_large_content_length(self): + from gateway.platforms.wecom import WeComAdapter + + class FakeResponse: + headers = {"content-length": "10"} + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc, tb): + return None + + def raise_for_status(self): + return None + + async def aiter_bytes(self): + yield b"abc" + + class FakeClient: + def stream(self, method, url, headers=None): + return FakeResponse() + + adapter = WeComAdapter(PlatformConfig(enabled=True)) + adapter._http_client = FakeClient() + + with pytest.raises(ValueError, match="exceeds WeCom limit"): + await adapter._download_remote_bytes("https://example.com/file.bin", max_bytes=4) + + @pytest.mark.asyncio + async def test_cache_media_decrypts_url_payload_before_writing(self): + from gateway.platforms.wecom import WeComAdapter + + adapter = WeComAdapter(PlatformConfig(enabled=True)) + plaintext = b"secret document bytes" + key = os.urandom(32) + pad_len = 32 - (len(plaintext) % 32) + padded = plaintext + bytes([pad_len]) * pad_len + + from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes + + encryptor = Cipher(algorithms.AES(key), modes.CBC(key[:16])).encryptor() + encrypted = encryptor.update(padded) + encryptor.finalize() + adapter._download_remote_bytes = AsyncMock( + return_value=( + encrypted, + { + "content-type": "application/octet-stream", + "content-disposition": 'attachment; filename="secret.bin"', + }, + ) + ) + + cached = await adapter._cache_media( + "file", + { + "url": "https://example.com/secret.bin", + "aeskey": base64.b64encode(key).decode("ascii"), + }, + ) + + assert cached is not None + cached_path, content_type = cached + assert 
Path(cached_path).read_bytes() == plaintext + assert content_type == "application/octet-stream" + + +class TestSend: + @pytest.mark.asyncio + async def test_send_uses_proactive_payload(self): + from gateway.platforms.wecom import APP_CMD_SEND, WeComAdapter + + adapter = WeComAdapter(PlatformConfig(enabled=True)) + adapter._send_request = AsyncMock(return_value={"headers": {"req_id": "req-1"}, "errcode": 0}) + + result = await adapter.send("chat-123", "Hello WeCom") + + assert result.success is True + adapter._send_request.assert_awaited_once_with( + APP_CMD_SEND, + { + "chatid": "chat-123", + "msgtype": "markdown", + "markdown": {"content": "Hello WeCom"}, + }, + ) + + @pytest.mark.asyncio + async def test_send_reports_wecom_errors(self): + from gateway.platforms.wecom import WeComAdapter + + adapter = WeComAdapter(PlatformConfig(enabled=True)) + adapter._send_request = AsyncMock(return_value={"errcode": 40001, "errmsg": "bad request"}) + + result = await adapter.send("chat-123", "Hello WeCom") + + assert result.success is False + assert "40001" in (result.error or "") + + @pytest.mark.asyncio + async def test_send_image_falls_back_to_text_for_remote_url(self): + from gateway.platforms.wecom import WeComAdapter + + adapter = WeComAdapter(PlatformConfig(enabled=True)) + adapter._send_media_source = AsyncMock(return_value=SendResult(success=False, error="upload failed")) + adapter.send = AsyncMock(return_value=SendResult(success=True, message_id="msg-1")) + + result = await adapter.send_image("chat-123", "https://example.com/demo.png", caption="demo") + + assert result.success is True + adapter.send.assert_awaited_once_with(chat_id="chat-123", content="demo\nhttps://example.com/demo.png", reply_to=None) + + @pytest.mark.asyncio + async def test_send_voice_sends_caption_and_downgrade_note(self): + from gateway.platforms.wecom import WeComAdapter + + adapter = WeComAdapter(PlatformConfig(enabled=True)) + adapter._prepare_outbound_media = AsyncMock( + return_value={ + 
"data": b"voice-bytes", + "content_type": "audio/mpeg", + "file_name": "voice.mp3", + "detected_type": "voice", + "final_type": "file", + "rejected": False, + "reject_reason": None, + "downgraded": True, + "downgrade_note": "语音格式 audio/mpeg 不支持,企微仅支持 AMR 格式,已转为文件格式发送", + } + ) + adapter._upload_media_bytes = AsyncMock(return_value={"media_id": "media-1", "type": "file"}) + adapter._send_media_message = AsyncMock(return_value={"headers": {"req_id": "req-media"}, "errcode": 0}) + adapter.send = AsyncMock(return_value=SendResult(success=True, message_id="msg-1")) + + result = await adapter.send_voice("chat-123", "/tmp/voice.mp3", caption="listen") + + assert result.success is True + adapter._send_media_message.assert_awaited_once_with("chat-123", "file", "media-1") + assert adapter.send.await_count == 2 + adapter.send.assert_any_await(chat_id="chat-123", content="listen", reply_to=None) + adapter.send.assert_any_await( + chat_id="chat-123", + content="ℹ️ 语音格式 audio/mpeg 不支持,企微仅支持 AMR 格式,已转为文件格式发送", + reply_to=None, + ) + + +class TestInboundMessages: + @pytest.mark.asyncio + async def test_on_message_builds_event(self): + from gateway.platforms.wecom import WeComAdapter + + adapter = WeComAdapter(PlatformConfig(enabled=True)) + adapter.handle_message = AsyncMock() + adapter._extract_media = AsyncMock(return_value=(["/tmp/test.png"], ["image/png"])) + + payload = { + "cmd": "aibot_msg_callback", + "headers": {"req_id": "req-1"}, + "body": { + "msgid": "msg-1", + "chatid": "group-1", + "chattype": "group", + "from": {"userid": "user-1"}, + "msgtype": "text", + "text": {"content": "hello"}, + }, + } + + await adapter._on_message(payload) + + adapter.handle_message.assert_awaited_once() + event = adapter.handle_message.await_args.args[0] + assert event.text == "hello" + assert event.source.chat_id == "group-1" + assert event.source.user_id == "user-1" + assert event.media_urls == ["/tmp/test.png"] + assert event.media_types == ["image/png"] + + @pytest.mark.asyncio + 
async def test_on_message_preserves_quote_context(self): + from gateway.platforms.wecom import WeComAdapter + + adapter = WeComAdapter(PlatformConfig(enabled=True)) + adapter.handle_message = AsyncMock() + adapter._extract_media = AsyncMock(return_value=([], [])) + + payload = { + "cmd": "aibot_msg_callback", + "headers": {"req_id": "req-1"}, + "body": { + "msgid": "msg-1", + "chatid": "group-1", + "chattype": "group", + "from": {"userid": "user-1"}, + "msgtype": "text", + "text": {"content": "follow up"}, + "quote": {"msgtype": "text", "text": {"content": "quoted message"}}, + }, + } + + await adapter._on_message(payload) + + event = adapter.handle_message.await_args.args[0] + assert event.reply_to_text == "quoted message" + assert event.reply_to_message_id == "quote:msg-1" + + @pytest.mark.asyncio + async def test_on_message_respects_group_policy(self): + from gateway.platforms.wecom import WeComAdapter + + adapter = WeComAdapter( + PlatformConfig( + enabled=True, + extra={"group_policy": "allowlist", "group_allow_from": ["group-allowed"]}, + ) + ) + adapter.handle_message = AsyncMock() + adapter._extract_media = AsyncMock(return_value=([], [])) + + payload = { + "cmd": "aibot_callback", + "headers": {"req_id": "req-1"}, + "body": { + "msgid": "msg-1", + "chatid": "group-blocked", + "chattype": "group", + "from": {"userid": "user-1"}, + "msgtype": "text", + "text": {"content": "hello"}, + }, + } + + await adapter._on_message(payload) + adapter.handle_message.assert_not_awaited() + + +class TestPlatformEnum: + def test_wecom_in_platform_enum(self): + assert Platform.WECOM.value == "wecom" diff --git a/tools/cronjob_tools.py b/tools/cronjob_tools.py index 5f209e16..84054c6e 100644 --- a/tools/cronjob_tools.py +++ b/tools/cronjob_tools.py @@ -372,7 +372,7 @@ Important safety rule: cron-run sessions should not recursively schedule more cr }, "deliver": { "type": "string", - "description": "Delivery target: origin, local, telegram, discord, slack, whatsapp, signal, 
matrix, mattermost, homeassistant, dingtalk, feishu, email, sms, or platform:chat_id or platform:chat_id:thread_id for Telegram topics. Examples: 'origin', 'local', 'telegram', 'telegram:-1001234567890:17585', 'discord:#engineering'" + "description": "Delivery target: origin, local, telegram, discord, slack, whatsapp, signal, matrix, mattermost, homeassistant, dingtalk, feishu, wecom, email, sms, or platform:chat_id or platform:chat_id:thread_id for Telegram topics. Examples: 'origin', 'local', 'telegram', 'telegram:-1001234567890:17585', 'discord:#engineering'" }, "model": { "type": "string", diff --git a/tools/send_message_tool.py b/tools/send_message_tool.py index e0384735..d12eed50 100644 --- a/tools/send_message_tool.py +++ b/tools/send_message_tool.py @@ -130,6 +130,7 @@ def _handle_send(args): "homeassistant": Platform.HOMEASSISTANT, "dingtalk": Platform.DINGTALK, "feishu": Platform.FEISHU, + "wecom": Platform.WECOM, "email": Platform.EMAIL, "sms": Platform.SMS, } @@ -368,6 +369,8 @@ async def _send_to_platform(platform, pconfig, chat_id, message, thread_id=None, result = await _send_dingtalk(pconfig.extra, chat_id, chunk) elif platform == Platform.FEISHU: result = await _send_feishu(pconfig, chat_id, chunk, thread_id=thread_id) + elif platform == Platform.WECOM: + result = await _send_wecom(pconfig.extra, chat_id, chunk) else: result = {"error": f"Direct sending not yet implemented for {platform.value}"} @@ -794,6 +797,33 @@ async def _send_dingtalk(extra, chat_id, message): return {"error": f"DingTalk send failed: {e}"} +async def _send_wecom(extra, chat_id, message): + """Send via WeCom using the adapter's WebSocket send pipeline.""" + try: + from gateway.platforms.wecom import WeComAdapter, check_wecom_requirements + if not check_wecom_requirements(): + return {"error": "WeCom requirements not met. 
Need aiohttp + WECOM_BOT_ID/SECRET."} + except ImportError: + return {"error": "WeCom adapter not available."} + + try: + from gateway.config import PlatformConfig + pconfig = PlatformConfig(extra=extra) + adapter = WeComAdapter(pconfig) + connected = await adapter.connect() + if not connected: + return {"error": f"WeCom: failed to connect — {adapter.fatal_error_message or 'unknown error'}"} + try: + result = await adapter.send(chat_id, message) + if not result.success: + return {"error": f"WeCom send failed: {result.error}"} + return {"success": True, "platform": "wecom", "chat_id": chat_id, "message_id": result.message_id} + finally: + await adapter.disconnect() + except Exception as e: + return {"error": f"WeCom send failed: {e}"} + + async def _send_feishu(pconfig, chat_id, message, media_files=None, thread_id=None): """Send via Feishu/Lark using the adapter's send pipeline.""" try: diff --git a/toolsets.py b/toolsets.py index a08fe38c..ad762555 100644 --- a/toolsets.py +++ b/toolsets.py @@ -357,6 +357,12 @@ TOOLSETS = { "includes": [] }, + "hermes-wecom": { + "description": "WeCom bot toolset - enterprise WeChat messaging (full access)", + "tools": _HERMES_CORE_TOOLS, + "includes": [] + }, + "hermes-sms": { "description": "SMS bot toolset - interact with Hermes via SMS (Twilio)", "tools": _HERMES_CORE_TOOLS, @@ -366,7 +372,7 @@ TOOLSETS = { "hermes-gateway": { "description": "Gateway toolset - union of all messaging platform tools", "tools": [], - "includes": ["hermes-telegram", "hermes-discord", "hermes-whatsapp", "hermes-slack", "hermes-signal", "hermes-homeassistant", "hermes-email", "hermes-sms", "hermes-mattermost", "hermes-matrix", "hermes-dingtalk", "hermes-feishu"] + "includes": ["hermes-telegram", "hermes-discord", "hermes-whatsapp", "hermes-slack", "hermes-signal", "hermes-homeassistant", "hermes-email", "hermes-sms", "hermes-mattermost", "hermes-matrix", "hermes-dingtalk", "hermes-feishu", "hermes-wecom"] } } diff --git 
a/website/docs/reference/environment-variables.md b/website/docs/reference/environment-variables.md index b60b0598..715c9fbc 100644 --- a/website/docs/reference/environment-variables.md +++ b/website/docs/reference/environment-variables.md @@ -196,6 +196,19 @@ For native Anthropic auth, Hermes prefers Claude Code's own credential files whe | `DINGTALK_CLIENT_ID` | DingTalk bot AppKey from developer portal ([open.dingtalk.com](https://open.dingtalk.com)) | | `DINGTALK_CLIENT_SECRET` | DingTalk bot AppSecret from developer portal | | `DINGTALK_ALLOWED_USERS` | Comma-separated DingTalk user IDs allowed to message the bot | +| `FEISHU_APP_ID` | Feishu/Lark bot App ID from [open.feishu.cn](https://open.feishu.cn/) | +| `FEISHU_APP_SECRET` | Feishu/Lark bot App Secret | +| `FEISHU_DOMAIN` | `feishu` (China) or `lark` (international). Default: `feishu` | +| `FEISHU_CONNECTION_MODE` | `websocket` (recommended) or `webhook`. Default: `websocket` | +| `FEISHU_ENCRYPT_KEY` | Optional encryption key for webhook mode | +| `FEISHU_VERIFICATION_TOKEN` | Optional verification token for webhook mode | +| `FEISHU_ALLOWED_USERS` | Comma-separated Feishu user IDs allowed to message the bot | +| `FEISHU_HOME_CHANNEL` | Feishu chat ID for cron delivery and notifications | +| `WECOM_BOT_ID` | WeCom AI Bot ID from admin console | +| `WECOM_SECRET` | WeCom AI Bot secret | +| `WECOM_WEBSOCKET_URL` | Custom WebSocket URL (default: `wss://openws.work.weixin.qq.com`) | +| `WECOM_ALLOWED_USERS` | Comma-separated WeCom user IDs allowed to message the bot | +| `WECOM_HOME_CHANNEL` | WeCom chat ID for cron delivery and notifications | | `MATTERMOST_URL` | Mattermost server URL (e.g. 
`https://mm.example.com`) | | `MATTERMOST_TOKEN` | Bot token or personal access token for Mattermost | | `MATTERMOST_ALLOWED_USERS` | Comma-separated Mattermost user IDs allowed to message the bot | diff --git a/website/docs/reference/toolsets-reference.md b/website/docs/reference/toolsets-reference.md index 133870eb..83cf92e4 100644 --- a/website/docs/reference/toolsets-reference.md +++ b/website/docs/reference/toolsets-reference.md @@ -22,6 +22,7 @@ Toolsets are named bundles of tools that you can enable with `hermes chat --tool | `hermes-api-server` | platform | _(same as hermes-cli)_ | | `hermes-dingtalk` | platform | _(same as hermes-cli)_ | | `hermes-feishu` | platform | _(same as hermes-cli)_ | +| `hermes-wecom` | platform | _(same as hermes-cli)_ | | `hermes-discord` | platform | _(same as hermes-cli)_ | | `hermes-email` | platform | _(same as hermes-cli)_ | | `hermes-gateway` | composite | Union of all messaging platform toolsets | diff --git a/website/docs/user-guide/messaging/index.md b/website/docs/user-guide/messaging/index.md index 3dc0e9cd..9073e45f 100644 --- a/website/docs/user-guide/messaging/index.md +++ b/website/docs/user-guide/messaging/index.md @@ -6,7 +6,7 @@ description: "Chat with Hermes from Telegram, Discord, Slack, WhatsApp, Signal, # Messaging Gateway -Chat with Hermes from Telegram, Discord, Slack, WhatsApp, Signal, SMS, Email, Home Assistant, Mattermost, Matrix, DingTalk, Feishu/Lark, or your browser. The gateway is a single background process that connects to all your configured platforms, handles sessions, runs cron jobs, and delivers voice messages. +Chat with Hermes from Telegram, Discord, Slack, WhatsApp, Signal, SMS, Email, Home Assistant, Mattermost, Matrix, DingTalk, Feishu/Lark, WeCom, or your browser. The gateway is a single background process that connects to all your configured platforms, handles sessions, runs cron jobs, and delivers voice messages. 
For the full voice feature set — including CLI microphone mode, spoken replies in messaging, and Discord voice-channel conversations — see [Voice Mode](/docs/user-guide/features/voice-mode) and [Use Voice Mode with Hermes](/docs/guides/use-voice-mode-with-hermes). @@ -28,6 +28,7 @@ flowchart TB mx[Matrix] dt[DingTalk] fs[Feishu/Lark] + wc[WeCom] api["API Server
(OpenAI-compatible)"] wh[Webhooks] end @@ -330,6 +331,7 @@ Each platform has its own toolset: | Matrix | `hermes-matrix` | Full tools including terminal | | DingTalk | `hermes-dingtalk` | Full tools including terminal | | Feishu/Lark | `hermes-feishu` | Full tools including terminal | +| WeCom | `hermes-wecom` | Full tools including terminal | | API Server | `hermes` (default) | Full tools including terminal | | Webhooks | `hermes-webhook` | Full tools including terminal | @@ -347,5 +349,6 @@ Each platform has its own toolset: - [Matrix Setup](matrix.md) - [DingTalk Setup](dingtalk.md) - [Feishu/Lark Setup](feishu.md) +- [WeCom Setup](wecom.md) - [Open WebUI + API Server](open-webui.md) - [Webhooks](webhooks.md) diff --git a/website/docs/user-guide/messaging/wecom.md b/website/docs/user-guide/messaging/wecom.md new file mode 100644 index 00000000..e5a551b8 --- /dev/null +++ b/website/docs/user-guide/messaging/wecom.md @@ -0,0 +1,86 @@ +--- +sidebar_position: 14 +title: "WeCom (Enterprise WeChat)" +description: "Connect Hermes Agent to WeCom via the AI Bot WebSocket gateway" +--- + +# WeCom (Enterprise WeChat) + +Connect Hermes to [WeCom](https://work.weixin.qq.com/) (企业微信), Tencent's enterprise messaging platform. The adapter uses WeCom's AI Bot WebSocket gateway for real-time bidirectional communication — no public endpoint or webhook needed. + +## Prerequisites + +- A WeCom organization account +- An AI Bot created in the WeCom Admin Console +- The Bot ID and Secret from the bot's credentials page + +## Setup + +### 1. Create an AI Bot + +1. Log in to the [WeCom Admin Console](https://work.weixin.qq.com/wework_admin/frame) +2. Navigate to **Applications** → **Create Application** → **AI Bot** +3. Configure the bot name and description +4. Copy the **Bot ID** and **Secret** from the credentials page + +### 2. Configure Hermes + +Run the interactive setup: + +```bash +hermes gateway setup +``` + +Select **WeCom** and enter your Bot ID and Secret. 
+ +Or set environment variables in `~/.hermes/.env`: + +```bash +WECOM_BOT_ID=your-bot-id +WECOM_SECRET=your-secret + +# Optional: restrict access +WECOM_ALLOWED_USERS=user_id_1,user_id_2 + +# Optional: home channel for cron/notifications +WECOM_HOME_CHANNEL=chat_id +``` + +### 3. Start the gateway + +```bash +hermes gateway start +``` + +## Features + +- **WebSocket transport** — persistent connection, no public endpoint needed +- **DM and group messaging** — configurable access policies +- **Media support** — images, files, voice, video upload and download +- **AES-encrypted media** — automatic decryption for inbound attachments +- **Quote context** — preserves reply threading +- **Markdown rendering** — rich text responses +- **Auto-reconnect** — exponential backoff on connection drops + +## Configuration Options + +Set these in `config.yaml` under `platforms.wecom.extra`: + +| Key | Default | Description | +|-----|---------|-------------| +| `bot_id` | — | WeCom AI Bot ID (required) | +| `secret` | — | WeCom AI Bot Secret (required) | +| `websocket_url` | `wss://openws.work.weixin.qq.com` | WebSocket gateway URL | +| `dm_policy` | `open` | DM access: `open`, `allowlist`, `disabled`, `pairing` | +| `group_policy` | `open` | Group access: `open`, `allowlist`, `disabled` | +| `allow_from` | `[]` | User IDs allowed for DMs (when dm_policy=allowlist) | +| `group_allow_from` | `[]` | Group IDs allowed (when group_policy=allowlist) | + +## Troubleshooting + +| Problem | Fix | +|---------|-----| +| "WECOM_BOT_ID and WECOM_SECRET are required" | Set both env vars or configure in setup wizard | +| "invalid secret (errcode=40013)" | Verify the secret matches your bot's credentials | +| "Timed out waiting for subscribe acknowledgement" | Check network connectivity to `openws.work.weixin.qq.com` | +| Bot doesn't respond in groups | Check `group_policy` setting and group allowlist | diff --git a/website/sidebars.ts b/website/sidebars.ts index a2fafdfc..73c94303 100644 --- 
a/website/sidebars.ts +++ b/website/sidebars.ts @@ -55,6 +55,7 @@ const sidebars: SidebarsConfig = { 'user-guide/messaging/matrix', 'user-guide/messaging/dingtalk', 'user-guide/messaging/feishu', + 'user-guide/messaging/wecom', 'user-guide/messaging/open-webui', 'user-guide/messaging/webhooks', ],