feat(gateway): add per-group access control for Feishu

Add fine-grained authorization policies per Feishu group chat via
platforms.feishu.extra configuration.

- Add global bot-level admins that bypass all group restrictions
- Add per-group policies: open, allowlist, blacklist, admin_only, disabled
- Add default_group_policy fallback for chats without explicit rules
- Thread chat_id through group message gate for per-chat rule selection
- Match both open_id and user_id for backward compatibility
- Preserve existing FEISHU_ALLOWED_USERS / FEISHU_GROUP_POLICY behavior
- Add focused regression tests for all policy modes

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
jtuki
2026-04-06 22:18:41 +08:00
committed by Teknium
parent 18727ca9aa
commit 57abc99315
2 changed files with 290 additions and 46 deletions

View File

@@ -274,6 +274,18 @@ class FeishuAdapterSettings:
ws_reconnect_interval: int = 120
ws_ping_interval: Optional[int] = None
ws_ping_timeout: Optional[int] = None
admins: frozenset[str] = frozenset()
default_group_policy: str = ""
group_rules: Dict[str, FeishuGroupRule] = field(default_factory=dict)
@dataclass
class FeishuGroupRule:
"""Per-group policy rule for controlling which users may interact with the bot."""
policy: str # "open" | "allowlist" | "blacklist" | "admin_only" | "disabled"
allowlist: set[str] = field(default_factory=set)
blacklist: set[str] = field(default_factory=set)
@dataclass
@@ -1049,6 +1061,26 @@ class FeishuAdapter(BasePlatformAdapter):
@staticmethod
def _load_settings(extra: Dict[str, Any]) -> FeishuAdapterSettings:
# Parse per-group rules from config
raw_group_rules = extra.get("group_rules", {})
group_rules: Dict[str, FeishuGroupRule] = {}
if isinstance(raw_group_rules, dict):
for chat_id, rule_cfg in raw_group_rules.items():
if not isinstance(rule_cfg, dict):
continue
group_rules[str(chat_id)] = FeishuGroupRule(
policy=str(rule_cfg.get("policy", "open")).strip().lower(),
allowlist=set(str(u).strip() for u in rule_cfg.get("allowlist", []) if str(u).strip()),
blacklist=set(str(u).strip() for u in rule_cfg.get("blacklist", []) if str(u).strip()),
)
# Bot-level admins
raw_admins = extra.get("admins", [])
admins = frozenset(str(u).strip() for u in raw_admins if str(u).strip())
# Default group policy (for groups not in group_rules)
default_group_policy = str(extra.get("default_group_policy", "")).strip().lower()
return FeishuAdapterSettings(
app_id=str(extra.get("app_id") or os.getenv("FEISHU_APP_ID", "")).strip(),
app_secret=str(extra.get("app_secret") or os.getenv("FEISHU_APP_SECRET", "")).strip(),
@@ -1099,6 +1131,9 @@ class FeishuAdapter(BasePlatformAdapter):
ws_reconnect_interval=_coerce_required_int(extra.get("ws_reconnect_interval"), default=120, min_value=1),
ws_ping_interval=_coerce_int(extra.get("ws_ping_interval"), default=None, min_value=1),
ws_ping_timeout=_coerce_int(extra.get("ws_ping_timeout"), default=None, min_value=1),
admins=admins,
default_group_policy=default_group_policy,
group_rules=group_rules,
)
def _apply_settings(self, settings: FeishuAdapterSettings) -> None:
@@ -1110,6 +1145,9 @@ class FeishuAdapter(BasePlatformAdapter):
self._verification_token = settings.verification_token
self._group_policy = settings.group_policy
self._allowed_group_users = set(settings.allowed_group_users)
self._admins = set(settings.admins)
self._default_group_policy = settings.default_group_policy or settings.group_policy
self._group_rules = settings.group_rules
self._bot_open_id = settings.bot_open_id
self._bot_user_id = settings.bot_user_id
self._bot_name = settings.bot_name
@@ -1617,7 +1655,8 @@ class FeishuAdapter(BasePlatformAdapter):
return
chat_type = getattr(message, "chat_type", "p2p")
if chat_type != "p2p" and not self._should_accept_group_message(message, sender_id):
chat_id = getattr(message, "chat_id", "") or ""
if chat_type != "p2p" and not self._should_accept_group_message(message, sender_id, chat_id):
logger.debug("[Feishu] Dropping group message that failed mention/policy gate: %s", message_id)
return
await self._process_inbound_message(
@@ -2773,18 +2812,41 @@ class FeishuAdapter(BasePlatformAdapter):
# Group policy and mention gating
# =========================================================================
def _allow_group_message(self, sender_id: Any) -> bool:
"""Current group policy gate for non-DM traffic."""
if self._group_policy == "disabled":
return False
sender_open_id = getattr(sender_id, "open_id", None) or getattr(sender_id, "user_id", None)
if self._group_policy == "open":
return True
return bool(sender_open_id and sender_open_id in self._allowed_group_users)
def _allow_group_message(self, sender_id: Any, chat_id: str = "") -> bool:
"""Per-group policy gate for non-DM traffic."""
sender_open_id = getattr(sender_id, "open_id", None)
sender_user_id = getattr(sender_id, "user_id", None)
sender_ids = {sender_open_id, sender_user_id} - {None}
def _should_accept_group_message(self, message: Any, sender_id: Any) -> bool:
if sender_ids and self._admins and (sender_ids & self._admins):
return True
rule = self._group_rules.get(chat_id) if chat_id else None
if rule:
policy = rule.policy
allowlist = rule.allowlist
blacklist = rule.blacklist
else:
policy = self._default_group_policy or self._group_policy
allowlist = self._allowed_group_users
blacklist = set()
if policy == "disabled":
return False
if policy == "open":
return True
if policy == "admin_only":
return False
if policy == "allowlist":
return bool(sender_ids and (sender_ids & allowlist))
if policy == "blacklist":
return bool(sender_ids and not (sender_ids & blacklist))
return bool(sender_ids and (sender_ids & self._allowed_group_users))
def _should_accept_group_message(self, message: Any, sender_id: Any, chat_id: str = "") -> bool:
"""Require an explicit @mention before group messages enter the agent."""
if not self._allow_group_message(sender_id):
if not self._allow_group_message(sender_id, chat_id):
return False
# @_all is Feishu's @everyone placeholder — always route to the bot.
raw_content = getattr(message, "content", "") or ""

View File

@@ -822,10 +822,10 @@ class TestAdapterBehavior(unittest.TestCase):
adapter = FeishuAdapter(PlatformConfig())
message = SimpleNamespace(mentions=[])
sender_id = SimpleNamespace(open_id="ou_any", user_id=None)
self.assertFalse(adapter._should_accept_group_message(message, sender_id))
self.assertFalse(adapter._should_accept_group_message(message, sender_id, ""))
message_with_mention = SimpleNamespace(mentions=[SimpleNamespace(key="@_user_1")])
self.assertFalse(adapter._should_accept_group_message(message_with_mention, sender_id))
self.assertFalse(adapter._should_accept_group_message(message_with_mention, sender_id, ""))
@patch.dict(os.environ, {"FEISHU_GROUP_POLICY": "open"}, clear=True)
def test_group_message_with_other_user_mention_is_rejected_when_bot_identity_unknown(self):
@@ -839,7 +839,7 @@ class TestAdapterBehavior(unittest.TestCase):
id=SimpleNamespace(open_id="ou_other", user_id="u_other"),
)
self.assertFalse(adapter._should_accept_group_message(SimpleNamespace(mentions=[other_mention]), sender_id))
self.assertFalse(adapter._should_accept_group_message(SimpleNamespace(mentions=[other_mention]), sender_id, ""))
@patch.dict(
os.environ,
@@ -868,28 +868,222 @@ class TestAdapterBehavior(unittest.TestCase):
adapter._should_accept_group_message(
mentioned,
SimpleNamespace(open_id="ou_allowed", user_id=None),
"",
)
)
self.assertFalse(
adapter._should_accept_group_message(
mentioned,
SimpleNamespace(open_id="ou_blocked", user_id=None),
"",
)
)
@patch.dict(
os.environ,
{
"FEISHU_GROUP_POLICY": "open",
"FEISHU_BOT_OPEN_ID": "ou_bot",
},
clear=True,
)
def test_per_group_allowlist_policy_gates_by_sender(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
config = PlatformConfig(
extra={
"group_rules": {
"oc_chat_a": {
"policy": "allowlist",
"allowlist": ["ou_alice", "ou_bob"],
}
}
}
)
adapter = FeishuAdapter(config)
adapter._bot_open_id = "ou_bot"
message = SimpleNamespace(
mentions=[SimpleNamespace(name="Bot", id=SimpleNamespace(open_id="ou_bot", user_id=None))]
)
self.assertTrue(
adapter._should_accept_group_message(
message,
SimpleNamespace(open_id="ou_alice", user_id=None),
"oc_chat_a",
)
)
self.assertFalse(
adapter._should_accept_group_message(
message,
SimpleNamespace(open_id="ou_charlie", user_id=None),
"oc_chat_a",
)
)
def test_per_group_blacklist_policy_blocks_specific_users(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
config = PlatformConfig(
extra={
"group_rules": {
"oc_chat_b": {
"policy": "blacklist",
"blacklist": ["ou_blocked"],
}
}
}
)
adapter = FeishuAdapter(config)
adapter._bot_open_id = "ou_bot"
message = SimpleNamespace(
mentions=[SimpleNamespace(name="Bot", id=SimpleNamespace(open_id="ou_bot", user_id=None))]
)
self.assertTrue(
adapter._should_accept_group_message(
message,
SimpleNamespace(open_id="ou_alice", user_id=None),
"oc_chat_b",
)
)
self.assertFalse(
adapter._should_accept_group_message(
message,
SimpleNamespace(open_id="ou_blocked", user_id=None),
"oc_chat_b",
)
)
def test_per_group_admin_only_policy_requires_admin(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
config = PlatformConfig(
extra={
"admins": ["ou_admin"],
"group_rules": {
"oc_chat_c": {
"policy": "admin_only",
}
},
}
)
adapter = FeishuAdapter(config)
adapter._bot_open_id = "ou_bot"
message = SimpleNamespace(
mentions=[SimpleNamespace(name="Bot", id=SimpleNamespace(open_id="ou_bot", user_id=None))]
)
self.assertTrue(
adapter._should_accept_group_message(
message,
SimpleNamespace(open_id="ou_admin", user_id=None),
"oc_chat_c",
)
)
self.assertFalse(
adapter._should_accept_group_message(
message,
SimpleNamespace(open_id="ou_regular", user_id=None),
"oc_chat_c",
)
)
def test_per_group_disabled_policy_blocks_all(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
config = PlatformConfig(
extra={
"admins": ["ou_admin"],
"group_rules": {
"oc_chat_d": {
"policy": "disabled",
}
},
}
)
adapter = FeishuAdapter(config)
adapter._bot_open_id = "ou_bot"
message = SimpleNamespace(
mentions=[SimpleNamespace(name="Bot", id=SimpleNamespace(open_id="ou_bot", user_id=None))]
)
self.assertTrue(
adapter._should_accept_group_message(
message,
SimpleNamespace(open_id="ou_admin", user_id=None),
"oc_chat_d",
)
)
self.assertFalse(
adapter._should_accept_group_message(
message,
SimpleNamespace(open_id="ou_regular", user_id=None),
"oc_chat_d",
)
)
def test_global_admins_bypass_all_group_rules(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
config = PlatformConfig(
extra={
"admins": ["ou_admin"],
"group_rules": {
"oc_chat_e": {
"policy": "allowlist",
"allowlist": ["ou_alice"],
}
},
}
)
adapter = FeishuAdapter(config)
adapter._bot_open_id = "ou_bot"
message = SimpleNamespace(
mentions=[SimpleNamespace(name="Bot", id=SimpleNamespace(open_id="ou_bot", user_id=None))]
)
self.assertTrue(
adapter._should_accept_group_message(
message,
SimpleNamespace(open_id="ou_admin", user_id=None),
"oc_chat_e",
)
)
def test_default_group_policy_fallback_for_chats_without_explicit_rule(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
config = PlatformConfig(
extra={
"default_group_policy": "open",
}
)
adapter = FeishuAdapter(config)
adapter._bot_open_id = "ou_bot"
message = SimpleNamespace(
mentions=[SimpleNamespace(name="Bot", id=SimpleNamespace(open_id="ou_bot", user_id=None))]
)
self.assertTrue(
adapter._should_accept_group_message(
message,
SimpleNamespace(open_id="ou_anyone", user_id=None),
"oc_chat_unknown",
)
)
@patch.dict(os.environ, {"FEISHU_GROUP_POLICY": "open"}, clear=True)
def test_group_message_matches_bot_open_id_when_configured(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
adapter._bot_open_id = "ou_bot"
sender_id = SimpleNamespace(open_id="ou_any", user_id=None)
bot_mention = SimpleNamespace(
@@ -901,22 +1095,16 @@ class TestAdapterBehavior(unittest.TestCase):
id=SimpleNamespace(open_id="ou_other", user_id="u_other"),
)
self.assertTrue(adapter._should_accept_group_message(SimpleNamespace(mentions=[bot_mention]), sender_id))
self.assertFalse(adapter._should_accept_group_message(SimpleNamespace(mentions=[other_mention]), sender_id))
self.assertTrue(adapter._should_accept_group_message(SimpleNamespace(mentions=[bot_mention]), sender_id, ""))
self.assertFalse(adapter._should_accept_group_message(SimpleNamespace(mentions=[other_mention]), sender_id, ""))
@patch.dict(
os.environ,
{
"FEISHU_GROUP_POLICY": "open",
"FEISHU_BOT_NAME": "Hermes Bot",
},
clear=True,
)
@patch.dict(os.environ, {"FEISHU_GROUP_POLICY": "open"}, clear=True)
def test_group_message_matches_bot_name_when_only_name_available(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
adapter._bot_name = "Hermes Bot"
sender_id = SimpleNamespace(open_id="ou_any", user_id=None)
named_mention = SimpleNamespace(
@@ -928,22 +1116,16 @@ class TestAdapterBehavior(unittest.TestCase):
id=SimpleNamespace(open_id="ou_other", user_id="u_other"),
)
self.assertTrue(adapter._should_accept_group_message(SimpleNamespace(mentions=[named_mention]), sender_id))
self.assertFalse(adapter._should_accept_group_message(SimpleNamespace(mentions=[different_mention]), sender_id))
self.assertTrue(adapter._should_accept_group_message(SimpleNamespace(mentions=[named_mention]), sender_id, ""))
self.assertFalse(adapter._should_accept_group_message(SimpleNamespace(mentions=[different_mention]), sender_id, ""))
@patch.dict(
os.environ,
{
"FEISHU_GROUP_POLICY": "open",
"FEISHU_BOT_OPEN_ID": "ou_bot",
},
clear=True,
)
@patch.dict(os.environ, {"FEISHU_GROUP_POLICY": "open"}, clear=True)
def test_group_post_message_uses_parsed_mentions_when_sdk_mentions_missing(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
adapter._bot_open_id = "ou_bot"
sender_id = SimpleNamespace(open_id="ou_any", user_id=None)
message = SimpleNamespace(
message_type="post",
@@ -951,7 +1133,7 @@ class TestAdapterBehavior(unittest.TestCase):
content='{"en_us":{"content":[[{"tag":"at","user_name":"Hermes","open_id":"ou_bot"}]]}}',
)
self.assertTrue(adapter._should_accept_group_message(message, sender_id))
self.assertTrue(adapter._should_accept_group_message(message, sender_id, ""))
@patch.dict(os.environ, {}, clear=True)
def test_extract_post_message_as_text(self):
@@ -2618,7 +2800,7 @@ class TestGroupMentionAtAll(unittest.TestCase):
mentions=[],
)
sender_id = SimpleNamespace(open_id="ou_any", user_id=None)
self.assertTrue(adapter._should_accept_group_message(message, sender_id))
self.assertTrue(adapter._should_accept_group_message(message, sender_id, ""))
@patch.dict(os.environ, {"FEISHU_GROUP_POLICY": "allowlist", "FEISHU_ALLOWED_USERS": "ou_allowed"}, clear=True)
def test_at_all_still_requires_policy_gate(self):
@@ -2630,10 +2812,10 @@ class TestGroupMentionAtAll(unittest.TestCase):
message = SimpleNamespace(content='{"text":"@_all attention"}', mentions=[])
# Non-allowlisted user — should be blocked even with @_all.
blocked_sender = SimpleNamespace(open_id="ou_blocked", user_id=None)
self.assertFalse(adapter._should_accept_group_message(message, blocked_sender))
self.assertFalse(adapter._should_accept_group_message(message, blocked_sender, ""))
# Allowlisted user — should pass.
allowed_sender = SimpleNamespace(open_id="ou_allowed", user_id=None)
self.assertTrue(adapter._should_accept_group_message(message, allowed_sender))
self.assertTrue(adapter._should_accept_group_message(message, allowed_sender, ""))
@unittest.skipUnless(_HAS_LARK_OAPI, "lark-oapi not installed")