Adds Feishu (ByteDance's enterprise messaging platform) as a gateway platform adapter with full feature parity: WebSocket + webhook transports, message batching, dedup, rate limiting, rich post/card content parsing, media handling (images/audio/files/video), group @mention gating, reaction routing, and interactive card button support. Cherry-picked from PR #1793 by penwyp with: - Moved to current main (PR was 458 commits behind) - Fixed _send_with_retry shadowing BasePlatformAdapter method (renamed to _feishu_send_with_retry to avoid signature mismatch crash) - Fixed import structure: aiohttp/websockets imported independently of lark_oapi so they remain available when SDK is missing - Fixed get_hermes_home import (hermes_constants, not hermes_cli.config) - Added skip decorators for tests requiring lark_oapi SDK - All 16 integration points added surgically to current main New dependency: lark-oapi>=1.5.3,<2 (optional, pip install hermes-agent[feishu]) Fixes #1788 Co-authored-by: penwyp <penwyp@users.noreply.github.com>
3256 lines
133 KiB
Python
3256 lines
133 KiB
Python
"""
|
|
Feishu/Lark platform adapter.
|
|
|
|
Supports:
|
|
- WebSocket long connection and Webhook transport
|
|
- Direct-message and group @mention-gated text receive/send
|
|
- Inbound image/file/audio/media caching
|
|
- Gateway allowlist integration via FEISHU_ALLOWED_USERS
|
|
- Persistent dedup state across restarts
|
|
- Per-chat serial message processing (matches openclaw createChatQueue)
|
|
- Persistent ACK emoji reaction on inbound messages
|
|
- Reaction events routed as synthetic text events (matches openclaw)
|
|
- Interactive card button-click events routed as synthetic COMMAND events
|
|
- Webhook anomaly tracking (matches openclaw createWebhookAnomalyTracker)
|
|
- Verification token validation as second auth layer (matches openclaw)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import hashlib
|
|
import hmac
|
|
import json
|
|
import logging
|
|
import mimetypes
|
|
import os
|
|
import re
|
|
import threading
|
|
import time
|
|
import uuid
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from types import SimpleNamespace
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
# aiohttp/websockets are independent optional deps — import outside lark_oapi
|
|
# so they remain available for tests and webhook mode even if lark_oapi is missing.
|
|
try:
|
|
import aiohttp
|
|
from aiohttp import web
|
|
except ImportError:
|
|
aiohttp = None # type: ignore[assignment]
|
|
web = None # type: ignore[assignment]
|
|
|
|
try:
|
|
import websockets
|
|
except ImportError:
|
|
websockets = None # type: ignore[assignment]
|
|
|
|
try:
|
|
import lark_oapi as lark
|
|
from lark_oapi.api.application.v6 import GetApplicationRequest
|
|
from lark_oapi.api.im.v1 import (
|
|
CreateFileRequest,
|
|
CreateFileRequestBody,
|
|
CreateImageRequest,
|
|
CreateImageRequestBody,
|
|
CreateMessageRequest,
|
|
CreateMessageRequestBody,
|
|
GetChatRequest,
|
|
GetMessageRequest,
|
|
GetImageRequest,
|
|
GetMessageResourceRequest,
|
|
P2ImMessageMessageReadV1,
|
|
ReplyMessageRequest,
|
|
ReplyMessageRequestBody,
|
|
UpdateMessageRequest,
|
|
UpdateMessageRequestBody,
|
|
)
|
|
from lark_oapi.core.const import FEISHU_DOMAIN, LARK_DOMAIN
|
|
from lark_oapi.event.callback.model.p2_card_action_trigger import P2CardActionTriggerResponse
|
|
from lark_oapi.event.dispatcher_handler import EventDispatcherHandler
|
|
from lark_oapi.ws import Client as FeishuWSClient
|
|
|
|
FEISHU_AVAILABLE = True
|
|
except ImportError:
|
|
FEISHU_AVAILABLE = False
|
|
lark = None # type: ignore[assignment]
|
|
P2CardActionTriggerResponse = None # type: ignore[assignment]
|
|
EventDispatcherHandler = None # type: ignore[assignment]
|
|
FeishuWSClient = None # type: ignore[assignment]
|
|
FEISHU_DOMAIN = None # type: ignore[assignment]
|
|
LARK_DOMAIN = None # type: ignore[assignment]
|
|
|
|
FEISHU_WEBSOCKET_AVAILABLE = websockets is not None
|
|
FEISHU_WEBHOOK_AVAILABLE = aiohttp is not None
|
|
|
|
from gateway.config import Platform, PlatformConfig
|
|
from gateway.platforms.base import (
|
|
BasePlatformAdapter,
|
|
MessageEvent,
|
|
MessageType,
|
|
SendResult,
|
|
SUPPORTED_DOCUMENT_TYPES,
|
|
cache_document_from_bytes,
|
|
cache_image_from_url,
|
|
cache_audio_from_bytes,
|
|
cache_image_from_bytes,
|
|
)
|
|
from gateway.status import acquire_scoped_lock, release_scoped_lock
|
|
from hermes_constants import get_hermes_home
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Regex patterns
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_MARKDOWN_HINT_RE = re.compile(
|
|
r"(^#{1,6}\s)|(^\s*[-*]\s)|(^\s*\d+\.\s)|(^\s*---+\s*$)|(```)|(`[^`\n]+`)|(\*\*[^*\n].+?\*\*)|(~~[^~\n].+?~~)|(<u>.+?</u>)|(\*[^*\n]+\*)|(\[[^\]]+\]\([^)]+\))|(^>\s)",
|
|
re.MULTILINE,
|
|
)
|
|
_MARKDOWN_LINK_RE = re.compile(r"\[([^\]]+)\]\(([^)]+)\)")
|
|
_MENTION_RE = re.compile(r"@_user_\d+")
|
|
_MULTISPACE_RE = re.compile(r"[ \t]{2,}")
|
|
_POST_CONTENT_INVALID_RE = re.compile(r"content format of the post type is incorrect", re.IGNORECASE)
|
|
# ---------------------------------------------------------------------------
|
|
# Media type sets and upload constants
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"}
|
|
_AUDIO_EXTENSIONS = {".ogg", ".mp3", ".wav", ".m4a", ".aac", ".flac", ".opus", ".webm"}
|
|
_VIDEO_EXTENSIONS = {".mp4", ".mov", ".avi", ".mkv", ".webm", ".m4v", ".3gp"}
|
|
_DOCUMENT_MIME_TO_EXT = {mime: ext for ext, mime in SUPPORTED_DOCUMENT_TYPES.items()}
|
|
_FEISHU_IMAGE_UPLOAD_TYPE = "message"
|
|
_FEISHU_FILE_UPLOAD_TYPE = "stream"
|
|
_FEISHU_OPUS_UPLOAD_EXTENSIONS = {".ogg", ".opus"}
|
|
_FEISHU_MEDIA_UPLOAD_EXTENSIONS = {".mp4", ".mov", ".avi", ".m4v"}
|
|
_FEISHU_DOC_UPLOAD_TYPES = {
|
|
".pdf": "pdf",
|
|
".doc": "doc",
|
|
".docx": "doc",
|
|
".xls": "xls",
|
|
".xlsx": "xls",
|
|
".ppt": "ppt",
|
|
".pptx": "ppt",
|
|
}
|
|
# ---------------------------------------------------------------------------
|
|
# Connection, retry and batching tuning
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_MAX_TEXT_INJECT_BYTES = 100 * 1024
|
|
_FEISHU_CONNECT_ATTEMPTS = 3
|
|
_FEISHU_SEND_ATTEMPTS = 3
|
|
_FEISHU_APP_LOCK_SCOPE = "feishu-app-id"
|
|
_DEFAULT_TEXT_BATCH_DELAY_SECONDS = 0.6
|
|
_DEFAULT_TEXT_BATCH_MAX_MESSAGES = 8
|
|
_DEFAULT_TEXT_BATCH_MAX_CHARS = 4000
|
|
_DEFAULT_MEDIA_BATCH_DELAY_SECONDS = 0.8
|
|
_DEFAULT_DEDUP_CACHE_SIZE = 2048
|
|
_DEFAULT_WEBHOOK_HOST = "127.0.0.1"
|
|
_DEFAULT_WEBHOOK_PORT = 8765
|
|
_DEFAULT_WEBHOOK_PATH = "/feishu/webhook"
|
|
# ---------------------------------------------------------------------------
|
|
# TTL, rate-limit and webhook security constants
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_FEISHU_DEDUP_TTL_SECONDS = 24 * 60 * 60 # 24 hours — matches openclaw
|
|
_FEISHU_SENDER_NAME_TTL_SECONDS = 10 * 60 # 10 minutes sender-name cache
|
|
_FEISHU_WEBHOOK_MAX_BODY_BYTES = 1 * 1024 * 1024 # 1 MB body limit
|
|
_FEISHU_WEBHOOK_RATE_WINDOW_SECONDS = 60 # sliding window for rate limiter
|
|
_FEISHU_WEBHOOK_RATE_LIMIT_MAX = 120 # max requests per window per IP — matches openclaw
|
|
_FEISHU_WEBHOOK_RATE_MAX_KEYS = 4096 # max tracked keys (prevents unbounded growth)
|
|
_FEISHU_WEBHOOK_BODY_TIMEOUT_SECONDS = 30 # max seconds to read request body
|
|
_FEISHU_WEBHOOK_ANOMALY_THRESHOLD = 25 # consecutive error responses before WARNING log
|
|
_FEISHU_WEBHOOK_ANOMALY_TTL_SECONDS = 6 * 60 * 60 # anomaly tracker TTL (6 hours) — matches openclaw
|
|
_FEISHU_CARD_ACTION_DEDUP_TTL_SECONDS = 15 * 60 # card action token dedup window (15 min)
|
|
_FEISHU_BOT_MSG_TRACK_SIZE = 512 # LRU size for tracking sent message IDs
|
|
_FEISHU_REPLY_FALLBACK_CODES = frozenset({230011, 231003}) # reply target withdrawn/missing → create fallback
|
|
_FEISHU_ACK_EMOJI = "OK"
|
|
# ---------------------------------------------------------------------------
|
|
# Fallback display strings
|
|
# ---------------------------------------------------------------------------
|
|
|
|
FALLBACK_POST_TEXT = "[Rich text message]"
|
|
FALLBACK_FORWARD_TEXT = "[Merged forward message]"
|
|
FALLBACK_SHARE_CHAT_TEXT = "[Shared chat]"
|
|
FALLBACK_INTERACTIVE_TEXT = "[Interactive message]"
|
|
FALLBACK_IMAGE_TEXT = "[Image]"
|
|
FALLBACK_ATTACHMENT_TEXT = "[Attachment]"
|
|
# ---------------------------------------------------------------------------
|
|
# Post/card parsing helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_PREFERRED_LOCALES = ("zh_cn", "en_us")
|
|
_MARKDOWN_SPECIAL_CHARS_RE = re.compile(r"([\\`*_{}\[\]()#+\-!|>~])")
|
|
_MENTION_PLACEHOLDER_RE = re.compile(r"@_user_\d+")
|
|
_WHITESPACE_RE = re.compile(r"\s+")
|
|
_SUPPORTED_CARD_TEXT_KEYS = (
|
|
"title",
|
|
"text",
|
|
"content",
|
|
"label",
|
|
"value",
|
|
"name",
|
|
"summary",
|
|
"subtitle",
|
|
"description",
|
|
"placeholder",
|
|
"hint",
|
|
)
|
|
_SKIP_TEXT_KEYS = {
|
|
"tag",
|
|
"type",
|
|
"msg_type",
|
|
"message_type",
|
|
"chat_id",
|
|
"open_chat_id",
|
|
"share_chat_id",
|
|
"file_key",
|
|
"image_key",
|
|
"user_id",
|
|
"open_id",
|
|
"union_id",
|
|
"url",
|
|
"href",
|
|
"link",
|
|
"token",
|
|
"template",
|
|
"locale",
|
|
}
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class FeishuPostMediaRef:
|
|
file_key: str
|
|
file_name: str = ""
|
|
resource_type: str = "file"
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class FeishuPostParseResult:
|
|
text_content: str
|
|
image_keys: List[str] = field(default_factory=list)
|
|
media_refs: List[FeishuPostMediaRef] = field(default_factory=list)
|
|
mentioned_ids: List[str] = field(default_factory=list)
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class FeishuNormalizedMessage:
|
|
raw_type: str
|
|
text_content: str
|
|
preferred_message_type: str = "text"
|
|
image_keys: List[str] = field(default_factory=list)
|
|
media_refs: List[FeishuPostMediaRef] = field(default_factory=list)
|
|
mentioned_ids: List[str] = field(default_factory=list)
|
|
relation_kind: str = "plain"
|
|
metadata: Dict[str, Any] = field(default_factory=dict)
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class FeishuAdapterSettings:
|
|
app_id: str
|
|
app_secret: str
|
|
domain_name: str
|
|
connection_mode: str
|
|
encrypt_key: str
|
|
verification_token: str
|
|
group_policy: str
|
|
allowed_group_users: frozenset[str]
|
|
bot_open_id: str
|
|
bot_user_id: str
|
|
bot_name: str
|
|
dedup_cache_size: int
|
|
text_batch_delay_seconds: float
|
|
text_batch_max_messages: int
|
|
text_batch_max_chars: int
|
|
media_batch_delay_seconds: float
|
|
webhook_host: str
|
|
webhook_port: int
|
|
webhook_path: str
|
|
|
|
|
|
@dataclass
|
|
class FeishuBatchState:
|
|
events: Dict[str, MessageEvent] = field(default_factory=dict)
|
|
tasks: Dict[str, asyncio.Task] = field(default_factory=dict)
|
|
counts: Dict[str, int] = field(default_factory=dict)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Markdown rendering helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _escape_markdown_text(text: str) -> str:
|
|
return _MARKDOWN_SPECIAL_CHARS_RE.sub(r"\\\1", text)
|
|
|
|
|
|
def _to_boolean(value: Any) -> bool:
|
|
return value is True or value == 1 or value == "true"
|
|
|
|
|
|
def _is_style_enabled(style: Dict[str, Any] | None, key: str) -> bool:
|
|
if not style:
|
|
return False
|
|
return _to_boolean(style.get(key))
|
|
|
|
|
|
def _wrap_inline_code(text: str) -> str:
|
|
max_run = max([0, *[len(run) for run in re.findall(r"`+", text)]])
|
|
fence = "`" * (max_run + 1)
|
|
body = f" {text} " if text.startswith("`") or text.endswith("`") else text
|
|
return f"{fence}{body}{fence}"
|
|
|
|
|
|
def _sanitize_fence_language(language: str) -> str:
|
|
return language.strip().replace("\n", " ").replace("\r", " ")
|
|
|
|
|
|
def _render_text_element(element: Dict[str, Any]) -> str:
|
|
text = str(element.get("text", "") or "")
|
|
style = element.get("style")
|
|
style_dict = style if isinstance(style, dict) else None
|
|
|
|
if _is_style_enabled(style_dict, "code"):
|
|
return _wrap_inline_code(text)
|
|
|
|
rendered = _escape_markdown_text(text)
|
|
if not rendered:
|
|
return ""
|
|
if _is_style_enabled(style_dict, "bold"):
|
|
rendered = f"**{rendered}**"
|
|
if _is_style_enabled(style_dict, "italic"):
|
|
rendered = f"*{rendered}*"
|
|
if _is_style_enabled(style_dict, "underline"):
|
|
rendered = f"<u>{rendered}</u>"
|
|
if _is_style_enabled(style_dict, "strikethrough"):
|
|
rendered = f"~~{rendered}~~"
|
|
return rendered
|
|
|
|
|
|
def _render_code_block_element(element: Dict[str, Any]) -> str:
|
|
language = _sanitize_fence_language(
|
|
str(element.get("language", "") or "") or str(element.get("lang", "") or "")
|
|
)
|
|
code = (
|
|
str(element.get("text", "") or "") or str(element.get("content", "") or "")
|
|
).replace("\r\n", "\n")
|
|
trailing_newline = "" if code.endswith("\n") else "\n"
|
|
return f"```{language}\n{code}{trailing_newline}```"
|
|
|
|
|
|
def _strip_markdown_to_plain_text(text: str) -> str:
|
|
plain = text.replace("\r\n", "\n")
|
|
plain = _MARKDOWN_LINK_RE.sub(lambda m: f"{m.group(1)} ({m.group(2).strip()})", plain)
|
|
plain = re.sub(r"^#{1,6}\s+", "", plain, flags=re.MULTILINE)
|
|
plain = re.sub(r"^>\s?", "", plain, flags=re.MULTILINE)
|
|
plain = re.sub(r"^\s*---+\s*$", "---", plain, flags=re.MULTILINE)
|
|
plain = re.sub(r"```(?:[^\n]*\n)?([\s\S]*?)```", lambda m: m.group(1).strip("\n"), plain)
|
|
plain = re.sub(r"`([^`\n]+)`", r"\1", plain)
|
|
plain = re.sub(r"\*\*([^*\n]+)\*\*", r"\1", plain)
|
|
plain = re.sub(r"\*([^*\n]+)\*", r"\1", plain)
|
|
plain = re.sub(r"~~([^~\n]+)~~", r"\1", plain)
|
|
plain = re.sub(r"<u>([\s\S]*?)</u>", r"\1", plain)
|
|
plain = re.sub(r"\n{3,}", "\n\n", plain)
|
|
return plain.strip()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Post payload builders and parsers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _build_markdown_post_payload(content: str) -> str:
|
|
return json.dumps(
|
|
{
|
|
"zh_cn": {
|
|
"content": [
|
|
[
|
|
{
|
|
"tag": "md",
|
|
"text": content,
|
|
}
|
|
]
|
|
],
|
|
}
|
|
},
|
|
ensure_ascii=False,
|
|
)
|
|
|
|
|
|
def parse_feishu_post_content(raw_content: str) -> FeishuPostParseResult:
|
|
try:
|
|
parsed = json.loads(raw_content) if raw_content else {}
|
|
except json.JSONDecodeError:
|
|
return FeishuPostParseResult(text_content=FALLBACK_POST_TEXT)
|
|
return parse_feishu_post_payload(parsed)
|
|
|
|
|
|
def parse_feishu_post_payload(payload: Any) -> FeishuPostParseResult:
|
|
resolved = _resolve_post_payload(payload)
|
|
if not resolved:
|
|
return FeishuPostParseResult(text_content=FALLBACK_POST_TEXT)
|
|
|
|
image_keys: List[str] = []
|
|
media_refs: List[FeishuPostMediaRef] = []
|
|
mentioned_ids: List[str] = []
|
|
parts: List[str] = []
|
|
|
|
title = _normalize_feishu_text(str(resolved.get("title", "")).strip())
|
|
if title:
|
|
parts.append(title)
|
|
|
|
for row in resolved.get("content", []) or []:
|
|
if not isinstance(row, list):
|
|
continue
|
|
row_text = _normalize_feishu_text(
|
|
"".join(_render_post_element(item, image_keys, media_refs, mentioned_ids) for item in row)
|
|
)
|
|
if row_text:
|
|
parts.append(row_text)
|
|
|
|
return FeishuPostParseResult(
|
|
text_content="\n".join(parts).strip() or FALLBACK_POST_TEXT,
|
|
image_keys=image_keys,
|
|
media_refs=media_refs,
|
|
mentioned_ids=mentioned_ids,
|
|
)
|
|
|
|
|
|
def _resolve_post_payload(payload: Any) -> Dict[str, Any]:
|
|
direct = _to_post_payload(payload)
|
|
if direct:
|
|
return direct
|
|
if not isinstance(payload, dict):
|
|
return {}
|
|
|
|
wrapped = payload.get("post")
|
|
wrapped_direct = _resolve_locale_payload(wrapped)
|
|
if wrapped_direct:
|
|
return wrapped_direct
|
|
return _resolve_locale_payload(payload)
|
|
|
|
|
|
def _resolve_locale_payload(payload: Any) -> Dict[str, Any]:
|
|
direct = _to_post_payload(payload)
|
|
if direct:
|
|
return direct
|
|
if not isinstance(payload, dict):
|
|
return {}
|
|
|
|
for key in _PREFERRED_LOCALES:
|
|
candidate = _to_post_payload(payload.get(key))
|
|
if candidate:
|
|
return candidate
|
|
for value in payload.values():
|
|
candidate = _to_post_payload(value)
|
|
if candidate:
|
|
return candidate
|
|
return {}
|
|
|
|
|
|
def _to_post_payload(candidate: Any) -> Dict[str, Any]:
|
|
if not isinstance(candidate, dict):
|
|
return {}
|
|
content = candidate.get("content")
|
|
if not isinstance(content, list):
|
|
return {}
|
|
return {
|
|
"title": str(candidate.get("title", "") or ""),
|
|
"content": content,
|
|
}
|
|
|
|
|
|
def _render_post_element(
|
|
element: Any,
|
|
image_keys: List[str],
|
|
media_refs: List[FeishuPostMediaRef],
|
|
mentioned_ids: List[str],
|
|
) -> str:
|
|
if isinstance(element, str):
|
|
return element
|
|
if not isinstance(element, dict):
|
|
return ""
|
|
|
|
tag = str(element.get("tag", "")).strip().lower()
|
|
if tag == "text":
|
|
return _render_text_element(element)
|
|
if tag == "a":
|
|
href = str(element.get("href", "")).strip()
|
|
label = str(element.get("text", href) or "").strip()
|
|
if not label:
|
|
return ""
|
|
escaped_label = _escape_markdown_text(label)
|
|
return f"[{escaped_label}]({href})" if href else escaped_label
|
|
if tag == "at":
|
|
mentioned_id = (
|
|
str(element.get("open_id", "")).strip()
|
|
or str(element.get("user_id", "")).strip()
|
|
)
|
|
if mentioned_id and mentioned_id not in mentioned_ids:
|
|
mentioned_ids.append(mentioned_id)
|
|
display_name = (
|
|
str(element.get("user_name", "")).strip()
|
|
or str(element.get("name", "")).strip()
|
|
or str(element.get("text", "")).strip()
|
|
or mentioned_id
|
|
)
|
|
return f"@{_escape_markdown_text(display_name)}" if display_name else "@"
|
|
if tag in {"img", "image"}:
|
|
image_key = str(element.get("image_key", "")).strip()
|
|
if image_key and image_key not in image_keys:
|
|
image_keys.append(image_key)
|
|
alt = str(element.get("text", "")).strip() or str(element.get("alt", "")).strip()
|
|
return f"[Image: {alt}]" if alt else "[Image]"
|
|
if tag in {"media", "file", "audio", "video"}:
|
|
file_key = str(element.get("file_key", "")).strip()
|
|
file_name = (
|
|
str(element.get("file_name", "")).strip()
|
|
or str(element.get("title", "")).strip()
|
|
or str(element.get("text", "")).strip()
|
|
)
|
|
if file_key:
|
|
media_refs.append(
|
|
FeishuPostMediaRef(
|
|
file_key=file_key,
|
|
file_name=file_name,
|
|
resource_type=tag if tag in {"audio", "video"} else "file",
|
|
)
|
|
)
|
|
return f"[Attachment: {file_name}]" if file_name else "[Attachment]"
|
|
if tag in {"emotion", "emoji"}:
|
|
label = str(element.get("text", "")).strip() or str(element.get("emoji_type", "")).strip()
|
|
return f":{_escape_markdown_text(label)}:" if label else "[Emoji]"
|
|
if tag == "br":
|
|
return "\n"
|
|
if tag in {"hr", "divider"}:
|
|
return "\n\n---\n\n"
|
|
if tag == "code":
|
|
code = str(element.get("text", "") or "") or str(element.get("content", "") or "")
|
|
return _wrap_inline_code(code) if code else ""
|
|
if tag in {"code_block", "pre"}:
|
|
return _render_code_block_element(element)
|
|
|
|
nested_parts: List[str] = []
|
|
for key in ("text", "title", "content", "children", "elements"):
|
|
value = element.get(key)
|
|
extracted = _render_nested_post(value, image_keys, media_refs, mentioned_ids)
|
|
if extracted:
|
|
nested_parts.append(extracted)
|
|
return " ".join(part for part in nested_parts if part)
|
|
|
|
|
|
def _render_nested_post(
|
|
value: Any,
|
|
image_keys: List[str],
|
|
media_refs: List[FeishuPostMediaRef],
|
|
mentioned_ids: List[str],
|
|
) -> str:
|
|
if isinstance(value, str):
|
|
return _escape_markdown_text(value)
|
|
if isinstance(value, list):
|
|
return " ".join(
|
|
part
|
|
for item in value
|
|
for part in [_render_nested_post(item, image_keys, media_refs, mentioned_ids)]
|
|
if part
|
|
)
|
|
if isinstance(value, dict):
|
|
direct = _render_post_element(value, image_keys, media_refs, mentioned_ids)
|
|
if direct:
|
|
return direct
|
|
return " ".join(
|
|
part
|
|
for item in value.values()
|
|
for part in [_render_nested_post(item, image_keys, media_refs, mentioned_ids)]
|
|
if part
|
|
)
|
|
return ""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Message normalization
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def normalize_feishu_message(*, message_type: str, raw_content: str) -> FeishuNormalizedMessage:
|
|
normalized_type = str(message_type or "").strip().lower()
|
|
payload = _load_feishu_payload(raw_content)
|
|
|
|
if normalized_type == "text":
|
|
return FeishuNormalizedMessage(
|
|
raw_type=normalized_type,
|
|
text_content=_normalize_feishu_text(str(payload.get("text", "") or "")),
|
|
)
|
|
if normalized_type == "post":
|
|
parsed_post = parse_feishu_post_payload(payload)
|
|
return FeishuNormalizedMessage(
|
|
raw_type=normalized_type,
|
|
text_content=parsed_post.text_content,
|
|
image_keys=list(parsed_post.image_keys),
|
|
media_refs=list(parsed_post.media_refs),
|
|
mentioned_ids=list(parsed_post.mentioned_ids),
|
|
relation_kind="post",
|
|
)
|
|
if normalized_type == "image":
|
|
image_key = str(payload.get("image_key", "") or "").strip()
|
|
alt_text = _normalize_feishu_text(
|
|
str(payload.get("text", "") or "")
|
|
or str(payload.get("alt", "") or "")
|
|
or FALLBACK_IMAGE_TEXT
|
|
)
|
|
return FeishuNormalizedMessage(
|
|
raw_type=normalized_type,
|
|
text_content=alt_text if alt_text != FALLBACK_IMAGE_TEXT else "",
|
|
preferred_message_type="photo",
|
|
image_keys=[image_key] if image_key else [],
|
|
relation_kind="image",
|
|
)
|
|
if normalized_type in {"file", "audio", "media"}:
|
|
media_ref = _build_media_ref_from_payload(payload, resource_type=normalized_type)
|
|
placeholder = _attachment_placeholder(media_ref.file_name)
|
|
return FeishuNormalizedMessage(
|
|
raw_type=normalized_type,
|
|
text_content="",
|
|
preferred_message_type="audio" if normalized_type == "audio" else "document",
|
|
media_refs=[media_ref] if media_ref.file_key else [],
|
|
relation_kind=normalized_type,
|
|
metadata={"placeholder_text": placeholder},
|
|
)
|
|
if normalized_type == "merge_forward":
|
|
return _normalize_merge_forward_message(payload)
|
|
if normalized_type == "share_chat":
|
|
return _normalize_share_chat_message(payload)
|
|
if normalized_type in {"interactive", "card"}:
|
|
return _normalize_interactive_message(normalized_type, payload)
|
|
|
|
return FeishuNormalizedMessage(raw_type=normalized_type, text_content="")
|
|
|
|
|
|
def _load_feishu_payload(raw_content: str) -> Dict[str, Any]:
|
|
try:
|
|
parsed = json.loads(raw_content) if raw_content else {}
|
|
except json.JSONDecodeError:
|
|
return {"text": raw_content}
|
|
return parsed if isinstance(parsed, dict) else {"content": parsed}
|
|
|
|
|
|
def _normalize_merge_forward_message(payload: Dict[str, Any]) -> FeishuNormalizedMessage:
|
|
title = _first_non_empty_text(
|
|
payload.get("title"),
|
|
payload.get("summary"),
|
|
payload.get("preview"),
|
|
_find_first_text(payload, keys=("title", "summary", "preview", "description")),
|
|
)
|
|
entries = _collect_forward_entries(payload)
|
|
lines: List[str] = []
|
|
if title:
|
|
lines.append(title)
|
|
lines.extend(entries[:8])
|
|
text_content = "\n".join(lines).strip() or FALLBACK_FORWARD_TEXT
|
|
return FeishuNormalizedMessage(
|
|
raw_type="merge_forward",
|
|
text_content=text_content,
|
|
relation_kind="merge_forward",
|
|
metadata={"entry_count": len(entries), "title": title},
|
|
)
|
|
|
|
|
|
def _normalize_share_chat_message(payload: Dict[str, Any]) -> FeishuNormalizedMessage:
|
|
chat_name = _first_non_empty_text(
|
|
payload.get("chat_name"),
|
|
payload.get("name"),
|
|
payload.get("title"),
|
|
_find_first_text(payload, keys=("chat_name", "name", "title")),
|
|
)
|
|
share_id = _first_non_empty_text(
|
|
payload.get("chat_id"),
|
|
payload.get("open_chat_id"),
|
|
payload.get("share_chat_id"),
|
|
)
|
|
lines = []
|
|
if chat_name:
|
|
lines.append(f"Shared chat: {chat_name}")
|
|
else:
|
|
lines.append(FALLBACK_SHARE_CHAT_TEXT)
|
|
if share_id:
|
|
lines.append(f"Chat ID: {share_id}")
|
|
text_content = "\n".join(lines)
|
|
return FeishuNormalizedMessage(
|
|
raw_type="share_chat",
|
|
text_content=text_content,
|
|
relation_kind="share_chat",
|
|
metadata={"chat_id": share_id, "chat_name": chat_name},
|
|
)
|
|
|
|
|
|
def _normalize_interactive_message(message_type: str, payload: Dict[str, Any]) -> FeishuNormalizedMessage:
|
|
card_payload = payload.get("card") if isinstance(payload.get("card"), dict) else payload
|
|
title = _first_non_empty_text(
|
|
_find_header_title(card_payload),
|
|
payload.get("title"),
|
|
_find_first_text(card_payload, keys=("title", "summary", "subtitle")),
|
|
)
|
|
body_lines = _collect_card_lines(card_payload)
|
|
actions = _collect_action_labels(card_payload)
|
|
|
|
lines: List[str] = []
|
|
if title:
|
|
lines.append(title)
|
|
for line in body_lines:
|
|
if line != title:
|
|
lines.append(line)
|
|
if actions:
|
|
lines.append(f"Actions: {', '.join(actions)}")
|
|
|
|
text_content = "\n".join(lines[:12]).strip() or FALLBACK_INTERACTIVE_TEXT
|
|
return FeishuNormalizedMessage(
|
|
raw_type=message_type,
|
|
text_content=text_content,
|
|
relation_kind="interactive",
|
|
metadata={"title": title, "actions": actions},
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Content extraction utilities (card / forward / text walking)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _collect_forward_entries(payload: Dict[str, Any]) -> List[str]:
|
|
candidates: List[Any] = []
|
|
for key in ("messages", "items", "message_list", "records", "content"):
|
|
value = payload.get(key)
|
|
if isinstance(value, list):
|
|
candidates.extend(value)
|
|
entries: List[str] = []
|
|
for item in candidates:
|
|
if not isinstance(item, dict):
|
|
text = _normalize_feishu_text(str(item or ""))
|
|
if text:
|
|
entries.append(f"- {text}")
|
|
continue
|
|
sender = _first_non_empty_text(
|
|
item.get("sender_name"),
|
|
item.get("user_name"),
|
|
item.get("sender"),
|
|
item.get("name"),
|
|
)
|
|
nested_type = str(item.get("message_type", "") or item.get("msg_type", "")).strip().lower()
|
|
if nested_type == "post":
|
|
body = parse_feishu_post_payload(item.get("content") or item).text_content
|
|
else:
|
|
body = _first_non_empty_text(
|
|
item.get("text"),
|
|
item.get("summary"),
|
|
item.get("preview"),
|
|
item.get("content"),
|
|
_find_first_text(item, keys=("text", "content", "summary", "preview", "title")),
|
|
)
|
|
body = _normalize_feishu_text(body)
|
|
if sender and body:
|
|
entries.append(f"- {sender}: {body}")
|
|
elif body:
|
|
entries.append(f"- {body}")
|
|
return _unique_lines(entries)
|
|
|
|
|
|
def _collect_card_lines(payload: Any) -> List[str]:
|
|
lines = _collect_text_segments(payload, in_rich_block=False)
|
|
normalized = [_normalize_feishu_text(line) for line in lines]
|
|
return _unique_lines([line for line in normalized if line])
|
|
|
|
|
|
def _collect_action_labels(payload: Any) -> List[str]:
|
|
labels: List[str] = []
|
|
for item in _walk_nodes(payload):
|
|
if not isinstance(item, dict):
|
|
continue
|
|
tag = str(item.get("tag", "") or item.get("type", "")).strip().lower()
|
|
if tag not in {"button", "select_static", "overflow", "date_picker", "picker"}:
|
|
continue
|
|
label = _first_non_empty_text(
|
|
item.get("text"),
|
|
item.get("name"),
|
|
item.get("value"),
|
|
_find_first_text(item, keys=("text", "content", "name", "value")),
|
|
)
|
|
if label:
|
|
labels.append(label)
|
|
return _unique_lines(labels)
|
|
|
|
|
|
def _collect_text_segments(value: Any, *, in_rich_block: bool) -> List[str]:
|
|
if isinstance(value, str):
|
|
return [_normalize_feishu_text(value)] if in_rich_block else []
|
|
if isinstance(value, list):
|
|
segments: List[str] = []
|
|
for item in value:
|
|
segments.extend(_collect_text_segments(item, in_rich_block=in_rich_block))
|
|
return segments
|
|
if not isinstance(value, dict):
|
|
return []
|
|
|
|
tag = str(value.get("tag", "") or value.get("type", "")).strip().lower()
|
|
next_in_rich_block = in_rich_block or tag in {
|
|
"plain_text",
|
|
"lark_md",
|
|
"markdown",
|
|
"note",
|
|
"div",
|
|
"column_set",
|
|
"column",
|
|
"action",
|
|
"button",
|
|
"select_static",
|
|
"date_picker",
|
|
}
|
|
|
|
segments: List[str] = []
|
|
for key in _SUPPORTED_CARD_TEXT_KEYS:
|
|
item = value.get(key)
|
|
if isinstance(item, str) and next_in_rich_block:
|
|
normalized = _normalize_feishu_text(item)
|
|
if normalized:
|
|
segments.append(normalized)
|
|
|
|
for key, item in value.items():
|
|
if key in _SKIP_TEXT_KEYS:
|
|
continue
|
|
segments.extend(_collect_text_segments(item, in_rich_block=next_in_rich_block))
|
|
return segments
|
|
|
|
|
|
def _build_media_ref_from_payload(payload: Dict[str, Any], *, resource_type: str) -> FeishuPostMediaRef:
|
|
file_key = str(payload.get("file_key", "") or "").strip()
|
|
file_name = _first_non_empty_text(
|
|
payload.get("file_name"),
|
|
payload.get("title"),
|
|
payload.get("text"),
|
|
)
|
|
effective_type = resource_type if resource_type in {"audio", "video"} else "file"
|
|
return FeishuPostMediaRef(file_key=file_key, file_name=file_name, resource_type=effective_type)
|
|
|
|
|
|
def _attachment_placeholder(file_name: str) -> str:
|
|
normalized_name = _normalize_feishu_text(file_name)
|
|
return f"[Attachment: {normalized_name}]" if normalized_name else FALLBACK_ATTACHMENT_TEXT
|
|
|
|
|
|
def _find_header_title(payload: Any) -> str:
|
|
if not isinstance(payload, dict):
|
|
return ""
|
|
header = payload.get("header")
|
|
if not isinstance(header, dict):
|
|
return ""
|
|
title = header.get("title")
|
|
if isinstance(title, dict):
|
|
return _first_non_empty_text(title.get("content"), title.get("text"), title.get("name"))
|
|
return _normalize_feishu_text(str(title or ""))
|
|
|
|
|
|
def _find_first_text(payload: Any, *, keys: tuple[str, ...]) -> str:
|
|
for node in _walk_nodes(payload):
|
|
if not isinstance(node, dict):
|
|
continue
|
|
for key in keys:
|
|
value = node.get(key)
|
|
if isinstance(value, str):
|
|
normalized = _normalize_feishu_text(value)
|
|
if normalized:
|
|
return normalized
|
|
return ""
|
|
|
|
|
|
def _walk_nodes(value: Any):
|
|
if isinstance(value, dict):
|
|
yield value
|
|
for item in value.values():
|
|
yield from _walk_nodes(item)
|
|
elif isinstance(value, list):
|
|
for item in value:
|
|
yield from _walk_nodes(item)
|
|
|
|
|
|
def _first_non_empty_text(*values: Any) -> str:
|
|
for value in values:
|
|
if isinstance(value, str):
|
|
normalized = _normalize_feishu_text(value)
|
|
if normalized:
|
|
return normalized
|
|
elif value is not None and not isinstance(value, (dict, list)):
|
|
normalized = _normalize_feishu_text(str(value))
|
|
if normalized:
|
|
return normalized
|
|
return ""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# General text utilities
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _normalize_feishu_text(text: str) -> str:
|
|
cleaned = _MENTION_PLACEHOLDER_RE.sub(" ", text or "")
|
|
cleaned = cleaned.replace("\r\n", "\n").replace("\r", "\n")
|
|
cleaned = "\n".join(_WHITESPACE_RE.sub(" ", line).strip() for line in cleaned.split("\n"))
|
|
cleaned = "\n".join(line for line in cleaned.split("\n") if line)
|
|
cleaned = _MULTISPACE_RE.sub(" ", cleaned)
|
|
return cleaned.strip()
|
|
|
|
|
|
def _unique_lines(lines: List[str]) -> List[str]:
|
|
seen: set[str] = set()
|
|
unique: List[str] = []
|
|
for line in lines:
|
|
if not line or line in seen:
|
|
continue
|
|
seen.add(line)
|
|
unique.append(line)
|
|
return unique
|
|
|
|
|
|
def _run_official_feishu_ws_client(ws_client: Any) -> None:
|
|
"""Run the official Lark WS client in its own thread-local event loop."""
|
|
import lark_oapi.ws.client as ws_client_module
|
|
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
ws_client_module.loop = loop
|
|
ws_client.start()
|
|
|
|
|
|
def check_feishu_requirements() -> bool:
|
|
"""Check if Feishu/Lark dependencies are available."""
|
|
return FEISHU_AVAILABLE
|
|
|
|
|
|
class FeishuAdapter(BasePlatformAdapter):
|
|
"""Feishu/Lark bot adapter."""
|
|
|
|
MAX_MESSAGE_LENGTH = 8000
|
|
|
|
# =========================================================================
|
|
# Lifecycle — init / settings / connect / disconnect
|
|
# =========================================================================
|
|
|
|
def __init__(self, config: PlatformConfig):
|
|
super().__init__(config, Platform.FEISHU)
|
|
|
|
self._settings = self._load_settings(config.extra or {})
|
|
self._apply_settings(self._settings)
|
|
self._client: Optional[Any] = None
|
|
self._ws_client: Optional[Any] = None
|
|
self._ws_future: Optional[asyncio.Future] = None
|
|
self._loop: Optional[asyncio.AbstractEventLoop] = None
|
|
self._webhook_runner: Optional[Any] = None
|
|
self._webhook_site: Optional[Any] = None
|
|
self._event_handler = self._build_event_handler()
|
|
self._seen_message_ids: Dict[str, float] = {} # message_id → seen_at (time.time())
|
|
self._seen_message_order: List[str] = []
|
|
self._dedup_state_path = get_hermes_home() / "feishu_seen_message_ids.json"
|
|
self._dedup_lock = threading.Lock()
|
|
self._sender_name_cache: Dict[str, tuple[str, float]] = {} # sender_id → (name, expire_at)
|
|
self._webhook_rate_counts: Dict[str, tuple[int, float]] = {} # rate_key → (count, window_start)
|
|
self._webhook_anomaly_counts: Dict[str, tuple[int, str, float]] = {} # ip → (count, last_status, first_seen)
|
|
self._card_action_tokens: Dict[str, float] = {} # token → first_seen_time
|
|
self._chat_locks: Dict[str, asyncio.Lock] = {} # chat_id → lock (per-chat serial processing)
|
|
self._sent_message_ids_to_chat: Dict[str, str] = {} # message_id → chat_id (for reaction routing)
|
|
self._sent_message_id_order: List[str] = [] # LRU order for _sent_message_ids_to_chat
|
|
self._chat_info_cache: Dict[str, Dict[str, Any]] = {}
|
|
self._message_text_cache: Dict[str, Optional[str]] = {}
|
|
self._app_lock_identity: Optional[str] = None
|
|
self._text_batch_state = FeishuBatchState()
|
|
self._pending_text_batches = self._text_batch_state.events
|
|
self._pending_text_batch_tasks = self._text_batch_state.tasks
|
|
self._pending_text_batch_counts = self._text_batch_state.counts
|
|
self._media_batch_state = FeishuBatchState()
|
|
self._pending_media_batches = self._media_batch_state.events
|
|
self._pending_media_batch_tasks = self._media_batch_state.tasks
|
|
self._load_seen_message_ids()
|
|
|
|
@staticmethod
|
|
def _load_settings(extra: Dict[str, Any]) -> FeishuAdapterSettings:
|
|
return FeishuAdapterSettings(
|
|
app_id=str(extra.get("app_id") or os.getenv("FEISHU_APP_ID", "")).strip(),
|
|
app_secret=str(extra.get("app_secret") or os.getenv("FEISHU_APP_SECRET", "")).strip(),
|
|
domain_name=str(extra.get("domain") or os.getenv("FEISHU_DOMAIN", "feishu")).strip().lower(),
|
|
connection_mode=str(
|
|
extra.get("connection_mode") or os.getenv("FEISHU_CONNECTION_MODE", "websocket")
|
|
).strip().lower(),
|
|
encrypt_key=os.getenv("FEISHU_ENCRYPT_KEY", "").strip(),
|
|
verification_token=os.getenv("FEISHU_VERIFICATION_TOKEN", "").strip(),
|
|
group_policy=os.getenv("FEISHU_GROUP_POLICY", "allowlist").strip().lower(),
|
|
allowed_group_users=frozenset(
|
|
item.strip()
|
|
for item in os.getenv("FEISHU_ALLOWED_USERS", "").split(",")
|
|
if item.strip()
|
|
),
|
|
bot_open_id=os.getenv("FEISHU_BOT_OPEN_ID", "").strip(),
|
|
bot_user_id=os.getenv("FEISHU_BOT_USER_ID", "").strip(),
|
|
bot_name=os.getenv("FEISHU_BOT_NAME", "").strip(),
|
|
dedup_cache_size=max(
|
|
32,
|
|
int(os.getenv("HERMES_FEISHU_DEDUP_CACHE_SIZE", str(_DEFAULT_DEDUP_CACHE_SIZE))),
|
|
),
|
|
text_batch_delay_seconds=float(
|
|
os.getenv("HERMES_FEISHU_TEXT_BATCH_DELAY_SECONDS", str(_DEFAULT_TEXT_BATCH_DELAY_SECONDS))
|
|
),
|
|
text_batch_max_messages=max(
|
|
1,
|
|
int(os.getenv("HERMES_FEISHU_TEXT_BATCH_MAX_MESSAGES", str(_DEFAULT_TEXT_BATCH_MAX_MESSAGES))),
|
|
),
|
|
text_batch_max_chars=max(
|
|
1,
|
|
int(os.getenv("HERMES_FEISHU_TEXT_BATCH_MAX_CHARS", str(_DEFAULT_TEXT_BATCH_MAX_CHARS))),
|
|
),
|
|
media_batch_delay_seconds=float(
|
|
os.getenv("HERMES_FEISHU_MEDIA_BATCH_DELAY_SECONDS", str(_DEFAULT_MEDIA_BATCH_DELAY_SECONDS))
|
|
),
|
|
webhook_host=str(
|
|
extra.get("webhook_host") or os.getenv("FEISHU_WEBHOOK_HOST", _DEFAULT_WEBHOOK_HOST)
|
|
).strip(),
|
|
webhook_port=int(
|
|
extra.get("webhook_port") or os.getenv("FEISHU_WEBHOOK_PORT", str(_DEFAULT_WEBHOOK_PORT))
|
|
),
|
|
webhook_path=(
|
|
str(extra.get("webhook_path") or os.getenv("FEISHU_WEBHOOK_PATH", _DEFAULT_WEBHOOK_PATH)).strip()
|
|
or _DEFAULT_WEBHOOK_PATH
|
|
),
|
|
)
|
|
|
|
def _apply_settings(self, settings: FeishuAdapterSettings) -> None:
|
|
self._app_id = settings.app_id
|
|
self._app_secret = settings.app_secret
|
|
self._domain_name = settings.domain_name
|
|
self._connection_mode = settings.connection_mode
|
|
self._encrypt_key = settings.encrypt_key
|
|
self._verification_token = settings.verification_token
|
|
self._group_policy = settings.group_policy
|
|
self._allowed_group_users = set(settings.allowed_group_users)
|
|
self._bot_open_id = settings.bot_open_id
|
|
self._bot_user_id = settings.bot_user_id
|
|
self._bot_name = settings.bot_name
|
|
self._dedup_cache_size = settings.dedup_cache_size
|
|
self._text_batch_delay_seconds = settings.text_batch_delay_seconds
|
|
self._text_batch_max_messages = settings.text_batch_max_messages
|
|
self._text_batch_max_chars = settings.text_batch_max_chars
|
|
self._media_batch_delay_seconds = settings.media_batch_delay_seconds
|
|
self._webhook_host = settings.webhook_host
|
|
self._webhook_port = settings.webhook_port
|
|
self._webhook_path = settings.webhook_path
|
|
|
|
def _build_event_handler(self) -> Any:
|
|
if EventDispatcherHandler is None:
|
|
return None
|
|
return (
|
|
EventDispatcherHandler.builder(
|
|
self._encrypt_key,
|
|
self._verification_token,
|
|
)
|
|
.register_p2_im_message_message_read_v1(self._on_message_read_event)
|
|
.register_p2_im_message_receive_v1(self._on_message_event)
|
|
.register_p2_im_message_reaction_created_v1(
|
|
lambda data: self._on_reaction_event("im.message.reaction.created_v1", data)
|
|
)
|
|
.register_p2_im_message_reaction_deleted_v1(
|
|
lambda data: self._on_reaction_event("im.message.reaction.deleted_v1", data)
|
|
)
|
|
.register_p2_card_action_trigger(self._on_card_action_trigger)
|
|
.build()
|
|
)
|
|
|
|
async def connect(self) -> bool:
|
|
"""Connect to Feishu/Lark."""
|
|
if not FEISHU_AVAILABLE:
|
|
logger.error("[Feishu] lark-oapi not installed")
|
|
return False
|
|
if not self._app_id or not self._app_secret:
|
|
logger.error("[Feishu] FEISHU_APP_ID or FEISHU_APP_SECRET not set")
|
|
return False
|
|
if self._connection_mode not in {"websocket", "webhook"}:
|
|
logger.error(
|
|
"[Feishu] Unsupported FEISHU_CONNECTION_MODE=%s. Supported modes: websocket, webhook.",
|
|
self._connection_mode,
|
|
)
|
|
return False
|
|
|
|
try:
|
|
self._app_lock_identity = self._app_id
|
|
acquired, existing = acquire_scoped_lock(
|
|
_FEISHU_APP_LOCK_SCOPE,
|
|
self._app_lock_identity,
|
|
metadata={"platform": self.platform.value},
|
|
)
|
|
if not acquired:
|
|
owner_pid = existing.get("pid") if isinstance(existing, dict) else None
|
|
message = (
|
|
"Another local Hermes gateway is already using this Feishu app_id"
|
|
+ (f" (PID {owner_pid})." if owner_pid else ".")
|
|
+ " Stop the other gateway before starting a second Feishu websocket client."
|
|
)
|
|
logger.error("[Feishu] %s", message)
|
|
self._set_fatal_error("feishu_app_lock", message, retryable=False)
|
|
return False
|
|
|
|
self._loop = asyncio.get_running_loop()
|
|
await self._connect_with_retry()
|
|
self._mark_connected()
|
|
logger.info("[Feishu] Connected in %s mode (%s)", self._connection_mode, self._domain_name)
|
|
return True
|
|
except Exception as exc:
|
|
await self._release_app_lock()
|
|
message = f"Feishu startup failed: {exc}"
|
|
self._set_fatal_error("feishu_connect_error", message, retryable=True)
|
|
logger.error("[Feishu] Failed to connect: %s", exc, exc_info=True)
|
|
return False
|
|
|
|
async def disconnect(self) -> None:
|
|
"""Disconnect from Feishu/Lark."""
|
|
self._running = False
|
|
await self._cancel_pending_tasks(self._pending_text_batch_tasks)
|
|
await self._cancel_pending_tasks(self._pending_media_batch_tasks)
|
|
self._reset_batch_buffers()
|
|
self._disable_websocket_auto_reconnect()
|
|
await self._stop_webhook_server()
|
|
self._ws_future = None
|
|
self._loop = None
|
|
self._persist_seen_message_ids()
|
|
await self._release_app_lock()
|
|
|
|
self._mark_disconnected()
|
|
logger.info("[Feishu] Disconnected")
|
|
|
|
async def _cancel_pending_tasks(self, tasks: Dict[str, asyncio.Task]) -> None:
|
|
pending = [task for task in tasks.values() if task and not task.done()]
|
|
for task in pending:
|
|
task.cancel()
|
|
if pending:
|
|
await asyncio.gather(*pending, return_exceptions=True)
|
|
tasks.clear()
|
|
|
|
def _reset_batch_buffers(self) -> None:
|
|
self._pending_text_batches.clear()
|
|
self._pending_text_batch_counts.clear()
|
|
self._pending_media_batches.clear()
|
|
|
|
def _disable_websocket_auto_reconnect(self) -> None:
|
|
if self._ws_client is None:
|
|
return
|
|
try:
|
|
setattr(self._ws_client, "_auto_reconnect", False)
|
|
except Exception:
|
|
pass
|
|
finally:
|
|
self._ws_client = None
|
|
|
|
async def _stop_webhook_server(self) -> None:
|
|
if self._webhook_runner is None:
|
|
return
|
|
try:
|
|
await self._webhook_runner.cleanup()
|
|
finally:
|
|
self._webhook_runner = None
|
|
self._webhook_site = None
|
|
|
|
# =========================================================================
|
|
# Outbound — send / edit / send_image / send_voice / …
|
|
# =========================================================================
|
|
|
|
async def send(
|
|
self,
|
|
chat_id: str,
|
|
content: str,
|
|
reply_to: Optional[str] = None,
|
|
metadata: Optional[Dict[str, Any]] = None,
|
|
) -> SendResult:
|
|
"""Send a Feishu message."""
|
|
if not self._client:
|
|
return SendResult(success=False, error="Not connected")
|
|
|
|
formatted = self.format_message(content)
|
|
chunks = self.truncate_message(formatted, self.MAX_MESSAGE_LENGTH)
|
|
last_response = None
|
|
|
|
try:
|
|
for chunk in chunks:
|
|
msg_type, payload = self._build_outbound_payload(chunk)
|
|
try:
|
|
response = await self._feishu_send_with_retry(
|
|
chat_id=chat_id,
|
|
msg_type=msg_type,
|
|
payload=payload,
|
|
reply_to=reply_to,
|
|
metadata=metadata,
|
|
)
|
|
except Exception as exc:
|
|
if msg_type != "post" or not _POST_CONTENT_INVALID_RE.search(str(exc)):
|
|
raise
|
|
logger.warning("[Feishu] Invalid post payload rejected by API; falling back to plain text")
|
|
response = await self._feishu_send_with_retry(
|
|
chat_id=chat_id,
|
|
msg_type="text",
|
|
payload=json.dumps({"text": _strip_markdown_to_plain_text(chunk)}, ensure_ascii=False),
|
|
reply_to=reply_to,
|
|
metadata=metadata,
|
|
)
|
|
if (
|
|
msg_type == "post"
|
|
and not self._response_succeeded(response)
|
|
and _POST_CONTENT_INVALID_RE.search(str(getattr(response, "msg", "") or ""))
|
|
):
|
|
logger.warning("[Feishu] Post payload rejected by API response; falling back to plain text")
|
|
response = await self._feishu_send_with_retry(
|
|
chat_id=chat_id,
|
|
msg_type="text",
|
|
payload=json.dumps({"text": _strip_markdown_to_plain_text(chunk)}, ensure_ascii=False),
|
|
reply_to=reply_to,
|
|
metadata=metadata,
|
|
)
|
|
last_response = response
|
|
|
|
return self._finalize_send_result(last_response, "send failed")
|
|
except Exception as exc:
|
|
logger.error("[Feishu] Send error: %s", exc, exc_info=True)
|
|
return SendResult(success=False, error=str(exc))
|
|
|
|
async def edit_message(
|
|
self,
|
|
chat_id: str,
|
|
message_id: str,
|
|
content: str,
|
|
) -> SendResult:
|
|
"""Edit a previously sent Feishu text/post message."""
|
|
if not self._client:
|
|
return SendResult(success=False, error="Not connected")
|
|
|
|
try:
|
|
msg_type, payload = self._build_outbound_payload(content)
|
|
body = self._build_update_message_body(msg_type=msg_type, content=payload)
|
|
request = self._build_update_message_request(message_id=message_id, request_body=body)
|
|
response = await asyncio.to_thread(self._client.im.v1.message.update, request)
|
|
result = self._finalize_send_result(response, "update failed")
|
|
if not result.success and msg_type == "post" and _POST_CONTENT_INVALID_RE.search(result.error or ""):
|
|
logger.warning("[Feishu] Invalid post update payload rejected by API; falling back to plain text")
|
|
fallback_body = self._build_update_message_body(
|
|
msg_type="text",
|
|
content=json.dumps({"text": _strip_markdown_to_plain_text(content)}, ensure_ascii=False),
|
|
)
|
|
fallback_request = self._build_update_message_request(message_id=message_id, request_body=fallback_body)
|
|
fallback_response = await asyncio.to_thread(self._client.im.v1.message.update, fallback_request)
|
|
result = self._finalize_send_result(fallback_response, "update failed")
|
|
if result.success:
|
|
result.message_id = message_id
|
|
return result
|
|
except Exception as exc:
|
|
logger.error("[Feishu] Failed to edit message %s: %s", message_id, exc, exc_info=True)
|
|
return SendResult(success=False, error=str(exc))
|
|
|
|
async def send_voice(
|
|
self,
|
|
chat_id: str,
|
|
audio_path: str,
|
|
caption: Optional[str] = None,
|
|
reply_to: Optional[str] = None,
|
|
metadata: Optional[Dict[str, Any]] = None,
|
|
**kwargs,
|
|
) -> SendResult:
|
|
"""Send audio to Feishu as a file attachment plus optional caption."""
|
|
return await self._send_uploaded_file_message(
|
|
chat_id=chat_id,
|
|
file_path=audio_path,
|
|
reply_to=reply_to,
|
|
metadata=metadata,
|
|
caption=caption,
|
|
outbound_message_type="audio",
|
|
)
|
|
|
|
async def send_document(
|
|
self,
|
|
chat_id: str,
|
|
file_path: str,
|
|
caption: Optional[str] = None,
|
|
file_name: Optional[str] = None,
|
|
reply_to: Optional[str] = None,
|
|
metadata: Optional[Dict[str, Any]] = None,
|
|
**kwargs,
|
|
) -> SendResult:
|
|
"""Send a document/file attachment to Feishu."""
|
|
return await self._send_uploaded_file_message(
|
|
chat_id=chat_id,
|
|
file_path=file_path,
|
|
reply_to=reply_to,
|
|
metadata=metadata,
|
|
caption=caption,
|
|
file_name=file_name,
|
|
)
|
|
|
|
async def send_video(
|
|
self,
|
|
chat_id: str,
|
|
video_path: str,
|
|
caption: Optional[str] = None,
|
|
reply_to: Optional[str] = None,
|
|
metadata: Optional[Dict[str, Any]] = None,
|
|
**kwargs,
|
|
) -> SendResult:
|
|
"""Send a video file to Feishu."""
|
|
return await self._send_uploaded_file_message(
|
|
chat_id=chat_id,
|
|
file_path=video_path,
|
|
reply_to=reply_to,
|
|
metadata=metadata,
|
|
caption=caption,
|
|
outbound_message_type="media",
|
|
)
|
|
|
|
async def send_image_file(
|
|
self,
|
|
chat_id: str,
|
|
image_path: str,
|
|
caption: Optional[str] = None,
|
|
reply_to: Optional[str] = None,
|
|
metadata: Optional[Dict[str, Any]] = None,
|
|
**kwargs,
|
|
) -> SendResult:
|
|
"""Send a local image file to Feishu."""
|
|
if not self._client:
|
|
return SendResult(success=False, error="Not connected")
|
|
if not os.path.exists(image_path):
|
|
return SendResult(success=False, error=f"Image file not found: {image_path}")
|
|
|
|
try:
|
|
with open(image_path, "rb") as image_file:
|
|
body = self._build_image_upload_body(
|
|
image_type=_FEISHU_IMAGE_UPLOAD_TYPE,
|
|
image=image_file,
|
|
)
|
|
request = self._build_image_upload_request(body)
|
|
upload_response = await asyncio.to_thread(self._client.im.v1.image.create, request)
|
|
image_key = self._extract_response_field(upload_response, "image_key")
|
|
if not image_key:
|
|
return self._response_error_result(
|
|
upload_response,
|
|
default_message="image upload failed",
|
|
override_error="Feishu image upload missing image_key",
|
|
)
|
|
|
|
if caption:
|
|
post_payload = self._build_media_post_payload(
|
|
caption=caption,
|
|
media_tag={"tag": "img", "image_key": image_key},
|
|
)
|
|
message_response = await self._feishu_send_with_retry(
|
|
chat_id=chat_id,
|
|
msg_type="post",
|
|
payload=post_payload,
|
|
reply_to=reply_to,
|
|
metadata=metadata,
|
|
)
|
|
else:
|
|
message_response = await self._feishu_send_with_retry(
|
|
chat_id=chat_id,
|
|
msg_type="image",
|
|
payload=json.dumps({"image_key": image_key}, ensure_ascii=False),
|
|
reply_to=reply_to,
|
|
metadata=metadata,
|
|
)
|
|
return self._finalize_send_result(message_response, "image send failed")
|
|
except Exception as exc:
|
|
logger.error("[Feishu] Failed to send image %s: %s", image_path, exc, exc_info=True)
|
|
return SendResult(success=False, error=str(exc))
|
|
|
|
async def send_typing(self, chat_id: str, metadata=None) -> None:
|
|
"""Feishu bot API does not expose a typing indicator."""
|
|
return None
|
|
|
|
async def send_image(
|
|
self,
|
|
chat_id: str,
|
|
image_url: str,
|
|
caption: Optional[str] = None,
|
|
reply_to: Optional[str] = None,
|
|
metadata: Optional[Dict[str, Any]] = None,
|
|
) -> SendResult:
|
|
"""Download a remote image then send it through the native Feishu image flow."""
|
|
try:
|
|
image_path = await self._download_remote_image(image_url)
|
|
except Exception as exc:
|
|
logger.error("[Feishu] Failed to download image %s: %s", image_url, exc, exc_info=True)
|
|
return await super().send_image(
|
|
chat_id=chat_id,
|
|
image_url=image_url,
|
|
caption=caption,
|
|
reply_to=reply_to,
|
|
metadata=metadata,
|
|
)
|
|
return await self.send_image_file(
|
|
chat_id=chat_id,
|
|
image_path=image_path,
|
|
caption=caption,
|
|
reply_to=reply_to,
|
|
metadata=metadata,
|
|
)
|
|
|
|
async def send_animation(
|
|
self,
|
|
chat_id: str,
|
|
animation_url: str,
|
|
caption: Optional[str] = None,
|
|
reply_to: Optional[str] = None,
|
|
metadata: Optional[Dict[str, Any]] = None,
|
|
) -> SendResult:
|
|
"""Feishu has no native GIF bubble; degrade to a downloadable file."""
|
|
try:
|
|
file_path, file_name = await self._download_remote_document(
|
|
animation_url,
|
|
default_ext=".gif",
|
|
preferred_name="animation.gif",
|
|
)
|
|
except Exception as exc:
|
|
logger.error("[Feishu] Failed to download animation %s: %s", animation_url, exc, exc_info=True)
|
|
return await super().send_animation(
|
|
chat_id=chat_id,
|
|
animation_url=animation_url,
|
|
caption=caption,
|
|
reply_to=reply_to,
|
|
metadata=metadata,
|
|
)
|
|
degraded_caption = f"[GIF downgraded to file]\n{caption}" if caption else "[GIF downgraded to file]"
|
|
return await self.send_document(
|
|
chat_id=chat_id,
|
|
file_path=file_path,
|
|
file_name=file_name,
|
|
caption=degraded_caption,
|
|
reply_to=reply_to,
|
|
metadata=metadata,
|
|
)
|
|
|
|
async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
|
|
"""Return real chat metadata from Feishu when available."""
|
|
fallback = {
|
|
"chat_id": chat_id,
|
|
"name": chat_id,
|
|
"type": "dm",
|
|
}
|
|
if not self._client:
|
|
return fallback
|
|
|
|
cached = self._chat_info_cache.get(chat_id)
|
|
if cached is not None:
|
|
return dict(cached)
|
|
|
|
try:
|
|
request = self._build_get_chat_request(chat_id)
|
|
response = await asyncio.to_thread(self._client.im.v1.chat.get, request)
|
|
if not response or getattr(response, "success", lambda: False)() is False:
|
|
code = getattr(response, "code", "unknown")
|
|
msg = getattr(response, "msg", "chat lookup failed")
|
|
logger.warning("[Feishu] Failed to get chat info for %s: [%s] %s", chat_id, code, msg)
|
|
return fallback
|
|
|
|
data = getattr(response, "data", None)
|
|
raw_chat_type = str(getattr(data, "chat_type", "") or "").strip().lower()
|
|
info = {
|
|
"chat_id": chat_id,
|
|
"name": str(getattr(data, "name", None) or chat_id),
|
|
"type": self._map_chat_type(raw_chat_type),
|
|
"raw_type": raw_chat_type or None,
|
|
}
|
|
self._chat_info_cache[chat_id] = info
|
|
return dict(info)
|
|
except Exception:
|
|
logger.warning("[Feishu] Failed to get chat info for %s", chat_id, exc_info=True)
|
|
return fallback
|
|
|
|
def format_message(self, content: str) -> str:
|
|
"""Feishu text messages are plain text by default."""
|
|
return content.strip()
|
|
|
|
# =========================================================================
|
|
# Inbound event handlers
|
|
# =========================================================================
|
|
|
|
def _on_message_event(self, data: Any) -> None:
|
|
"""Normalize Feishu inbound events into MessageEvent."""
|
|
if self._loop is None:
|
|
logger.warning("[Feishu] Dropping inbound message before adapter loop is ready")
|
|
return
|
|
future = asyncio.run_coroutine_threadsafe(
|
|
self._handle_message_event_data(data),
|
|
self._loop,
|
|
)
|
|
future.add_done_callback(self._log_background_failure)
|
|
|
|
async def _handle_message_event_data(self, data: Any) -> None:
|
|
"""Shared inbound message handling for websocket and webhook transports."""
|
|
event = getattr(data, "event", None)
|
|
message = getattr(event, "message", None)
|
|
sender = getattr(event, "sender", None)
|
|
sender_id = getattr(sender, "sender_id", None)
|
|
if not message or not sender_id:
|
|
logger.debug("[Feishu] Dropping malformed inbound event: missing message or sender_id")
|
|
return
|
|
|
|
message_id = getattr(message, "message_id", None)
|
|
if not message_id or self._is_duplicate(message_id):
|
|
logger.debug("[Feishu] Dropping duplicate/missing message_id: %s", message_id)
|
|
return
|
|
if getattr(sender, "sender_type", "") == "bot":
|
|
logger.debug("[Feishu] Dropping bot-originated event: %s", message_id)
|
|
return
|
|
|
|
chat_type = getattr(message, "chat_type", "p2p")
|
|
if chat_type != "p2p" and not self._should_accept_group_message(message, sender_id):
|
|
logger.debug("[Feishu] Dropping group message that failed mention/policy gate: %s", message_id)
|
|
return
|
|
await self._process_inbound_message(
|
|
data=data,
|
|
message=message,
|
|
sender_id=sender_id,
|
|
chat_type=chat_type,
|
|
message_id=message_id,
|
|
)
|
|
|
|
def _on_message_read_event(self, data: P2ImMessageMessageReadV1) -> None:
|
|
"""Ignore read-receipt events that Hermes does not act on."""
|
|
event = getattr(data, "event", None)
|
|
message = getattr(event, "message", None)
|
|
message_id = getattr(message, "message_id", None) or ""
|
|
logger.debug("[Feishu] Ignoring message_read event: %s", message_id)
|
|
|
|
def _on_bot_added_to_chat(self, data: Any) -> None:
|
|
"""Handle bot being added to a group chat."""
|
|
event = getattr(data, "event", None)
|
|
chat_id = str(getattr(event, "chat_id", "") or "")
|
|
logger.info("[Feishu] Bot added to chat: %s", chat_id)
|
|
self._chat_info_cache.pop(chat_id, None)
|
|
|
|
def _on_bot_removed_from_chat(self, data: Any) -> None:
|
|
"""Handle bot being removed from a group chat."""
|
|
event = getattr(data, "event", None)
|
|
chat_id = str(getattr(event, "chat_id", "") or "")
|
|
logger.info("[Feishu] Bot removed from chat: %s", chat_id)
|
|
self._chat_info_cache.pop(chat_id, None)
|
|
|
|
def _on_reaction_event(self, event_type: str, data: Any) -> None:
|
|
"""Route user reactions on bot messages as synthetic text events."""
|
|
event = getattr(data, "event", None)
|
|
message_id = str(getattr(event, "message_id", "") or "")
|
|
operator_type = str(getattr(event, "operator_type", "") or "")
|
|
reaction_type_obj = getattr(event, "reaction_type", None)
|
|
emoji_type = str(getattr(reaction_type_obj, "emoji_type", "") or "")
|
|
action = "added" if "created" in event_type else "removed"
|
|
logger.debug(
|
|
"[Feishu] Reaction %s on message %s (operator_type=%s, emoji=%s)",
|
|
action,
|
|
message_id,
|
|
operator_type,
|
|
emoji_type,
|
|
)
|
|
# Only process reactions from real users. Ignore app/bot-generated reactions
|
|
# and Hermes' own ACK emoji to avoid feedback loops.
|
|
if (
|
|
operator_type in {"bot", "app"}
|
|
or emoji_type == _FEISHU_ACK_EMOJI
|
|
or not message_id
|
|
or self._loop is None
|
|
):
|
|
return
|
|
future = asyncio.run_coroutine_threadsafe(
|
|
self._handle_reaction_event(event_type, data),
|
|
self._loop,
|
|
)
|
|
future.add_done_callback(self._log_background_failure)
|
|
|
|
def _on_card_action_trigger(self, data: Any) -> Any:
|
|
"""Schedule Feishu card actions on the adapter loop and acknowledge immediately."""
|
|
if self._loop is None:
|
|
logger.warning("[Feishu] Dropping card action before adapter loop is ready")
|
|
else:
|
|
future = asyncio.run_coroutine_threadsafe(
|
|
self._handle_card_action_event(data),
|
|
self._loop,
|
|
)
|
|
future.add_done_callback(self._log_background_failure)
|
|
if P2CardActionTriggerResponse is None:
|
|
return None
|
|
return P2CardActionTriggerResponse()
|
|
|
|
async def _handle_reaction_event(self, event_type: str, data: Any) -> None:
|
|
"""Fetch the reacted-to message; if it was sent by this bot, emit a synthetic text event."""
|
|
if not self._client:
|
|
return
|
|
event = getattr(data, "event", None)
|
|
message_id = str(getattr(event, "message_id", "") or "")
|
|
if not message_id:
|
|
return
|
|
|
|
# Fetch the target message to verify it was sent by us and to obtain chat context.
|
|
try:
|
|
request = self._build_get_message_request(message_id)
|
|
response = await asyncio.to_thread(self._client.im.v1.message.get, request)
|
|
if not response or not getattr(response, "success", lambda: False)():
|
|
return
|
|
items = getattr(getattr(response, "data", None), "items", None) or []
|
|
msg = items[0] if items else None
|
|
if not msg:
|
|
return
|
|
sender = getattr(msg, "sender", None)
|
|
sender_type = str(getattr(sender, "sender_type", "") or "").lower()
|
|
if sender_type != "app":
|
|
return # only route reactions on our own bot messages
|
|
chat_id = str(getattr(msg, "chat_id", "") or "")
|
|
chat_type_raw = str(getattr(msg, "chat_type", "p2p") or "p2p")
|
|
if not chat_id:
|
|
return
|
|
except Exception:
|
|
logger.debug("[Feishu] Failed to fetch message for reaction routing", exc_info=True)
|
|
return
|
|
|
|
user_id_obj = getattr(event, "user_id", None)
|
|
reaction_type_obj = getattr(event, "reaction_type", None)
|
|
emoji_type = str(getattr(reaction_type_obj, "emoji_type", "") or "UNKNOWN")
|
|
action = "added" if "created" in event_type else "removed"
|
|
synthetic_text = f"reaction:{action}:{emoji_type}"
|
|
|
|
sender_profile = await self._resolve_sender_profile(user_id_obj)
|
|
chat_info = await self.get_chat_info(chat_id)
|
|
source = self.build_source(
|
|
chat_id=chat_id,
|
|
chat_name=chat_info.get("name") or chat_id or "Feishu Chat",
|
|
chat_type=self._resolve_source_chat_type(chat_info=chat_info, event_chat_type=chat_type_raw),
|
|
user_id=sender_profile["user_id"],
|
|
user_name=sender_profile["user_name"],
|
|
thread_id=None,
|
|
user_id_alt=sender_profile["user_id_alt"],
|
|
)
|
|
synthetic_event = MessageEvent(
|
|
text=synthetic_text,
|
|
message_type=MessageType.TEXT,
|
|
source=source,
|
|
raw_message=data,
|
|
message_id=message_id,
|
|
timestamp=datetime.now(),
|
|
)
|
|
logger.info("[Feishu] Routing reaction %s:%s on bot message %s as synthetic event", action, emoji_type, message_id)
|
|
await self._handle_message_with_guards(synthetic_event)
|
|
|
|
def _is_card_action_duplicate(self, token: str) -> bool:
|
|
"""Return True if this card action token was already processed within the dedup window."""
|
|
now = time.time()
|
|
# Prune expired tokens lazily each call.
|
|
expired = [t for t, ts in self._card_action_tokens.items() if now - ts > _FEISHU_CARD_ACTION_DEDUP_TTL_SECONDS]
|
|
for t in expired:
|
|
del self._card_action_tokens[t]
|
|
if token in self._card_action_tokens:
|
|
return True
|
|
self._card_action_tokens[token] = now
|
|
return False
|
|
|
|
async def _handle_card_action_event(self, data: Any) -> None:
|
|
"""Route Feishu interactive card button clicks as synthetic COMMAND events."""
|
|
event = getattr(data, "event", None)
|
|
token = str(getattr(event, "token", "") or "")
|
|
if token and self._is_card_action_duplicate(token):
|
|
logger.debug("[Feishu] Dropping duplicate card action token: %s", token)
|
|
return
|
|
|
|
context = getattr(event, "context", None)
|
|
chat_id = str(getattr(context, "open_chat_id", "") or "")
|
|
operator = getattr(event, "operator", None)
|
|
open_id = str(getattr(operator, "open_id", "") or "")
|
|
if not chat_id or not open_id:
|
|
logger.debug("[Feishu] Card action missing chat_id or operator open_id, dropping")
|
|
return
|
|
|
|
action = getattr(event, "action", None)
|
|
action_tag = str(getattr(action, "tag", "") or "button")
|
|
action_value = getattr(action, "value", {}) or {}
|
|
synthetic_text = f"/card {action_tag}"
|
|
if action_value:
|
|
try:
|
|
synthetic_text += f" {json.dumps(action_value, ensure_ascii=False)}"
|
|
except Exception:
|
|
pass
|
|
|
|
sender_id = SimpleNamespace(open_id=open_id, user_id=None, union_id=None)
|
|
sender_profile = await self._resolve_sender_profile(sender_id)
|
|
chat_info = await self.get_chat_info(chat_id)
|
|
source = self.build_source(
|
|
chat_id=chat_id,
|
|
chat_name=chat_info.get("name") or chat_id or "Feishu Chat",
|
|
chat_type=self._resolve_source_chat_type(chat_info=chat_info, event_chat_type="group"),
|
|
user_id=sender_profile["user_id"],
|
|
user_name=sender_profile["user_name"],
|
|
thread_id=None,
|
|
user_id_alt=sender_profile["user_id_alt"],
|
|
)
|
|
synthetic_event = MessageEvent(
|
|
text=synthetic_text,
|
|
message_type=MessageType.COMMAND,
|
|
source=source,
|
|
raw_message=data,
|
|
message_id=token or str(uuid.uuid4()),
|
|
timestamp=datetime.now(),
|
|
)
|
|
logger.info("[Feishu] Routing card action %r from %s in %s as synthetic command", action_tag, open_id, chat_id)
|
|
await self._handle_message_with_guards(synthetic_event)
|
|
|
|
# =========================================================================
|
|
# Per-chat serialization and typing indicator
|
|
# =========================================================================
|
|
|
|
def _get_chat_lock(self, chat_id: str) -> asyncio.Lock:
|
|
"""Return (creating if needed) the per-chat asyncio.Lock for serial message processing."""
|
|
lock = self._chat_locks.get(chat_id)
|
|
if lock is None:
|
|
lock = asyncio.Lock()
|
|
self._chat_locks[chat_id] = lock
|
|
return lock
|
|
|
|
async def _handle_message_with_guards(self, event: MessageEvent) -> None:
|
|
"""Dispatch a single event through the agent pipeline with per-chat serialization
|
|
and a persistent ACK emoji reaction before processing starts.
|
|
|
|
- Per-chat lock: ensures messages in the same chat are processed one at a time
|
|
(matches openclaw's createChatQueue serial queue behaviour).
|
|
- ACK indicator: adds a CHECK reaction to the triggering message before handing
|
|
off to the agent and leaves it in place as a receipt marker.
|
|
"""
|
|
chat_id = getattr(event.source, "chat_id", "") or "" if event.source else ""
|
|
chat_lock = self._get_chat_lock(chat_id)
|
|
async with chat_lock:
|
|
message_id = event.message_id
|
|
if message_id:
|
|
await self._add_ack_reaction(message_id)
|
|
await self.handle_message(event)
|
|
|
|
async def _add_ack_reaction(self, message_id: str) -> Optional[str]:
|
|
"""Add a persistent ACK emoji reaction to signal the message was received."""
|
|
if not self._client or not message_id:
|
|
return None
|
|
try:
|
|
from lark_oapi.api.im.v1 import ( # lazy import — keeps optional dep optional
|
|
CreateMessageReactionRequest,
|
|
CreateMessageReactionRequestBody,
|
|
)
|
|
body = (
|
|
CreateMessageReactionRequestBody.builder()
|
|
.reaction_type({"emoji_type": _FEISHU_ACK_EMOJI})
|
|
.build()
|
|
)
|
|
request = (
|
|
CreateMessageReactionRequest.builder()
|
|
.message_id(message_id)
|
|
.request_body(body)
|
|
.build()
|
|
)
|
|
response = await asyncio.to_thread(self._client.im.v1.message_reaction.create, request)
|
|
if response and getattr(response, "success", lambda: False)():
|
|
data = getattr(response, "data", None)
|
|
return getattr(data, "reaction_id", None)
|
|
logger.warning(
|
|
"[Feishu] Failed to add ack reaction to %s: code=%s msg=%s",
|
|
message_id,
|
|
getattr(response, "code", None),
|
|
getattr(response, "msg", None),
|
|
)
|
|
except Exception:
|
|
logger.warning("[Feishu] Failed to add ack reaction to %s", message_id, exc_info=True)
|
|
return None
|
|
|
|
# =========================================================================
|
|
# Webhook server and security
|
|
# =========================================================================
|
|
|
|
def _record_webhook_anomaly(self, remote_ip: str, status: str) -> None:
|
|
"""Increment the anomaly counter for remote_ip and emit a WARNING every threshold hits.
|
|
|
|
Mirrors openclaw's createWebhookAnomalyTracker: TTL 6 hours, log every 25 consecutive
|
|
error responses from the same IP.
|
|
"""
|
|
now = time.time()
|
|
entry = self._webhook_anomaly_counts.get(remote_ip)
|
|
if entry is not None:
|
|
count, _last_status, first_seen = entry
|
|
if now - first_seen < _FEISHU_WEBHOOK_ANOMALY_TTL_SECONDS:
|
|
count += 1
|
|
if count % _FEISHU_WEBHOOK_ANOMALY_THRESHOLD == 0:
|
|
logger.warning(
|
|
"[Feishu] Webhook anomaly: %d consecutive error responses (%s) from %s "
|
|
"over the last %.0fs",
|
|
count,
|
|
status,
|
|
remote_ip,
|
|
now - first_seen,
|
|
)
|
|
self._webhook_anomaly_counts[remote_ip] = (count, status, first_seen)
|
|
return
|
|
# Either first occurrence or TTL expired — start fresh.
|
|
self._webhook_anomaly_counts[remote_ip] = (1, status, now)
|
|
|
|
def _clear_webhook_anomaly(self, remote_ip: str) -> None:
|
|
"""Reset the anomaly counter for remote_ip after a successful request."""
|
|
self._webhook_anomaly_counts.pop(remote_ip, None)
|
|
|
|
# =========================================================================
|
|
# Inbound processing pipeline
|
|
# =========================================================================
|
|
|
|
async def _process_inbound_message(
|
|
self,
|
|
*,
|
|
data: Any,
|
|
message: Any,
|
|
sender_id: Any,
|
|
chat_type: str,
|
|
message_id: str,
|
|
) -> None:
|
|
text, inbound_type, media_urls, media_types = await self._extract_message_content(message)
|
|
if inbound_type == MessageType.TEXT and not text and not media_urls:
|
|
logger.debug("[Feishu] Ignoring unsupported or empty message type: %s", getattr(message, "message_type", ""))
|
|
return
|
|
|
|
if inbound_type == MessageType.TEXT and text.startswith("/"):
|
|
inbound_type = MessageType.COMMAND
|
|
|
|
reply_to_message_id = (
|
|
getattr(message, "parent_id", None)
|
|
or getattr(message, "upper_message_id", None)
|
|
or None
|
|
)
|
|
reply_to_text = await self._fetch_message_text(reply_to_message_id) if reply_to_message_id else None
|
|
|
|
logger.info(
|
|
"[Feishu] Inbound %s message received: id=%s type=%s chat_id=%s text=%r media=%d",
|
|
"dm" if chat_type == "p2p" else "group",
|
|
message_id,
|
|
inbound_type.value,
|
|
getattr(message, "chat_id", "") or "",
|
|
text[:120],
|
|
len(media_urls),
|
|
)
|
|
|
|
chat_id = getattr(message, "chat_id", "") or ""
|
|
chat_info = await self.get_chat_info(chat_id)
|
|
sender_profile = await self._resolve_sender_profile(sender_id)
|
|
source = self.build_source(
|
|
chat_id=chat_id,
|
|
chat_name=chat_info.get("name") or chat_id or "Feishu Chat",
|
|
chat_type=self._resolve_source_chat_type(chat_info=chat_info, event_chat_type=chat_type),
|
|
user_id=sender_profile["user_id"],
|
|
user_name=sender_profile["user_name"],
|
|
thread_id=getattr(message, "thread_id", None) or None,
|
|
user_id_alt=sender_profile["user_id_alt"],
|
|
)
|
|
normalized = MessageEvent(
|
|
text=text,
|
|
message_type=inbound_type,
|
|
source=source,
|
|
raw_message=data,
|
|
message_id=message_id,
|
|
media_urls=media_urls,
|
|
media_types=media_types,
|
|
reply_to_message_id=reply_to_message_id,
|
|
reply_to_text=reply_to_text,
|
|
timestamp=datetime.now(),
|
|
)
|
|
await self._dispatch_inbound_event(normalized)
|
|
|
|
async def _dispatch_inbound_event(self, event: MessageEvent) -> None:
|
|
"""Apply Feishu-specific burst protection before entering the base adapter."""
|
|
if event.message_type == MessageType.TEXT and not event.is_command():
|
|
await self._enqueue_text_event(event)
|
|
return
|
|
if self._should_batch_media_event(event):
|
|
await self._enqueue_media_event(event)
|
|
return
|
|
await self._handle_message_with_guards(event)
|
|
|
|
# =========================================================================
|
|
# Media batching
|
|
# =========================================================================
|
|
|
|
def _should_batch_media_event(self, event: MessageEvent) -> bool:
|
|
return bool(
|
|
event.media_urls
|
|
and event.message_type in {MessageType.PHOTO, MessageType.VIDEO, MessageType.DOCUMENT, MessageType.AUDIO}
|
|
)
|
|
|
|
def _media_batch_key(self, event: MessageEvent) -> str:
|
|
from gateway.session import build_session_key
|
|
|
|
session_key = build_session_key(
|
|
event.source,
|
|
group_sessions_per_user=self.config.extra.get("group_sessions_per_user", True),
|
|
)
|
|
return f"{session_key}:media:{event.message_type.value}"
|
|
|
|
@staticmethod
|
|
def _media_batch_is_compatible(existing: MessageEvent, incoming: MessageEvent) -> bool:
|
|
return (
|
|
existing.message_type == incoming.message_type
|
|
and existing.reply_to_message_id == incoming.reply_to_message_id
|
|
and existing.reply_to_text == incoming.reply_to_text
|
|
and existing.source.thread_id == incoming.source.thread_id
|
|
)
|
|
|
|
async def _enqueue_media_event(self, event: MessageEvent) -> None:
|
|
key = self._media_batch_key(event)
|
|
existing = self._pending_media_batches.get(key)
|
|
if existing is None:
|
|
self._pending_media_batches[key] = event
|
|
self._schedule_media_batch_flush(key)
|
|
return
|
|
if not self._media_batch_is_compatible(existing, event):
|
|
await self._flush_media_batch_now(key)
|
|
self._pending_media_batches[key] = event
|
|
self._schedule_media_batch_flush(key)
|
|
return
|
|
existing.media_urls.extend(event.media_urls)
|
|
existing.media_types.extend(event.media_types)
|
|
if event.text:
|
|
if not existing.text:
|
|
existing.text = event.text
|
|
elif event.text not in existing.text.split("\n\n"):
|
|
existing.text = f"{existing.text}\n\n{event.text}"
|
|
existing.timestamp = event.timestamp
|
|
if event.message_id:
|
|
existing.message_id = event.message_id
|
|
self._schedule_media_batch_flush(key)
|
|
|
|
def _schedule_media_batch_flush(self, key: str) -> None:
|
|
self._reschedule_batch_task(
|
|
self._pending_media_batch_tasks,
|
|
key,
|
|
self._flush_media_batch,
|
|
)
|
|
|
|
async def _flush_media_batch(self, key: str) -> None:
|
|
current_task = asyncio.current_task()
|
|
try:
|
|
await asyncio.sleep(self._media_batch_delay_seconds)
|
|
await self._flush_media_batch_now(key)
|
|
finally:
|
|
if self._pending_media_batch_tasks.get(key) is current_task:
|
|
self._pending_media_batch_tasks.pop(key, None)
|
|
|
|
async def _flush_media_batch_now(self, key: str) -> None:
|
|
event = self._pending_media_batches.pop(key, None)
|
|
if not event:
|
|
return
|
|
logger.info(
|
|
"[Feishu] Flushing media batch %s with %d attachment(s)",
|
|
key,
|
|
len(event.media_urls),
|
|
)
|
|
await self._handle_message_with_guards(event)
|
|
|
|
async def _download_remote_image(self, image_url: str) -> str:
|
|
ext = self._guess_remote_extension(image_url, default=".jpg")
|
|
return await cache_image_from_url(image_url, ext=ext)
|
|
|
|
async def _download_remote_document(
|
|
self,
|
|
file_url: str,
|
|
*,
|
|
default_ext: str,
|
|
preferred_name: str,
|
|
) -> tuple[str, str]:
|
|
import httpx
|
|
|
|
async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as client:
|
|
response = await client.get(
|
|
file_url,
|
|
headers={
|
|
"User-Agent": "Mozilla/5.0 (compatible; HermesAgent/1.0)",
|
|
"Accept": "*/*",
|
|
},
|
|
)
|
|
response.raise_for_status()
|
|
filename = self._derive_remote_filename(
|
|
file_url,
|
|
content_type=str(response.headers.get("Content-Type", "")),
|
|
default_name=preferred_name,
|
|
default_ext=default_ext,
|
|
)
|
|
cached_path = cache_document_from_bytes(response.content, filename)
|
|
return cached_path, filename
|
|
|
|
@staticmethod
|
|
def _guess_remote_extension(url: str, *, default: str) -> str:
|
|
ext = Path((url or "").split("?", 1)[0]).suffix.lower()
|
|
return ext if ext in (_IMAGE_EXTENSIONS | _AUDIO_EXTENSIONS | _VIDEO_EXTENSIONS | set(SUPPORTED_DOCUMENT_TYPES)) else default
|
|
|
|
@staticmethod
|
|
def _derive_remote_filename(file_url: str, *, content_type: str, default_name: str, default_ext: str) -> str:
|
|
candidate = Path((file_url or "").split("?", 1)[0]).name or default_name
|
|
ext = Path(candidate).suffix.lower()
|
|
if not ext:
|
|
guessed = mimetypes.guess_extension((content_type or "").split(";", 1)[0].strip().lower() or "") or default_ext
|
|
candidate = f"{candidate}{guessed}"
|
|
return candidate
|
|
|
|
@staticmethod
|
|
def _namespace_from_mapping(value: Any) -> Any:
|
|
if isinstance(value, dict):
|
|
return SimpleNamespace(**{key: FeishuAdapter._namespace_from_mapping(item) for key, item in value.items()})
|
|
if isinstance(value, list):
|
|
return [FeishuAdapter._namespace_from_mapping(item) for item in value]
|
|
return value
|
|
|
|
async def _handle_webhook_request(self, request: Any) -> Any:
|
|
remote_ip = (getattr(request, "remote", None) or "unknown")
|
|
|
|
# Rate limiting — composite key: app_id:path:remote_ip (matches openclaw key structure).
|
|
rate_key = f"{self._app_id}:{self._webhook_path}:{remote_ip}"
|
|
if not self._check_webhook_rate_limit(rate_key):
|
|
logger.warning("[Feishu] Webhook rate limit exceeded for %s", remote_ip)
|
|
self._record_webhook_anomaly(remote_ip, "429")
|
|
return web.Response(status=429, text="Too Many Requests")
|
|
|
|
# Content-Type guard — Feishu always sends application/json.
|
|
headers = getattr(request, "headers", {}) or {}
|
|
content_type = str(headers.get("Content-Type", "") or "").split(";")[0].strip().lower()
|
|
if content_type and content_type != "application/json":
|
|
logger.warning("[Feishu] Webhook rejected: unexpected Content-Type %r from %s", content_type, remote_ip)
|
|
self._record_webhook_anomaly(remote_ip, "415")
|
|
return web.Response(status=415, text="Unsupported Media Type")
|
|
|
|
# Body size guard — reject early via Content-Length when present.
|
|
content_length = getattr(request, "content_length", None)
|
|
if content_length is not None and content_length > _FEISHU_WEBHOOK_MAX_BODY_BYTES:
|
|
logger.warning("[Feishu] Webhook body too large (%d bytes) from %s", content_length, remote_ip)
|
|
self._record_webhook_anomaly(remote_ip, "413")
|
|
return web.Response(status=413, text="Request body too large")
|
|
|
|
try:
|
|
body_bytes: bytes = await asyncio.wait_for(
|
|
request.read(),
|
|
timeout=_FEISHU_WEBHOOK_BODY_TIMEOUT_SECONDS,
|
|
)
|
|
except asyncio.TimeoutError:
|
|
logger.warning("[Feishu] Webhook body read timed out after %ds from %s", _FEISHU_WEBHOOK_BODY_TIMEOUT_SECONDS, remote_ip)
|
|
self._record_webhook_anomaly(remote_ip, "408")
|
|
return web.Response(status=408, text="Request Timeout")
|
|
except Exception:
|
|
self._record_webhook_anomaly(remote_ip, "400")
|
|
return web.json_response({"code": 400, "msg": "failed to read body"}, status=400)
|
|
|
|
if len(body_bytes) > _FEISHU_WEBHOOK_MAX_BODY_BYTES:
|
|
logger.warning("[Feishu] Webhook body exceeds limit (%d bytes) from %s", len(body_bytes), remote_ip)
|
|
self._record_webhook_anomaly(remote_ip, "413")
|
|
return web.Response(status=413, text="Request body too large")
|
|
|
|
try:
|
|
payload = json.loads(body_bytes.decode("utf-8"))
|
|
except (json.JSONDecodeError, UnicodeDecodeError):
|
|
self._record_webhook_anomaly(remote_ip, "400")
|
|
return web.json_response({"code": 400, "msg": "invalid json"}, status=400)
|
|
|
|
# URL verification challenge — respond before other checks so that Feishu's
|
|
# subscription setup works even before encrypt_key is wired.
|
|
if payload.get("type") == "url_verification":
|
|
return web.json_response({"challenge": payload.get("challenge", "")})
|
|
|
|
# Verification token check — second layer of defence beyond signature (matches openclaw).
|
|
if self._verification_token:
|
|
header = payload.get("header") or {}
|
|
incoming_token = str(header.get("token") or payload.get("token") or "")
|
|
if not incoming_token or not hmac.compare_digest(incoming_token, self._verification_token):
|
|
logger.warning("[Feishu] Webhook rejected: invalid verification token from %s", remote_ip)
|
|
self._record_webhook_anomaly(remote_ip, "401-token")
|
|
return web.Response(status=401, text="Invalid verification token")
|
|
|
|
# Timing-safe signature verification (only enforced when encrypt_key is set).
|
|
if self._encrypt_key and not self._is_webhook_signature_valid(request.headers, body_bytes):
|
|
logger.warning("[Feishu] Webhook rejected: invalid signature from %s", remote_ip)
|
|
self._record_webhook_anomaly(remote_ip, "401-sig")
|
|
return web.Response(status=401, text="Invalid signature")
|
|
|
|
if payload.get("encrypt"):
|
|
logger.error("[Feishu] Encrypted webhook payloads are not supported by Hermes webhook mode")
|
|
self._record_webhook_anomaly(remote_ip, "400-encrypted")
|
|
return web.json_response({"code": 400, "msg": "encrypted webhook payloads are not supported"}, status=400)
|
|
|
|
self._clear_webhook_anomaly(remote_ip)
|
|
|
|
event_type = str((payload.get("header") or {}).get("event_type") or "")
|
|
data = self._namespace_from_mapping(payload)
|
|
if event_type == "im.message.receive_v1":
|
|
await self._handle_message_event_data(data)
|
|
elif event_type == "im.message.message_read_v1":
|
|
self._on_message_read_event(data)
|
|
elif event_type == "im.chat.member.bot.added_v1":
|
|
self._on_bot_added_to_chat(data)
|
|
elif event_type == "im.chat.member.bot.deleted_v1":
|
|
self._on_bot_removed_from_chat(data)
|
|
elif event_type in ("im.message.reaction.created_v1", "im.message.reaction.deleted_v1"):
|
|
self._on_reaction_event(event_type, data)
|
|
elif event_type == "card.action.trigger":
|
|
asyncio.ensure_future(self._handle_card_action_event(data))
|
|
else:
|
|
logger.debug("[Feishu] Ignoring webhook event type: %s", event_type or "unknown")
|
|
return web.json_response({"code": 0, "msg": "ok"})
|
|
|
|
def _is_webhook_signature_valid(self, headers: Any, body_bytes: bytes) -> bool:
|
|
"""Verify Feishu webhook signature using timing-safe comparison.
|
|
|
|
Feishu signature algorithm:
|
|
SHA256(timestamp + nonce + encrypt_key + body_string)
|
|
Headers checked: x-lark-request-timestamp, x-lark-request-nonce, x-lark-signature.
|
|
"""
|
|
timestamp = str(headers.get("x-lark-request-timestamp", "") or "")
|
|
nonce = str(headers.get("x-lark-request-nonce", "") or "")
|
|
signature = str(headers.get("x-lark-signature", "") or "")
|
|
if not timestamp or not nonce or not signature:
|
|
return False
|
|
try:
|
|
body_str = body_bytes.decode("utf-8", errors="replace")
|
|
content = f"{timestamp}{nonce}{self._encrypt_key}{body_str}"
|
|
computed = hashlib.sha256(content.encode("utf-8")).hexdigest()
|
|
return hmac.compare_digest(computed, signature)
|
|
except Exception:
|
|
logger.debug("[Feishu] Signature verification raised an exception", exc_info=True)
|
|
return False
|
|
|
|
def _check_webhook_rate_limit(self, rate_key: str) -> bool:
|
|
"""Return False when the composite rate_key has exceeded _FEISHU_WEBHOOK_RATE_LIMIT_MAX.
|
|
|
|
The rate_key is composed as "{app_id}:{path}:{remote_ip}" — matching openclaw's key
|
|
structure so the limit is scoped to a specific (account, endpoint, IP) triple rather
|
|
than a bare IP, which causes fewer false-positive denials in multi-tenant setups.
|
|
|
|
The tracking dict is capped at _FEISHU_WEBHOOK_RATE_MAX_KEYS entries to prevent unbounded
|
|
memory growth. Stale (expired) entries are pruned when the cap is reached.
|
|
"""
|
|
now = time.time()
|
|
# Fast path: existing entry within the current window.
|
|
entry = self._webhook_rate_counts.get(rate_key)
|
|
if entry is not None:
|
|
count, window_start = entry
|
|
if now - window_start < _FEISHU_WEBHOOK_RATE_WINDOW_SECONDS:
|
|
if count >= _FEISHU_WEBHOOK_RATE_LIMIT_MAX:
|
|
return False
|
|
self._webhook_rate_counts[rate_key] = (count + 1, window_start)
|
|
return True
|
|
# New window for an existing key, or a brand-new key — prune stale entries first.
|
|
if len(self._webhook_rate_counts) >= _FEISHU_WEBHOOK_RATE_MAX_KEYS:
|
|
stale_keys = [
|
|
k for k, (_, ws) in self._webhook_rate_counts.items()
|
|
if now - ws >= _FEISHU_WEBHOOK_RATE_WINDOW_SECONDS
|
|
]
|
|
for k in stale_keys:
|
|
del self._webhook_rate_counts[k]
|
|
# If still at capacity after pruning, allow through without tracking.
|
|
if rate_key not in self._webhook_rate_counts and len(self._webhook_rate_counts) >= _FEISHU_WEBHOOK_RATE_MAX_KEYS:
|
|
return True
|
|
self._webhook_rate_counts[rate_key] = (1, now)
|
|
return True
|
|
|
|
# =========================================================================
|
|
# Text batching
|
|
# =========================================================================
|
|
|
|
def _text_batch_key(self, event: MessageEvent) -> str:
|
|
"""Return the session-scoped key used for Feishu text aggregation."""
|
|
from gateway.session import build_session_key
|
|
|
|
return build_session_key(
|
|
event.source,
|
|
group_sessions_per_user=self.config.extra.get("group_sessions_per_user", True),
|
|
)
|
|
|
|
@staticmethod
|
|
def _text_batch_is_compatible(existing: MessageEvent, incoming: MessageEvent) -> bool:
|
|
"""Only merge text events when reply/thread context is identical."""
|
|
return (
|
|
existing.reply_to_message_id == incoming.reply_to_message_id
|
|
and existing.reply_to_text == incoming.reply_to_text
|
|
and existing.source.thread_id == incoming.source.thread_id
|
|
)
|
|
|
|
async def _enqueue_text_event(self, event: MessageEvent) -> None:
|
|
"""Debounce rapid Feishu text bursts into a single MessageEvent."""
|
|
key = self._text_batch_key(event)
|
|
existing = self._pending_text_batches.get(key)
|
|
if existing is None:
|
|
self._pending_text_batches[key] = event
|
|
self._pending_text_batch_counts[key] = 1
|
|
self._schedule_text_batch_flush(key)
|
|
return
|
|
|
|
if not self._text_batch_is_compatible(existing, event):
|
|
await self._flush_text_batch_now(key)
|
|
self._pending_text_batches[key] = event
|
|
self._pending_text_batch_counts[key] = 1
|
|
self._schedule_text_batch_flush(key)
|
|
return
|
|
|
|
existing_count = self._pending_text_batch_counts.get(key, 1)
|
|
next_count = existing_count + 1
|
|
appended_text = event.text or ""
|
|
next_text = f"{existing.text}\n{appended_text}" if existing.text and appended_text else (existing.text or appended_text)
|
|
if next_count > self._text_batch_max_messages or len(next_text) > self._text_batch_max_chars:
|
|
await self._flush_text_batch_now(key)
|
|
self._pending_text_batches[key] = event
|
|
self._pending_text_batch_counts[key] = 1
|
|
self._schedule_text_batch_flush(key)
|
|
return
|
|
|
|
existing.text = next_text
|
|
existing.timestamp = event.timestamp
|
|
if event.message_id:
|
|
existing.message_id = event.message_id
|
|
self._pending_text_batch_counts[key] = next_count
|
|
self._schedule_text_batch_flush(key)
|
|
|
|
def _schedule_text_batch_flush(self, key: str) -> None:
|
|
"""Reset the debounce timer for a pending Feishu text batch."""
|
|
self._reschedule_batch_task(
|
|
self._pending_text_batch_tasks,
|
|
key,
|
|
self._flush_text_batch,
|
|
)
|
|
|
|
@staticmethod
|
|
def _reschedule_batch_task(
|
|
task_map: Dict[str, asyncio.Task],
|
|
key: str,
|
|
flush_fn: Any,
|
|
) -> None:
|
|
prior_task = task_map.get(key)
|
|
if prior_task and not prior_task.done():
|
|
prior_task.cancel()
|
|
task_map[key] = asyncio.create_task(flush_fn(key))
|
|
|
|
async def _flush_text_batch(self, key: str) -> None:
|
|
"""Flush a pending text batch after the quiet period."""
|
|
current_task = asyncio.current_task()
|
|
try:
|
|
await asyncio.sleep(self._text_batch_delay_seconds)
|
|
await self._flush_text_batch_now(key)
|
|
finally:
|
|
if self._pending_text_batch_tasks.get(key) is current_task:
|
|
self._pending_text_batch_tasks.pop(key, None)
|
|
|
|
async def _flush_text_batch_now(self, key: str) -> None:
|
|
"""Dispatch the current text batch immediately."""
|
|
event = self._pending_text_batches.pop(key, None)
|
|
self._pending_text_batch_counts.pop(key, None)
|
|
if not event:
|
|
return
|
|
logger.info(
|
|
"[Feishu] Flushing text batch %s (%d chars)",
|
|
key,
|
|
len(event.text or ""),
|
|
)
|
|
await self._handle_message_with_guards(event)
|
|
|
|
# =========================================================================
|
|
# Message content extraction and resource download
|
|
# =========================================================================
|
|
|
|
async def _extract_message_content(self, message: Any) -> tuple[str, MessageType, List[str], List[str]]:
|
|
"""Extract text and cached media from a normalized Feishu message."""
|
|
raw_content = getattr(message, "content", "") or ""
|
|
raw_type = getattr(message, "message_type", "") or ""
|
|
message_id = str(getattr(message, "message_id", "") or "")
|
|
logger.info("[Feishu] Received raw message type=%s message_id=%s", raw_type, message_id)
|
|
|
|
normalized = normalize_feishu_message(message_type=raw_type, raw_content=raw_content)
|
|
media_urls, media_types = await self._download_feishu_message_resources(
|
|
message_id=message_id,
|
|
normalized=normalized,
|
|
)
|
|
inbound_type = self._resolve_normalized_message_type(normalized, media_types)
|
|
text = normalized.text_content
|
|
|
|
if (
|
|
inbound_type in {MessageType.DOCUMENT, MessageType.AUDIO, MessageType.VIDEO, MessageType.PHOTO}
|
|
and len(media_urls) == 1
|
|
and normalized.preferred_message_type in {"document", "audio"}
|
|
):
|
|
injected = await self._maybe_extract_text_document(media_urls[0], media_types[0])
|
|
if injected:
|
|
text = injected
|
|
|
|
return text, inbound_type, media_urls, media_types
|
|
|
|
async def _download_feishu_message_resources(
|
|
self,
|
|
*,
|
|
message_id: str,
|
|
normalized: FeishuNormalizedMessage,
|
|
) -> tuple[List[str], List[str]]:
|
|
media_urls: List[str] = []
|
|
media_types: List[str] = []
|
|
|
|
for image_key in normalized.image_keys:
|
|
cached_path, media_type = await self._download_feishu_image(
|
|
message_id=message_id,
|
|
image_key=image_key,
|
|
)
|
|
if cached_path:
|
|
media_urls.append(cached_path)
|
|
media_types.append(media_type)
|
|
|
|
for media_ref in normalized.media_refs:
|
|
cached_path, media_type = await self._download_feishu_message_resource(
|
|
message_id=message_id,
|
|
file_key=media_ref.file_key,
|
|
resource_type=media_ref.resource_type,
|
|
fallback_filename=media_ref.file_name,
|
|
)
|
|
if cached_path:
|
|
media_urls.append(cached_path)
|
|
media_types.append(media_type)
|
|
|
|
return media_urls, media_types
|
|
|
|
@staticmethod
|
|
def _resolve_media_message_type(media_type: str, *, default: MessageType) -> MessageType:
|
|
normalized = (media_type or "").lower()
|
|
if normalized.startswith("image/"):
|
|
return MessageType.PHOTO
|
|
if normalized.startswith("audio/"):
|
|
return MessageType.AUDIO
|
|
if normalized.startswith("video/"):
|
|
return MessageType.VIDEO
|
|
return default
|
|
|
|
def _resolve_normalized_message_type(
|
|
self,
|
|
normalized: FeishuNormalizedMessage,
|
|
media_types: List[str],
|
|
) -> MessageType:
|
|
preferred = normalized.preferred_message_type
|
|
if preferred == "photo":
|
|
return self._resolve_media_message_type(media_types[0] if media_types else "", default=MessageType.PHOTO)
|
|
if preferred == "audio":
|
|
return self._resolve_media_message_type(media_types[0] if media_types else "", default=MessageType.AUDIO)
|
|
if preferred == "document":
|
|
return self._resolve_media_message_type(media_types[0] if media_types else "", default=MessageType.DOCUMENT)
|
|
return MessageType.TEXT
|
|
|
|
def _normalize_inbound_text(self, text: str) -> str:
|
|
"""Strip Feishu mention placeholders from inbound text."""
|
|
text = _MENTION_RE.sub(" ", text or "")
|
|
text = _MULTISPACE_RE.sub(" ", text)
|
|
return text.strip()
|
|
|
|
async def _maybe_extract_text_document(self, cached_path: str, media_type: str) -> str:
|
|
if not cached_path or not media_type.startswith("text/"):
|
|
return ""
|
|
try:
|
|
if os.path.getsize(cached_path) > _MAX_TEXT_INJECT_BYTES:
|
|
return ""
|
|
ext = Path(cached_path).suffix.lower()
|
|
if ext not in {".txt", ".md"} and media_type not in {"text/plain", "text/markdown"}:
|
|
return ""
|
|
content = Path(cached_path).read_text(encoding="utf-8")
|
|
display_name = self._display_name_from_cached_path(cached_path)
|
|
return f"[Content of {display_name}]:\n{content}"
|
|
except (OSError, UnicodeDecodeError):
|
|
logger.warning("[Feishu] Failed to inject text document content from %s", cached_path, exc_info=True)
|
|
return ""
|
|
|
|
async def _download_feishu_image(self, *, message_id: str, image_key: str) -> tuple[str, str]:
|
|
if not self._client or not message_id:
|
|
return "", ""
|
|
try:
|
|
request = self._build_message_resource_request(
|
|
message_id=message_id,
|
|
file_key=image_key,
|
|
resource_type="image",
|
|
)
|
|
response = await asyncio.to_thread(self._client.im.v1.message_resource.get, request)
|
|
if not response or not response.success():
|
|
logger.warning(
|
|
"[Feishu] Failed to download image %s: %s %s",
|
|
image_key,
|
|
getattr(response, "code", "unknown"),
|
|
getattr(response, "msg", "request failed"),
|
|
)
|
|
return "", ""
|
|
raw_bytes = self._read_binary_response(response)
|
|
if not raw_bytes:
|
|
return "", ""
|
|
content_type = self._get_response_header(response, "Content-Type")
|
|
filename = getattr(response, "file_name", None) or f"{image_key}.jpg"
|
|
ext = self._guess_extension(filename, content_type, ".jpg", allowed=_IMAGE_EXTENSIONS)
|
|
cached_path = cache_image_from_bytes(raw_bytes, ext=ext)
|
|
media_type = self._normalize_media_type(content_type, default=self._default_image_media_type(ext))
|
|
return cached_path, media_type
|
|
except Exception:
|
|
logger.warning("[Feishu] Failed to cache image resource %s", image_key, exc_info=True)
|
|
return "", ""
|
|
|
|
async def _download_feishu_message_resource(
|
|
self,
|
|
*,
|
|
message_id: str,
|
|
file_key: str,
|
|
resource_type: str,
|
|
fallback_filename: str,
|
|
) -> tuple[str, str]:
|
|
if not self._client or not message_id:
|
|
return "", ""
|
|
|
|
request_types = [resource_type]
|
|
if resource_type in {"audio", "media"}:
|
|
request_types.append("file")
|
|
|
|
for request_type in request_types:
|
|
try:
|
|
request = self._build_message_resource_request(
|
|
message_id=message_id,
|
|
file_key=file_key,
|
|
resource_type=request_type,
|
|
)
|
|
response = await asyncio.to_thread(self._client.im.v1.message_resource.get, request)
|
|
if not response or not response.success():
|
|
logger.debug(
|
|
"[Feishu] Resource download failed for %s/%s via type=%s: %s %s",
|
|
message_id,
|
|
file_key,
|
|
request_type,
|
|
getattr(response, "code", "unknown"),
|
|
getattr(response, "msg", "request failed"),
|
|
)
|
|
continue
|
|
|
|
raw_bytes = self._read_binary_response(response)
|
|
if not raw_bytes:
|
|
continue
|
|
content_type = self._get_response_header(response, "Content-Type")
|
|
response_filename = getattr(response, "file_name", None) or ""
|
|
filename = response_filename or fallback_filename or f"{request_type}_{file_key}"
|
|
media_type = self._normalize_media_type(
|
|
content_type,
|
|
default=self._guess_media_type_from_filename(filename),
|
|
)
|
|
|
|
if media_type.startswith("image/"):
|
|
ext = self._guess_extension(filename, content_type, ".jpg", allowed=_IMAGE_EXTENSIONS)
|
|
cached_path = cache_image_from_bytes(raw_bytes, ext=ext)
|
|
logger.info("[Feishu] Cached message image resource at %s", cached_path)
|
|
return cached_path, media_type or self._default_image_media_type(ext)
|
|
|
|
if request_type == "audio" or media_type.startswith("audio/"):
|
|
ext = self._guess_extension(filename, content_type, ".ogg", allowed=_AUDIO_EXTENSIONS)
|
|
cached_path = cache_audio_from_bytes(raw_bytes, ext=ext)
|
|
logger.info("[Feishu] Cached message audio resource at %s", cached_path)
|
|
return cached_path, (media_type or f"audio/{ext.lstrip('.') or 'ogg'}")
|
|
|
|
if media_type.startswith("video/"):
|
|
if not Path(filename).suffix:
|
|
filename = f"{filename}.mp4"
|
|
cached_path = cache_document_from_bytes(raw_bytes, filename)
|
|
logger.info("[Feishu] Cached message video resource at %s", cached_path)
|
|
return cached_path, media_type
|
|
|
|
if not Path(filename).suffix and media_type in _DOCUMENT_MIME_TO_EXT:
|
|
filename = f"{filename}{_DOCUMENT_MIME_TO_EXT[media_type]}"
|
|
cached_path = cache_document_from_bytes(raw_bytes, filename)
|
|
logger.info("[Feishu] Cached message document resource at %s", cached_path)
|
|
return cached_path, (media_type or self._guess_document_media_type(filename))
|
|
except Exception:
|
|
logger.warning(
|
|
"[Feishu] Failed to cache message resource %s/%s",
|
|
message_id,
|
|
file_key,
|
|
exc_info=True,
|
|
)
|
|
return "", ""
|
|
|
|
# =========================================================================
|
|
# Static helpers — extension / media-type guessing
|
|
# =========================================================================
|
|
|
|
@staticmethod
|
|
def _read_binary_response(response: Any) -> bytes:
|
|
file_obj = getattr(response, "file", None)
|
|
if file_obj is None:
|
|
return b""
|
|
if hasattr(file_obj, "getvalue"):
|
|
return bytes(file_obj.getvalue())
|
|
return bytes(file_obj.read())
|
|
|
|
@staticmethod
|
|
def _get_response_header(response: Any, name: str) -> str:
|
|
raw = getattr(response, "raw", None)
|
|
headers = getattr(raw, "headers", {}) or {}
|
|
return str(headers.get(name, headers.get(name.lower(), "")) or "").split(";", 1)[0].strip().lower()
|
|
|
|
@staticmethod
|
|
def _guess_extension(filename: str, content_type: str, default: str, *, allowed: set[str]) -> str:
|
|
ext = Path(filename or "").suffix.lower()
|
|
if ext in allowed:
|
|
return ext
|
|
guessed = mimetypes.guess_extension((content_type or "").split(";", 1)[0].strip().lower() or "")
|
|
if guessed in allowed:
|
|
return guessed
|
|
return default
|
|
|
|
@staticmethod
|
|
def _normalize_media_type(content_type: str, *, default: str) -> str:
|
|
normalized = (content_type or "").split(";", 1)[0].strip().lower()
|
|
return normalized or default
|
|
|
|
@staticmethod
|
|
def _guess_document_media_type(filename: str) -> str:
|
|
ext = Path(filename or "").suffix.lower()
|
|
return SUPPORTED_DOCUMENT_TYPES.get(ext, mimetypes.guess_type(filename or "")[0] or "application/octet-stream")
|
|
|
|
@staticmethod
|
|
def _display_name_from_cached_path(path: str) -> str:
|
|
basename = os.path.basename(path)
|
|
parts = basename.split("_", 2)
|
|
display_name = parts[2] if len(parts) >= 3 else basename
|
|
return re.sub(r"[^\w.\- ]", "_", display_name)
|
|
|
|
@staticmethod
|
|
def _guess_media_type_from_filename(filename: str) -> str:
|
|
guessed = (mimetypes.guess_type(filename or "")[0] or "").lower()
|
|
if guessed:
|
|
return guessed
|
|
ext = Path(filename or "").suffix.lower()
|
|
if ext in _VIDEO_EXTENSIONS:
|
|
return f"video/{ext.lstrip('.')}"
|
|
if ext in _AUDIO_EXTENSIONS:
|
|
return f"audio/{ext.lstrip('.')}"
|
|
if ext in _IMAGE_EXTENSIONS:
|
|
return FeishuAdapter._default_image_media_type(ext)
|
|
return ""
|
|
|
|
@staticmethod
|
|
def _map_chat_type(raw_chat_type: str) -> str:
|
|
normalized = (raw_chat_type or "").strip().lower()
|
|
if normalized == "p2p":
|
|
return "dm"
|
|
if "topic" in normalized or "thread" in normalized or "forum" in normalized:
|
|
return "forum"
|
|
if normalized == "group":
|
|
return "group"
|
|
return "dm"
|
|
|
|
@staticmethod
|
|
def _resolve_source_chat_type(*, chat_info: Dict[str, Any], event_chat_type: str) -> str:
|
|
resolved = str(chat_info.get("type") or "").strip().lower()
|
|
if resolved in {"group", "forum"}:
|
|
return resolved
|
|
if event_chat_type == "p2p":
|
|
return "dm"
|
|
return "group"
|
|
|
|
async def _resolve_sender_profile(self, sender_id: Any) -> Dict[str, Optional[str]]:
|
|
open_id = getattr(sender_id, "open_id", None) or None
|
|
user_id = getattr(sender_id, "user_id", None) or None
|
|
union_id = getattr(sender_id, "union_id", None) or None
|
|
primary_id = open_id or user_id
|
|
display_name = await self._resolve_sender_name_from_api(primary_id or union_id)
|
|
return {
|
|
"user_id": primary_id,
|
|
"user_name": display_name,
|
|
"user_id_alt": union_id,
|
|
}
|
|
|
|
async def _resolve_sender_name_from_api(self, sender_id: Optional[str]) -> Optional[str]:
|
|
"""Fetch the sender's display name from the Feishu contact API with a 10-minute cache.
|
|
|
|
ID-type detection mirrors openclaw: ou_ → open_id, on_ → union_id, else user_id.
|
|
Failures are silently suppressed; the message pipeline must not block on name resolution.
|
|
"""
|
|
if not sender_id or not self._client:
|
|
return None
|
|
trimmed = sender_id.strip()
|
|
if not trimmed:
|
|
return None
|
|
now = time.time()
|
|
cached = self._sender_name_cache.get(trimmed)
|
|
if cached is not None:
|
|
name, expire_at = cached
|
|
if now < expire_at:
|
|
return name
|
|
try:
|
|
from lark_oapi.api.contact.v3 import GetUserRequest # lazy import
|
|
if trimmed.startswith("ou_"):
|
|
id_type = "open_id"
|
|
elif trimmed.startswith("on_"):
|
|
id_type = "union_id"
|
|
else:
|
|
id_type = "user_id"
|
|
request = GetUserRequest.builder().user_id(trimmed).user_id_type(id_type).build()
|
|
response = await asyncio.to_thread(self._client.contact.v3.user.get, request)
|
|
if not response or not response.success():
|
|
return None
|
|
user = getattr(getattr(response, "data", None), "user", None)
|
|
name = (
|
|
getattr(user, "name", None)
|
|
or getattr(user, "display_name", None)
|
|
or getattr(user, "nickname", None)
|
|
or getattr(user, "en_name", None)
|
|
)
|
|
if name and isinstance(name, str):
|
|
name = name.strip()
|
|
if name:
|
|
self._sender_name_cache[trimmed] = (name, now + _FEISHU_SENDER_NAME_TTL_SECONDS)
|
|
return name
|
|
except Exception:
|
|
logger.debug("[Feishu] Failed to resolve sender name for %s", sender_id, exc_info=True)
|
|
return None
|
|
|
|
async def _fetch_message_text(self, message_id: str) -> Optional[str]:
|
|
if not self._client or not message_id:
|
|
return None
|
|
if message_id in self._message_text_cache:
|
|
return self._message_text_cache[message_id]
|
|
try:
|
|
request = self._build_get_message_request(message_id)
|
|
response = await asyncio.to_thread(self._client.im.v1.message.get, request)
|
|
if not response or getattr(response, "success", lambda: False)() is False:
|
|
code = getattr(response, "code", "unknown")
|
|
msg = getattr(response, "msg", "message lookup failed")
|
|
logger.warning("[Feishu] Failed to fetch parent message %s: [%s] %s", message_id, code, msg)
|
|
return None
|
|
items = getattr(getattr(response, "data", None), "items", None) or []
|
|
parent = items[0] if items else None
|
|
body = getattr(parent, "body", None)
|
|
msg_type = getattr(parent, "msg_type", "") or ""
|
|
raw_content = getattr(body, "content", "") or ""
|
|
text = self._extract_text_from_raw_content(msg_type=msg_type, raw_content=raw_content)
|
|
self._message_text_cache[message_id] = text
|
|
return text
|
|
except Exception:
|
|
logger.warning("[Feishu] Failed to fetch parent message %s", message_id, exc_info=True)
|
|
return None
|
|
|
|
def _extract_text_from_raw_content(self, *, msg_type: str, raw_content: str) -> Optional[str]:
|
|
normalized = normalize_feishu_message(message_type=msg_type, raw_content=raw_content)
|
|
if normalized.text_content:
|
|
return normalized.text_content
|
|
placeholder = normalized.metadata.get("placeholder_text") if isinstance(normalized.metadata, dict) else None
|
|
return str(placeholder).strip() or None
|
|
|
|
@staticmethod
|
|
def _default_image_media_type(ext: str) -> str:
|
|
normalized_ext = (ext or "").lower()
|
|
if normalized_ext in {".jpg", ".jpeg"}:
|
|
return "image/jpeg"
|
|
return f"image/{normalized_ext.lstrip('.') or 'jpeg'}"
|
|
|
|
@staticmethod
|
|
def _log_background_failure(future: Any) -> None:
|
|
try:
|
|
future.result()
|
|
except Exception:
|
|
logger.exception("[Feishu] Background inbound processing failed")
|
|
|
|
# =========================================================================
|
|
# Group policy and mention gating
|
|
# =========================================================================
|
|
|
|
def _allow_group_message(self, sender_id: Any) -> bool:
|
|
"""Current group policy gate for non-DM traffic."""
|
|
if self._group_policy == "disabled":
|
|
return False
|
|
sender_open_id = getattr(sender_id, "open_id", None) or getattr(sender_id, "user_id", None)
|
|
if self._group_policy == "open":
|
|
return True
|
|
return bool(sender_open_id and sender_open_id in self._allowed_group_users)
|
|
|
|
def _should_accept_group_message(self, message: Any, sender_id: Any) -> bool:
|
|
"""Require an explicit @mention before group messages enter the agent."""
|
|
if not self._allow_group_message(sender_id):
|
|
return False
|
|
# @_all is Feishu's @everyone placeholder — always route to the bot.
|
|
raw_content = getattr(message, "content", "") or ""
|
|
if "@_all" in raw_content:
|
|
return True
|
|
mentions = getattr(message, "mentions", None) or []
|
|
if mentions:
|
|
return self._message_mentions_bot(mentions)
|
|
normalized = normalize_feishu_message(
|
|
message_type=getattr(message, "message_type", "") or "",
|
|
raw_content=raw_content,
|
|
)
|
|
if normalized.mentioned_ids:
|
|
return self._post_mentions_bot(normalized.mentioned_ids)
|
|
return False
|
|
|
|
def _message_mentions_bot(self, mentions: List[Any]) -> bool:
|
|
"""Check whether any mention targets the configured or inferred bot identity."""
|
|
for mention in mentions:
|
|
mention_id = getattr(mention, "id", None)
|
|
mention_open_id = getattr(mention_id, "open_id", None)
|
|
mention_user_id = getattr(mention_id, "user_id", None)
|
|
mention_name = (getattr(mention, "name", None) or "").strip()
|
|
|
|
if self._bot_open_id and mention_open_id == self._bot_open_id:
|
|
return True
|
|
if self._bot_user_id and mention_user_id == self._bot_user_id:
|
|
return True
|
|
if self._bot_name and mention_name == self._bot_name:
|
|
return True
|
|
|
|
return False
|
|
|
|
def _post_mentions_bot(self, mentioned_ids: List[str]) -> bool:
|
|
if not mentioned_ids:
|
|
return False
|
|
if self._bot_open_id and self._bot_open_id in mentioned_ids:
|
|
return True
|
|
if self._bot_user_id and self._bot_user_id in mentioned_ids:
|
|
return True
|
|
return False
|
|
|
|
async def _hydrate_bot_identity(self) -> None:
|
|
"""Best-effort discovery of bot identity for precise group mention gating."""
|
|
if not self._client:
|
|
return
|
|
if any((self._bot_open_id, self._bot_user_id, self._bot_name)):
|
|
return
|
|
try:
|
|
request = self._build_get_application_request(app_id=self._app_id, lang="en_us")
|
|
response = await asyncio.to_thread(self._client.application.v6.application.get, request)
|
|
if not response or not response.success():
|
|
code = getattr(response, "code", None)
|
|
if code == 99991672:
|
|
logger.warning(
|
|
"[Feishu] Unable to hydrate bot identity from application info. "
|
|
"Grant admin:app.info:readonly or application:application:self_manage "
|
|
"so group @mention gating can resolve the bot name precisely."
|
|
)
|
|
return
|
|
app = getattr(getattr(response, "data", None), "app", None)
|
|
app_name = (getattr(app, "app_name", None) or "").strip()
|
|
if app_name:
|
|
self._bot_name = app_name
|
|
except Exception:
|
|
logger.debug("[Feishu] Failed to hydrate bot identity", exc_info=True)
|
|
|
|
# =========================================================================
|
|
# Deduplication — seen message ID cache (persistent)
|
|
# =========================================================================
|
|
|
|
def _load_seen_message_ids(self) -> None:
|
|
try:
|
|
payload = json.loads(self._dedup_state_path.read_text(encoding="utf-8"))
|
|
except FileNotFoundError:
|
|
return
|
|
except (OSError, json.JSONDecodeError):
|
|
logger.warning("[Feishu] Failed to load persisted dedup state from %s", self._dedup_state_path, exc_info=True)
|
|
return
|
|
seen_data = payload.get("message_ids", {}) if isinstance(payload, dict) else {}
|
|
now = time.time()
|
|
ttl = _FEISHU_DEDUP_TTL_SECONDS
|
|
# Backward-compat: old format stored a plain list of IDs (no timestamps).
|
|
if isinstance(seen_data, list):
|
|
entries: Dict[str, float] = {str(item).strip(): 0.0 for item in seen_data if str(item).strip()}
|
|
elif isinstance(seen_data, dict):
|
|
entries = {k: float(v) for k, v in seen_data.items() if isinstance(k, str) and k.strip()}
|
|
else:
|
|
return
|
|
# Filter out TTL-expired entries (entries saved with ts=0.0 are treated as immortal
|
|
# for one migration cycle to avoid nuking old data on first upgrade).
|
|
valid: Dict[str, float] = {
|
|
msg_id: ts for msg_id, ts in entries.items()
|
|
if ts == 0.0 or ttl <= 0 or now - ts < ttl
|
|
}
|
|
# Apply size cap; keep the most recently seen IDs.
|
|
sorted_ids = sorted(valid, key=lambda k: valid[k], reverse=True)[:self._dedup_cache_size]
|
|
self._seen_message_order = list(reversed(sorted_ids))
|
|
self._seen_message_ids = {k: valid[k] for k in sorted_ids}
|
|
|
|
def _persist_seen_message_ids(self) -> None:
|
|
try:
|
|
self._dedup_state_path.parent.mkdir(parents=True, exist_ok=True)
|
|
recent = self._seen_message_order[-self._dedup_cache_size:]
|
|
# Save as {msg_id: timestamp} so TTL filtering works across restarts.
|
|
payload = {"message_ids": {k: self._seen_message_ids[k] for k in recent if k in self._seen_message_ids}}
|
|
self._dedup_state_path.write_text(json.dumps(payload, ensure_ascii=False), encoding="utf-8")
|
|
except OSError:
|
|
logger.warning("[Feishu] Failed to persist dedup state to %s", self._dedup_state_path, exc_info=True)
|
|
|
|
def _is_duplicate(self, message_id: str) -> bool:
|
|
now = time.time()
|
|
ttl = _FEISHU_DEDUP_TTL_SECONDS
|
|
with self._dedup_lock:
|
|
seen_at = self._seen_message_ids.get(message_id)
|
|
if seen_at is not None and (ttl <= 0 or now - seen_at < ttl):
|
|
return True
|
|
# Record with current wall-clock timestamp so TTL works across restarts.
|
|
self._seen_message_ids[message_id] = now
|
|
self._seen_message_order.append(message_id)
|
|
while len(self._seen_message_order) > self._dedup_cache_size:
|
|
stale = self._seen_message_order.pop(0)
|
|
self._seen_message_ids.pop(stale, None)
|
|
self._persist_seen_message_ids()
|
|
return False
|
|
|
|
# =========================================================================
|
|
# Outbound payload construction and send pipeline
|
|
# =========================================================================
|
|
|
|
def _build_outbound_payload(self, content: str) -> tuple[str, str]:
|
|
if _MARKDOWN_HINT_RE.search(content):
|
|
return "post", _build_markdown_post_payload(content)
|
|
text_payload = {"text": content}
|
|
return "text", json.dumps(text_payload, ensure_ascii=False)
|
|
|
|
async def _send_uploaded_file_message(
|
|
self,
|
|
*,
|
|
chat_id: str,
|
|
file_path: str,
|
|
reply_to: Optional[str],
|
|
metadata: Optional[Dict[str, Any]],
|
|
caption: Optional[str] = None,
|
|
file_name: Optional[str] = None,
|
|
outbound_message_type: str = "file",
|
|
) -> SendResult:
|
|
if not self._client:
|
|
return SendResult(success=False, error="Not connected")
|
|
if not os.path.exists(file_path):
|
|
return SendResult(success=False, error=f"File not found: {file_path}")
|
|
|
|
display_name = file_name or os.path.basename(file_path)
|
|
upload_file_type, resolved_message_type = self._resolve_outbound_file_routing(
|
|
file_path=display_name,
|
|
requested_message_type=outbound_message_type,
|
|
)
|
|
try:
|
|
with open(file_path, "rb") as file_obj:
|
|
body = self._build_file_upload_body(
|
|
file_type=upload_file_type,
|
|
file_name=display_name,
|
|
file=file_obj,
|
|
)
|
|
request = self._build_file_upload_request(body)
|
|
upload_response = await asyncio.to_thread(self._client.im.v1.file.create, request)
|
|
file_key = self._extract_response_field(upload_response, "file_key")
|
|
if not file_key:
|
|
return self._response_error_result(
|
|
upload_response,
|
|
default_message="file upload failed",
|
|
override_error="Feishu file upload missing file_key",
|
|
)
|
|
|
|
if caption:
|
|
media_tag = {
|
|
"tag": "media",
|
|
"file_key": file_key,
|
|
"file_name": display_name,
|
|
}
|
|
message_response = await self._feishu_send_with_retry(
|
|
chat_id=chat_id,
|
|
msg_type="post",
|
|
payload=self._build_media_post_payload(caption=caption, media_tag=media_tag),
|
|
reply_to=reply_to,
|
|
metadata=metadata,
|
|
)
|
|
else:
|
|
message_response = await self._feishu_send_with_retry(
|
|
chat_id=chat_id,
|
|
msg_type=resolved_message_type,
|
|
payload=json.dumps({"file_key": file_key}, ensure_ascii=False),
|
|
reply_to=reply_to,
|
|
metadata=metadata,
|
|
)
|
|
return self._finalize_send_result(message_response, "file send failed")
|
|
except Exception as exc:
|
|
logger.error("[Feishu] Failed to send file %s: %s", file_path, exc, exc_info=True)
|
|
return SendResult(success=False, error=str(exc))
|
|
|
|
async def _send_raw_message(
|
|
self,
|
|
*,
|
|
chat_id: str,
|
|
msg_type: str,
|
|
payload: str,
|
|
reply_to: Optional[str],
|
|
metadata: Optional[Dict[str, Any]],
|
|
) -> Any:
|
|
reply_in_thread = bool((metadata or {}).get("thread_id"))
|
|
if reply_to:
|
|
body = self._build_reply_message_body(
|
|
content=payload,
|
|
msg_type=msg_type,
|
|
reply_in_thread=reply_in_thread,
|
|
uuid_value=str(uuid.uuid4()),
|
|
)
|
|
request = self._build_reply_message_request(reply_to, body)
|
|
return await asyncio.to_thread(self._client.im.v1.message.reply, request)
|
|
|
|
body = self._build_create_message_body(
|
|
receive_id=chat_id,
|
|
msg_type=msg_type,
|
|
content=payload,
|
|
uuid_value=str(uuid.uuid4()),
|
|
)
|
|
request = self._build_create_message_request("chat_id", body)
|
|
return await asyncio.to_thread(self._client.im.v1.message.create, request)
|
|
|
|
@staticmethod
|
|
def _response_succeeded(response: Any) -> bool:
|
|
return bool(response and getattr(response, "success", lambda: False)())
|
|
|
|
@staticmethod
|
|
def _extract_response_field(response: Any, field_name: str) -> Any:
|
|
if not FeishuAdapter._response_succeeded(response):
|
|
return None
|
|
data = getattr(response, "data", None)
|
|
return getattr(data, field_name, None) if data else None
|
|
|
|
def _response_error_result(
|
|
self,
|
|
response: Any,
|
|
*,
|
|
default_message: str,
|
|
override_error: Optional[str] = None,
|
|
) -> SendResult:
|
|
if override_error:
|
|
return SendResult(success=False, error=override_error, raw_response=response)
|
|
code = getattr(response, "code", "unknown")
|
|
msg = getattr(response, "msg", default_message)
|
|
return SendResult(success=False, error=f"[{code}] {msg}", raw_response=response)
|
|
|
|
def _finalize_send_result(self, response: Any, default_message: str) -> SendResult:
|
|
if not self._response_succeeded(response):
|
|
return self._response_error_result(response, default_message=default_message)
|
|
return SendResult(
|
|
success=True,
|
|
message_id=self._extract_response_field(response, "message_id"),
|
|
raw_response=response,
|
|
)
|
|
|
|
# =========================================================================
|
|
# Connection internals — websocket / webhook setup
|
|
# =========================================================================
|
|
|
|
async def _connect_with_retry(self) -> None:
|
|
for attempt in range(_FEISHU_CONNECT_ATTEMPTS):
|
|
try:
|
|
if self._connection_mode == "websocket":
|
|
await self._connect_websocket()
|
|
else:
|
|
await self._connect_webhook()
|
|
return
|
|
except Exception as exc:
|
|
self._running = False
|
|
self._disable_websocket_auto_reconnect()
|
|
self._ws_future = None
|
|
await self._stop_webhook_server()
|
|
if attempt >= _FEISHU_CONNECT_ATTEMPTS - 1:
|
|
raise
|
|
wait_seconds = 2 ** attempt
|
|
logger.warning(
|
|
"[Feishu] Connect attempt %d/%d failed; retrying in %ds: %s",
|
|
attempt + 1,
|
|
_FEISHU_CONNECT_ATTEMPTS,
|
|
wait_seconds,
|
|
exc,
|
|
)
|
|
await asyncio.sleep(wait_seconds)
|
|
|
|
async def _connect_websocket(self) -> None:
|
|
if not FEISHU_WEBSOCKET_AVAILABLE:
|
|
raise RuntimeError("websockets not installed; websocket mode unavailable")
|
|
domain = FEISHU_DOMAIN if self._domain_name != "lark" else LARK_DOMAIN
|
|
self._client = self._build_lark_client(domain)
|
|
await self._hydrate_bot_identity()
|
|
self._ws_client = FeishuWSClient(
|
|
app_id=self._app_id,
|
|
app_secret=self._app_secret,
|
|
log_level=lark.LogLevel.INFO,
|
|
event_handler=self._event_handler,
|
|
domain=domain,
|
|
)
|
|
self._ws_future = self._loop.run_in_executor(
|
|
None,
|
|
_run_official_feishu_ws_client,
|
|
self._ws_client,
|
|
)
|
|
|
|
async def _connect_webhook(self) -> None:
|
|
if not FEISHU_WEBHOOK_AVAILABLE:
|
|
raise RuntimeError("aiohttp not installed; webhook mode unavailable")
|
|
domain = FEISHU_DOMAIN if self._domain_name != "lark" else LARK_DOMAIN
|
|
self._client = self._build_lark_client(domain)
|
|
await self._hydrate_bot_identity()
|
|
app = web.Application()
|
|
app.router.add_post(self._webhook_path, self._handle_webhook_request)
|
|
self._webhook_runner = web.AppRunner(app)
|
|
await self._webhook_runner.setup()
|
|
self._webhook_site = web.TCPSite(self._webhook_runner, self._webhook_host, self._webhook_port)
|
|
await self._webhook_site.start()
|
|
|
|
def _build_lark_client(self, domain: Any) -> Any:
|
|
return (
|
|
lark.Client.builder()
|
|
.app_id(self._app_id)
|
|
.app_secret(self._app_secret)
|
|
.domain(domain)
|
|
.log_level(lark.LogLevel.WARNING)
|
|
.build()
|
|
)
|
|
|
|
async def _feishu_send_with_retry(
|
|
self,
|
|
*,
|
|
chat_id: str,
|
|
msg_type: str,
|
|
payload: str,
|
|
reply_to: Optional[str],
|
|
metadata: Optional[Dict[str, Any]],
|
|
) -> Any:
|
|
last_error: Optional[Exception] = None
|
|
active_reply_to = reply_to
|
|
for attempt in range(_FEISHU_SEND_ATTEMPTS):
|
|
try:
|
|
response = await self._send_raw_message(
|
|
chat_id=chat_id,
|
|
msg_type=msg_type,
|
|
payload=payload,
|
|
reply_to=active_reply_to,
|
|
metadata=metadata,
|
|
)
|
|
# If replying to a message failed because it was withdrawn or not found,
|
|
# fall back to posting a new message directly to the chat.
|
|
if active_reply_to and not self._response_succeeded(response):
|
|
code = getattr(response, "code", None)
|
|
if code in _FEISHU_REPLY_FALLBACK_CODES:
|
|
logger.warning(
|
|
"[Feishu] Reply to %s failed (code %s — message withdrawn/missing); "
|
|
"falling back to new message in chat %s",
|
|
active_reply_to,
|
|
code,
|
|
chat_id,
|
|
)
|
|
active_reply_to = None
|
|
response = await self._send_raw_message(
|
|
chat_id=chat_id,
|
|
msg_type=msg_type,
|
|
payload=payload,
|
|
reply_to=None,
|
|
metadata=metadata,
|
|
)
|
|
return response
|
|
except Exception as exc:
|
|
last_error = exc
|
|
if msg_type == "post" and _POST_CONTENT_INVALID_RE.search(str(exc)):
|
|
raise
|
|
if attempt >= _FEISHU_SEND_ATTEMPTS - 1:
|
|
raise
|
|
wait_seconds = 2 ** attempt
|
|
logger.warning(
|
|
"[Feishu] Send attempt %d/%d failed for chat %s; retrying in %ds: %s",
|
|
attempt + 1,
|
|
_FEISHU_SEND_ATTEMPTS,
|
|
chat_id,
|
|
wait_seconds,
|
|
exc,
|
|
)
|
|
await asyncio.sleep(wait_seconds)
|
|
raise last_error or RuntimeError("Feishu send failed")
|
|
|
|
async def _release_app_lock(self) -> None:
|
|
if not self._app_lock_identity:
|
|
return
|
|
try:
|
|
release_scoped_lock(_FEISHU_APP_LOCK_SCOPE, self._app_lock_identity)
|
|
except Exception as exc:
|
|
logger.warning("[Feishu] Failed to release app lock: %s", exc, exc_info=True)
|
|
finally:
|
|
self._app_lock_identity = None
|
|
|
|
# =========================================================================
|
|
# Lark API request builders
|
|
# =========================================================================
|
|
|
|
@staticmethod
|
|
def _build_get_chat_request(chat_id: str) -> Any:
|
|
if "GetChatRequest" in globals():
|
|
return GetChatRequest.builder().chat_id(chat_id).build()
|
|
return SimpleNamespace(chat_id=chat_id)
|
|
|
|
@staticmethod
|
|
def _build_get_message_request(message_id: str) -> Any:
|
|
if "GetMessageRequest" in globals():
|
|
return GetMessageRequest.builder().message_id(message_id).build()
|
|
return SimpleNamespace(message_id=message_id)
|
|
|
|
@staticmethod
|
|
def _build_message_resource_request(*, message_id: str, file_key: str, resource_type: str) -> Any:
|
|
if "GetMessageResourceRequest" in globals():
|
|
return (
|
|
GetMessageResourceRequest.builder()
|
|
.message_id(message_id)
|
|
.file_key(file_key)
|
|
.type(resource_type)
|
|
.build()
|
|
)
|
|
return SimpleNamespace(message_id=message_id, file_key=file_key, type=resource_type)
|
|
|
|
@staticmethod
|
|
def _build_get_application_request(*, app_id: str, lang: str) -> Any:
|
|
if "GetApplicationRequest" in globals():
|
|
return (
|
|
GetApplicationRequest.builder()
|
|
.app_id(app_id)
|
|
.lang(lang)
|
|
.build()
|
|
)
|
|
return SimpleNamespace(app_id=app_id, lang=lang)
|
|
|
|
@staticmethod
|
|
def _build_reply_message_body(*, content: str, msg_type: str, reply_in_thread: bool, uuid_value: str) -> Any:
|
|
if "ReplyMessageRequestBody" in globals():
|
|
return (
|
|
ReplyMessageRequestBody.builder()
|
|
.content(content)
|
|
.msg_type(msg_type)
|
|
.reply_in_thread(reply_in_thread)
|
|
.uuid(uuid_value)
|
|
.build()
|
|
)
|
|
return SimpleNamespace(
|
|
content=content,
|
|
msg_type=msg_type,
|
|
reply_in_thread=reply_in_thread,
|
|
uuid=uuid_value,
|
|
)
|
|
|
|
@staticmethod
|
|
def _build_reply_message_request(message_id: str, request_body: Any) -> Any:
|
|
if "ReplyMessageRequest" in globals():
|
|
return (
|
|
ReplyMessageRequest.builder()
|
|
.message_id(message_id)
|
|
.request_body(request_body)
|
|
.build()
|
|
)
|
|
return SimpleNamespace(message_id=message_id, request_body=request_body)
|
|
|
|
@staticmethod
|
|
def _build_update_message_body(*, msg_type: str, content: str) -> Any:
|
|
if "UpdateMessageRequestBody" in globals():
|
|
return (
|
|
UpdateMessageRequestBody.builder()
|
|
.msg_type(msg_type)
|
|
.content(content)
|
|
.build()
|
|
)
|
|
return SimpleNamespace(msg_type=msg_type, content=content)
|
|
|
|
@staticmethod
|
|
def _build_update_message_request(message_id: str, request_body: Any) -> Any:
|
|
if "UpdateMessageRequest" in globals():
|
|
return (
|
|
UpdateMessageRequest.builder()
|
|
.message_id(message_id)
|
|
.request_body(request_body)
|
|
.build()
|
|
)
|
|
return SimpleNamespace(message_id=message_id, request_body=request_body)
|
|
|
|
@staticmethod
|
|
def _build_create_message_body(*, receive_id: str, msg_type: str, content: str, uuid_value: str) -> Any:
|
|
if "CreateMessageRequestBody" in globals():
|
|
return (
|
|
CreateMessageRequestBody.builder()
|
|
.receive_id(receive_id)
|
|
.msg_type(msg_type)
|
|
.content(content)
|
|
.uuid(uuid_value)
|
|
.build()
|
|
)
|
|
return SimpleNamespace(
|
|
receive_id=receive_id,
|
|
msg_type=msg_type,
|
|
content=content,
|
|
uuid=uuid_value,
|
|
)
|
|
|
|
@staticmethod
|
|
def _build_create_message_request(receive_id_type: str, request_body: Any) -> Any:
|
|
if "CreateMessageRequest" in globals():
|
|
return (
|
|
CreateMessageRequest.builder()
|
|
.receive_id_type(receive_id_type)
|
|
.request_body(request_body)
|
|
.build()
|
|
)
|
|
return SimpleNamespace(receive_id_type=receive_id_type, request_body=request_body)
|
|
|
|
@staticmethod
|
|
def _build_image_upload_body(*, image_type: str, image: Any) -> Any:
|
|
if "CreateImageRequestBody" in globals():
|
|
return (
|
|
CreateImageRequestBody.builder()
|
|
.image_type(image_type)
|
|
.image(image)
|
|
.build()
|
|
)
|
|
return SimpleNamespace(image_type=image_type, image=image)
|
|
|
|
@staticmethod
|
|
def _build_image_upload_request(request_body: Any) -> Any:
|
|
if "CreateImageRequest" in globals():
|
|
return CreateImageRequest.builder().request_body(request_body).build()
|
|
return SimpleNamespace(request_body=request_body)
|
|
|
|
@staticmethod
|
|
def _build_file_upload_body(*, file_type: str, file_name: str, file: Any) -> Any:
|
|
if "CreateFileRequestBody" in globals():
|
|
return (
|
|
CreateFileRequestBody.builder()
|
|
.file_type(file_type)
|
|
.file_name(file_name)
|
|
.file(file)
|
|
.build()
|
|
)
|
|
return SimpleNamespace(file_type=file_type, file_name=file_name, file=file)
|
|
|
|
@staticmethod
|
|
def _build_file_upload_request(request_body: Any) -> Any:
|
|
if "CreateFileRequest" in globals():
|
|
return CreateFileRequest.builder().request_body(request_body).build()
|
|
return SimpleNamespace(request_body=request_body)
|
|
|
|
def _build_post_payload(self, content: str) -> str:
|
|
return _build_markdown_post_payload(content)
|
|
|
|
def _build_media_post_payload(self, *, caption: str, media_tag: Dict[str, str]) -> str:
|
|
payload = json.loads(self._build_post_payload(caption))
|
|
content = payload.setdefault("zh_cn", {}).setdefault("content", [])
|
|
content.append([media_tag])
|
|
return json.dumps(payload, ensure_ascii=False)
|
|
|
|
@staticmethod
|
|
def _resolve_outbound_file_routing(
|
|
*,
|
|
file_path: str,
|
|
requested_message_type: str,
|
|
) -> tuple[str, str]:
|
|
ext = Path(file_path).suffix.lower()
|
|
|
|
if ext in _FEISHU_OPUS_UPLOAD_EXTENSIONS:
|
|
return "opus", "audio"
|
|
|
|
if ext in _FEISHU_MEDIA_UPLOAD_EXTENSIONS:
|
|
return "mp4", "media"
|
|
|
|
if ext in _FEISHU_DOC_UPLOAD_TYPES:
|
|
return _FEISHU_DOC_UPLOAD_TYPES[ext], "file"
|
|
|
|
if requested_message_type == "file":
|
|
return _FEISHU_FILE_UPLOAD_TYPE, "file"
|
|
|
|
return _FEISHU_FILE_UPLOAD_TYPE, "file"
|