Merge pull request #1683 from NousResearch/feat/mattermost-matrix-adapters

feat: add Mattermost and Matrix gateway adapters
This commit is contained in:
Teknium
2026-03-17 03:18:25 -07:00
committed by GitHub
12 changed files with 3462 additions and 0 deletions

View File

@@ -40,6 +40,8 @@ class Platform(Enum):
WHATSAPP = "whatsapp"
SLACK = "slack"
SIGNAL = "signal"
MATTERMOST = "mattermost"
MATRIX = "matrix"
HOMEASSISTANT = "homeassistant"
EMAIL = "email"
SMS = "sms"
@@ -442,6 +444,8 @@ def load_gateway_config() -> GatewayConfig:
Platform.TELEGRAM: "TELEGRAM_BOT_TOKEN",
Platform.DISCORD: "DISCORD_BOT_TOKEN",
Platform.SLACK: "SLACK_BOT_TOKEN",
Platform.MATTERMOST: "MATTERMOST_TOKEN",
Platform.MATRIX: "MATRIX_ACCESS_TOKEN",
}
for platform, pconfig in config.platforms.items():
if not pconfig.enabled:
@@ -535,6 +539,53 @@ def _apply_env_overrides(config: GatewayConfig) -> None:
name=os.getenv("SIGNAL_HOME_CHANNEL_NAME", "Home"),
)
# Mattermost
mattermost_token = os.getenv("MATTERMOST_TOKEN")
if mattermost_token:
mattermost_url = os.getenv("MATTERMOST_URL", "")
if not mattermost_url:
logger.warning("MATTERMOST_TOKEN set but MATTERMOST_URL is missing")
if Platform.MATTERMOST not in config.platforms:
config.platforms[Platform.MATTERMOST] = PlatformConfig()
config.platforms[Platform.MATTERMOST].enabled = True
config.platforms[Platform.MATTERMOST].token = mattermost_token
config.platforms[Platform.MATTERMOST].extra["url"] = mattermost_url
mattermost_home = os.getenv("MATTERMOST_HOME_CHANNEL")
if mattermost_home:
config.platforms[Platform.MATTERMOST].home_channel = HomeChannel(
platform=Platform.MATTERMOST,
chat_id=mattermost_home,
name=os.getenv("MATTERMOST_HOME_CHANNEL_NAME", "Home"),
)
# Matrix
matrix_token = os.getenv("MATRIX_ACCESS_TOKEN")
matrix_homeserver = os.getenv("MATRIX_HOMESERVER", "")
if matrix_token or os.getenv("MATRIX_PASSWORD"):
if not matrix_homeserver:
logger.warning("MATRIX_ACCESS_TOKEN/MATRIX_PASSWORD set but MATRIX_HOMESERVER is missing")
if Platform.MATRIX not in config.platforms:
config.platforms[Platform.MATRIX] = PlatformConfig()
config.platforms[Platform.MATRIX].enabled = True
if matrix_token:
config.platforms[Platform.MATRIX].token = matrix_token
config.platforms[Platform.MATRIX].extra["homeserver"] = matrix_homeserver
matrix_user = os.getenv("MATRIX_USER_ID", "")
if matrix_user:
config.platforms[Platform.MATRIX].extra["user_id"] = matrix_user
matrix_password = os.getenv("MATRIX_PASSWORD", "")
if matrix_password:
config.platforms[Platform.MATRIX].extra["password"] = matrix_password
matrix_e2ee = os.getenv("MATRIX_ENCRYPTION", "").lower() in ("true", "1", "yes")
config.platforms[Platform.MATRIX].extra["encryption"] = matrix_e2ee
matrix_home = os.getenv("MATRIX_HOME_ROOM")
if matrix_home:
config.platforms[Platform.MATRIX].home_channel = HomeChannel(
platform=Platform.MATRIX,
chat_id=matrix_home,
name=os.getenv("MATRIX_HOME_ROOM_NAME", "Home"),
)
# Home Assistant
hass_token = os.getenv("HASS_TOKEN")
if hass_token:

841
gateway/platforms/matrix.py Normal file
View File

@@ -0,0 +1,841 @@
"""Matrix gateway adapter.
Connects to any Matrix homeserver (self-hosted or matrix.org) via the
matrix-nio Python SDK. Supports optional end-to-end encryption (E2EE)
when installed with ``pip install "matrix-nio[e2e]"``.
Environment variables:
MATRIX_HOMESERVER Homeserver URL (e.g. https://matrix.example.org)
MATRIX_ACCESS_TOKEN Access token (preferred auth method)
MATRIX_USER_ID Full user ID (@bot:server) — required for password login
MATRIX_PASSWORD Password (alternative to access token)
MATRIX_ENCRYPTION Set "true" to enable E2EE
MATRIX_ALLOWED_USERS Comma-separated Matrix user IDs (@user:server)
MATRIX_HOME_ROOM Room ID for cron/notification delivery
"""
from __future__ import annotations
import asyncio
import json
import logging
import mimetypes
import os
import re
import time
from pathlib import Path
from typing import Any, Dict, List, Optional, Set
from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import (
BasePlatformAdapter,
MessageEvent,
MessageType,
SendResult,
)
logger = logging.getLogger(__name__)
# Matrix message size limit (4000 chars practical, spec has no hard limit
# but clients render poorly above this).
MAX_MESSAGE_LENGTH = 4000
# Store directory for E2EE keys and sync state.
_STORE_DIR = Path.home() / ".hermes" / "matrix" / "store"
# Grace period: ignore messages older than this many seconds before startup.
_STARTUP_GRACE_SECONDS = 5
def check_matrix_requirements() -> bool:
"""Return True if the Matrix adapter can be used."""
token = os.getenv("MATRIX_ACCESS_TOKEN", "")
password = os.getenv("MATRIX_PASSWORD", "")
homeserver = os.getenv("MATRIX_HOMESERVER", "")
if not token and not password:
logger.debug("Matrix: neither MATRIX_ACCESS_TOKEN nor MATRIX_PASSWORD set")
return False
if not homeserver:
logger.warning("Matrix: MATRIX_HOMESERVER not set")
return False
try:
import nio # noqa: F401
return True
except ImportError:
logger.warning(
"Matrix: matrix-nio not installed. "
"Run: pip install 'matrix-nio[e2e]'"
)
return False
class MatrixAdapter(BasePlatformAdapter):
"""Gateway adapter for Matrix (any homeserver)."""
def __init__(self, config: PlatformConfig):
super().__init__(config, Platform.MATRIX)
self._homeserver: str = (
config.extra.get("homeserver", "")
or os.getenv("MATRIX_HOMESERVER", "")
).rstrip("/")
self._access_token: str = config.token or os.getenv("MATRIX_ACCESS_TOKEN", "")
self._user_id: str = (
config.extra.get("user_id", "")
or os.getenv("MATRIX_USER_ID", "")
)
self._password: str = (
config.extra.get("password", "")
or os.getenv("MATRIX_PASSWORD", "")
)
self._encryption: bool = config.extra.get(
"encryption",
os.getenv("MATRIX_ENCRYPTION", "").lower() in ("true", "1", "yes"),
)
self._client: Any = None # nio.AsyncClient
self._sync_task: Optional[asyncio.Task] = None
self._closing = False
self._startup_ts: float = 0.0
# Cache: room_id → bool (is DM)
self._dm_rooms: Dict[str, bool] = {}
# Set of room IDs we've joined
self._joined_rooms: Set[str] = set()
# ------------------------------------------------------------------
# Required overrides
# ------------------------------------------------------------------
async def connect(self) -> bool:
"""Connect to the Matrix homeserver and start syncing."""
import nio
if not self._homeserver:
logger.error("Matrix: homeserver URL not configured")
return False
# Determine store path and ensure it exists.
store_path = str(_STORE_DIR)
_STORE_DIR.mkdir(parents=True, exist_ok=True)
# Create the client.
if self._encryption:
try:
client = nio.AsyncClient(
self._homeserver,
self._user_id or "",
store_path=store_path,
)
logger.info("Matrix: E2EE enabled (store: %s)", store_path)
except Exception as exc:
logger.warning(
"Matrix: failed to create E2EE client (%s), "
"falling back to plain client. Install: "
"pip install 'matrix-nio[e2e]'",
exc,
)
client = nio.AsyncClient(self._homeserver, self._user_id or "")
else:
client = nio.AsyncClient(self._homeserver, self._user_id or "")
self._client = client
# Authenticate.
if self._access_token:
client.access_token = self._access_token
# Resolve user_id if not set.
if not self._user_id:
resp = await client.whoami()
if isinstance(resp, nio.WhoamiResponse):
self._user_id = resp.user_id
client.user_id = resp.user_id
logger.info("Matrix: authenticated as %s", self._user_id)
else:
logger.error(
"Matrix: whoami failed — check MATRIX_ACCESS_TOKEN and MATRIX_HOMESERVER"
)
await client.close()
return False
else:
client.user_id = self._user_id
logger.info("Matrix: using access token for %s", self._user_id)
elif self._password and self._user_id:
resp = await client.login(
self._password,
device_name="Hermes Agent",
)
if isinstance(resp, nio.LoginResponse):
logger.info("Matrix: logged in as %s", self._user_id)
else:
logger.error("Matrix: login failed — %s", getattr(resp, "message", resp))
await client.close()
return False
else:
logger.error("Matrix: need MATRIX_ACCESS_TOKEN or MATRIX_USER_ID + MATRIX_PASSWORD")
await client.close()
return False
# If E2EE is enabled, load the crypto store.
if self._encryption and hasattr(client, "olm"):
try:
if client.should_upload_keys:
await client.keys_upload()
logger.info("Matrix: E2EE crypto initialized")
except Exception as exc:
logger.warning("Matrix: crypto init issue: %s", exc)
# Register event callbacks.
client.add_event_callback(self._on_room_message, nio.RoomMessageText)
client.add_event_callback(self._on_room_message_media, nio.RoomMessageMedia)
client.add_event_callback(self._on_room_message_media, nio.RoomMessageImage)
client.add_event_callback(self._on_room_message_media, nio.RoomMessageAudio)
client.add_event_callback(self._on_room_message_media, nio.RoomMessageVideo)
client.add_event_callback(self._on_room_message_media, nio.RoomMessageFile)
client.add_event_callback(self._on_invite, nio.InviteMemberEvent)
# If E2EE: handle encrypted events.
if self._encryption and hasattr(client, "olm"):
client.add_event_callback(
self._on_room_message, nio.MegolmEvent
)
# Initial sync to catch up, then start background sync.
self._startup_ts = time.time()
self._closing = False
# Do an initial sync to populate room state.
resp = await client.sync(timeout=10000, full_state=True)
if isinstance(resp, nio.SyncResponse):
self._joined_rooms = set(resp.rooms.join.keys())
logger.info(
"Matrix: initial sync complete, joined %d rooms",
len(self._joined_rooms),
)
# Build DM room cache from m.direct account data.
await self._refresh_dm_cache()
else:
logger.warning("Matrix: initial sync returned %s", type(resp).__name__)
# Start the sync loop.
self._sync_task = asyncio.create_task(self._sync_loop())
return True
async def disconnect(self) -> None:
"""Disconnect from Matrix."""
self._closing = True
if self._sync_task and not self._sync_task.done():
self._sync_task.cancel()
try:
await self._sync_task
except (asyncio.CancelledError, Exception):
pass
if self._client:
await self._client.close()
self._client = None
logger.info("Matrix: disconnected")
async def send(
self,
chat_id: str,
content: str,
reply_to: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> SendResult:
"""Send a message to a Matrix room."""
import nio
if not content:
return SendResult(success=True)
formatted = self.format_message(content)
chunks = self.truncate_message(formatted, MAX_MESSAGE_LENGTH)
last_event_id = None
for chunk in chunks:
msg_content: Dict[str, Any] = {
"msgtype": "m.text",
"body": chunk,
}
# Convert markdown to HTML for rich rendering.
html = self._markdown_to_html(chunk)
if html and html != chunk:
msg_content["format"] = "org.matrix.custom.html"
msg_content["formatted_body"] = html
# Reply-to support.
if reply_to:
msg_content["m.relates_to"] = {
"m.in_reply_to": {"event_id": reply_to}
}
# Thread support: if metadata has thread_id, send as threaded reply.
thread_id = (metadata or {}).get("thread_id")
if thread_id:
relates_to = msg_content.get("m.relates_to", {})
relates_to["rel_type"] = "m.thread"
relates_to["event_id"] = thread_id
relates_to["is_falling_back"] = True
if reply_to and "m.in_reply_to" not in relates_to:
relates_to["m.in_reply_to"] = {"event_id": reply_to}
msg_content["m.relates_to"] = relates_to
resp = await self._client.room_send(
chat_id,
"m.room.message",
msg_content,
)
if isinstance(resp, nio.RoomSendResponse):
last_event_id = resp.event_id
else:
err = getattr(resp, "message", str(resp))
logger.error("Matrix: failed to send to %s: %s", chat_id, err)
return SendResult(success=False, error=err)
return SendResult(success=True, message_id=last_event_id)
async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
"""Return room name and type (dm/group)."""
name = chat_id
chat_type = "group"
if self._client:
room = self._client.rooms.get(chat_id)
if room:
name = room.display_name or room.canonical_alias or chat_id
# Use DM cache.
if self._dm_rooms.get(chat_id, False):
chat_type = "dm"
elif room.member_count == 2:
chat_type = "dm"
return {"name": name, "type": chat_type}
# ------------------------------------------------------------------
# Optional overrides
# ------------------------------------------------------------------
async def send_typing(
self, chat_id: str, metadata: Optional[Dict[str, Any]] = None
) -> None:
"""Send a typing indicator."""
if self._client:
try:
await self._client.room_typing(chat_id, typing_state=True, timeout=30000)
except Exception:
pass
async def edit_message(
self, chat_id: str, message_id: str, content: str
) -> SendResult:
"""Edit an existing message (via m.replace)."""
import nio
formatted = self.format_message(content)
msg_content: Dict[str, Any] = {
"msgtype": "m.text",
"body": f"* {formatted}",
"m.new_content": {
"msgtype": "m.text",
"body": formatted,
},
"m.relates_to": {
"rel_type": "m.replace",
"event_id": message_id,
},
}
html = self._markdown_to_html(formatted)
if html and html != formatted:
msg_content["m.new_content"]["format"] = "org.matrix.custom.html"
msg_content["m.new_content"]["formatted_body"] = html
msg_content["format"] = "org.matrix.custom.html"
msg_content["formatted_body"] = f"* {html}"
resp = await self._client.room_send(chat_id, "m.room.message", msg_content)
if isinstance(resp, nio.RoomSendResponse):
return SendResult(success=True, message_id=resp.event_id)
return SendResult(success=False, error=getattr(resp, "message", str(resp)))
async def send_image(
self,
chat_id: str,
image_url: str,
caption: Optional[str] = None,
reply_to: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> SendResult:
"""Download an image URL and upload it to Matrix."""
try:
# Try aiohttp first (always available), fall back to httpx
try:
import aiohttp as _aiohttp
async with _aiohttp.ClientSession() as http:
async with http.get(image_url, timeout=_aiohttp.ClientTimeout(total=30)) as resp:
resp.raise_for_status()
data = await resp.read()
ct = resp.content_type or "image/png"
fname = image_url.rsplit("/", 1)[-1].split("?")[0] or "image.png"
except ImportError:
import httpx
async with httpx.AsyncClient() as http:
resp = await http.get(image_url, follow_redirects=True, timeout=30)
resp.raise_for_status()
data = resp.content
ct = resp.headers.get("content-type", "image/png")
fname = image_url.rsplit("/", 1)[-1].split("?")[0] or "image.png"
except Exception as exc:
logger.warning("Matrix: failed to download image %s: %s", image_url, exc)
return await self.send(chat_id, f"{caption or ''}\n{image_url}".strip(), reply_to)
return await self._upload_and_send(chat_id, data, fname, ct, "m.image", caption, reply_to, metadata)
async def send_image_file(
self,
chat_id: str,
image_path: str,
caption: Optional[str] = None,
reply_to: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> SendResult:
"""Upload a local image file to Matrix."""
return await self._send_local_file(chat_id, image_path, "m.image", caption, reply_to, metadata=metadata)
async def send_document(
self,
chat_id: str,
file_path: str,
caption: Optional[str] = None,
file_name: Optional[str] = None,
reply_to: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> SendResult:
"""Upload a local file as a document."""
return await self._send_local_file(chat_id, file_path, "m.file", caption, reply_to, file_name, metadata)
async def send_voice(
self,
chat_id: str,
audio_path: str,
caption: Optional[str] = None,
reply_to: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> SendResult:
"""Upload an audio file as a voice message."""
return await self._send_local_file(chat_id, audio_path, "m.audio", caption, reply_to, metadata=metadata)
async def send_video(
self,
chat_id: str,
video_path: str,
caption: Optional[str] = None,
reply_to: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> SendResult:
"""Upload a video file."""
return await self._send_local_file(chat_id, video_path, "m.video", caption, reply_to, metadata=metadata)
def format_message(self, content: str) -> str:
"""Pass-through — Matrix supports standard Markdown natively."""
# Strip image markdown; media is uploaded separately.
content = re.sub(r"!\[([^\]]*)\]\(([^)]+)\)", r"\2", content)
return content
# ------------------------------------------------------------------
# File helpers
# ------------------------------------------------------------------
async def _upload_and_send(
self,
room_id: str,
data: bytes,
filename: str,
content_type: str,
msgtype: str,
caption: Optional[str] = None,
reply_to: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> SendResult:
"""Upload bytes to Matrix and send as a media message."""
import nio
# Upload to homeserver.
resp = await self._client.upload(
data,
content_type=content_type,
filename=filename,
)
if not isinstance(resp, nio.UploadResponse):
err = getattr(resp, "message", str(resp))
logger.error("Matrix: upload failed: %s", err)
return SendResult(success=False, error=err)
mxc_url = resp.content_uri
# Build media message content.
msg_content: Dict[str, Any] = {
"msgtype": msgtype,
"body": caption or filename,
"url": mxc_url,
"info": {
"mimetype": content_type,
"size": len(data),
},
}
if reply_to:
msg_content["m.relates_to"] = {
"m.in_reply_to": {"event_id": reply_to}
}
thread_id = (metadata or {}).get("thread_id")
if thread_id:
relates_to = msg_content.get("m.relates_to", {})
relates_to["rel_type"] = "m.thread"
relates_to["event_id"] = thread_id
relates_to["is_falling_back"] = True
msg_content["m.relates_to"] = relates_to
resp2 = await self._client.room_send(room_id, "m.room.message", msg_content)
if isinstance(resp2, nio.RoomSendResponse):
return SendResult(success=True, message_id=resp2.event_id)
return SendResult(success=False, error=getattr(resp2, "message", str(resp2)))
async def _send_local_file(
self,
room_id: str,
file_path: str,
msgtype: str,
caption: Optional[str] = None,
reply_to: Optional[str] = None,
file_name: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> SendResult:
"""Read a local file and upload it."""
p = Path(file_path)
if not p.exists():
return await self.send(
room_id, f"{caption or ''}\n(file not found: {file_path})", reply_to
)
fname = file_name or p.name
ct = mimetypes.guess_type(fname)[0] or "application/octet-stream"
data = p.read_bytes()
return await self._upload_and_send(room_id, data, fname, ct, msgtype, caption, reply_to, metadata)
# ------------------------------------------------------------------
# Sync loop
# ------------------------------------------------------------------
async def _sync_loop(self) -> None:
"""Continuously sync with the homeserver."""
while not self._closing:
try:
await self._client.sync(timeout=30000)
except asyncio.CancelledError:
return
except Exception as exc:
if self._closing:
return
logger.warning("Matrix: sync error: %s — retrying in 5s", exc)
await asyncio.sleep(5)
# ------------------------------------------------------------------
# Event callbacks
# ------------------------------------------------------------------
async def _on_room_message(self, room: Any, event: Any) -> None:
"""Handle incoming text messages (and decrypted megolm events)."""
import nio
# Ignore own messages.
if event.sender == self._user_id:
return
# Startup grace: ignore old messages from initial sync.
event_ts = getattr(event, "server_timestamp", 0) / 1000.0
if event_ts and event_ts < self._startup_ts - _STARTUP_GRACE_SECONDS:
return
# Handle decrypted MegolmEvents — extract the inner event.
if isinstance(event, nio.MegolmEvent):
# Failed to decrypt.
logger.warning(
"Matrix: could not decrypt event %s in %s",
event.event_id, room.room_id,
)
return
# Skip edits (m.replace relation).
source_content = getattr(event, "source", {}).get("content", {})
relates_to = source_content.get("m.relates_to", {})
if relates_to.get("rel_type") == "m.replace":
return
body = getattr(event, "body", "") or ""
if not body:
return
# Determine chat type.
is_dm = self._dm_rooms.get(room.room_id, False)
if not is_dm and room.member_count == 2:
is_dm = True
chat_type = "dm" if is_dm else "group"
# Thread support.
thread_id = None
if relates_to.get("rel_type") == "m.thread":
thread_id = relates_to.get("event_id")
# Reply-to detection.
reply_to = None
in_reply_to = relates_to.get("m.in_reply_to", {})
if in_reply_to:
reply_to = in_reply_to.get("event_id")
# Strip reply fallback from body (Matrix prepends "> ..." lines).
if reply_to and body.startswith("> "):
lines = body.split("\n")
stripped = []
past_fallback = False
for line in lines:
if not past_fallback:
if line.startswith("> ") or line == ">":
continue
if line == "":
past_fallback = True
continue
past_fallback = True
stripped.append(line)
body = "\n".join(stripped) if stripped else body
# Message type.
msg_type = MessageType.TEXT
if body.startswith("!") or body.startswith("/"):
msg_type = MessageType.COMMAND
source = self.build_source(
chat_id=room.room_id,
chat_type=chat_type,
user_id=event.sender,
user_name=self._get_display_name(room, event.sender),
thread_id=thread_id,
)
msg_event = MessageEvent(
text=body,
message_type=msg_type,
source=source,
raw_message=getattr(event, "source", {}),
message_id=event.event_id,
reply_to=reply_to,
)
await self.handle_message(msg_event)
async def _on_room_message_media(self, room: Any, event: Any) -> None:
"""Handle incoming media messages (images, audio, video, files)."""
import nio
# Ignore own messages.
if event.sender == self._user_id:
return
# Startup grace.
event_ts = getattr(event, "server_timestamp", 0) / 1000.0
if event_ts and event_ts < self._startup_ts - _STARTUP_GRACE_SECONDS:
return
body = getattr(event, "body", "") or ""
url = getattr(event, "url", "")
# Convert mxc:// to HTTP URL for downstream processing.
http_url = ""
if url and url.startswith("mxc://"):
http_url = self._mxc_to_http(url)
# Determine message type from event class.
media_type = "document"
msg_type = MessageType.DOCUMENT
if isinstance(event, nio.RoomMessageImage):
msg_type = MessageType.PHOTO
media_type = "image"
elif isinstance(event, nio.RoomMessageAudio):
msg_type = MessageType.AUDIO
media_type = "audio"
elif isinstance(event, nio.RoomMessageVideo):
msg_type = MessageType.VIDEO
media_type = "video"
is_dm = self._dm_rooms.get(room.room_id, False)
if not is_dm and room.member_count == 2:
is_dm = True
chat_type = "dm" if is_dm else "group"
# Thread/reply detection.
source_content = getattr(event, "source", {}).get("content", {})
relates_to = source_content.get("m.relates_to", {})
thread_id = None
if relates_to.get("rel_type") == "m.thread":
thread_id = relates_to.get("event_id")
source = self.build_source(
chat_id=room.room_id,
chat_type=chat_type,
user_id=event.sender,
user_name=self._get_display_name(room, event.sender),
thread_id=thread_id,
)
msg_event = MessageEvent(
text=body,
message_type=msg_type,
source=source,
raw_message=getattr(event, "source", {}),
message_id=event.event_id,
media_urls=[http_url] if http_url else None,
media_types=[media_type] if http_url else None,
)
await self.handle_message(msg_event)
async def _on_invite(self, room: Any, event: Any) -> None:
"""Auto-join rooms when invited."""
import nio
if not isinstance(event, nio.InviteMemberEvent):
return
# Only process invites directed at us.
if event.state_key != self._user_id:
return
if event.membership != "invite":
return
logger.info(
"Matrix: invited to %s by %s — joining",
room.room_id, event.sender,
)
try:
resp = await self._client.join(room.room_id)
if isinstance(resp, nio.JoinResponse):
self._joined_rooms.add(room.room_id)
logger.info("Matrix: joined %s", room.room_id)
# Refresh DM cache since new room may be a DM.
await self._refresh_dm_cache()
else:
logger.warning(
"Matrix: failed to join %s: %s",
room.room_id, getattr(resp, "message", resp),
)
except Exception as exc:
logger.warning("Matrix: error joining %s: %s", room.room_id, exc)
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
async def _refresh_dm_cache(self) -> None:
"""Refresh the DM room cache from m.direct account data.
Tries the account_data API first, then falls back to parsing
the sync response's account_data for robustness.
"""
if not self._client:
return
dm_data: Optional[Dict] = None
# Primary: try the dedicated account data endpoint.
try:
resp = await self._client.get_account_data("m.direct")
if hasattr(resp, "content"):
dm_data = resp.content
elif isinstance(resp, dict):
dm_data = resp
except Exception as exc:
logger.debug("Matrix: get_account_data('m.direct') failed: %s — trying sync fallback", exc)
# Fallback: parse from the client's account_data store (populated by sync).
if dm_data is None:
try:
# matrix-nio stores account data events on the client object
ad = getattr(self._client, "account_data", None)
if ad and isinstance(ad, dict) and "m.direct" in ad:
event = ad["m.direct"]
if hasattr(event, "content"):
dm_data = event.content
elif isinstance(event, dict):
dm_data = event
except Exception:
pass
if dm_data is None:
return
dm_room_ids: Set[str] = set()
for user_id, rooms in dm_data.items():
if isinstance(rooms, list):
dm_room_ids.update(rooms)
self._dm_rooms = {
rid: (rid in dm_room_ids)
for rid in self._joined_rooms
}
def _get_display_name(self, room: Any, user_id: str) -> str:
"""Get a user's display name in a room, falling back to user_id."""
if room and hasattr(room, "users"):
user = room.users.get(user_id)
if user and getattr(user, "display_name", None):
return user.display_name
# Strip the @...:server format to just the localpart.
if user_id.startswith("@") and ":" in user_id:
return user_id[1:].split(":")[0]
return user_id
def _mxc_to_http(self, mxc_url: str) -> str:
"""Convert mxc://server/media_id to an HTTP download URL."""
# mxc://matrix.org/abc123 → https://matrix.org/_matrix/client/v1/media/download/matrix.org/abc123
# Uses the authenticated client endpoint (spec v1.11+) instead of the
# deprecated /_matrix/media/v3/download/ path.
if not mxc_url.startswith("mxc://"):
return mxc_url
parts = mxc_url[6:] # strip mxc://
# Use our homeserver for download (federation handles the rest).
return f"{self._homeserver}/_matrix/client/v1/media/download/{parts}"
def _markdown_to_html(self, text: str) -> str:
"""Convert Markdown to Matrix-compatible HTML.
Uses a simple conversion for common patterns. For full fidelity
a markdown-it style library could be used, but this covers the
common cases without an extra dependency.
"""
try:
import markdown
html = markdown.markdown(
text,
extensions=["fenced_code", "tables", "nl2br"],
)
# Strip wrapping <p> tags for single-paragraph messages.
if html.count("<p>") == 1:
html = html.replace("<p>", "").replace("</p>", "")
return html
except ImportError:
pass
# Minimal fallback: just handle bold, italic, code.
html = text
html = re.sub(r"\*\*(.+?)\*\*", r"<strong>\1</strong>", html)
html = re.sub(r"\*(.+?)\*", r"<em>\1</em>", html)
html = re.sub(r"`([^`]+)`", r"<code>\1</code>", html)
html = re.sub(r"\n", r"<br>", html)
return html

View File

@@ -0,0 +1,663 @@
"""Mattermost gateway adapter.
Connects to a self-hosted (or cloud) Mattermost instance via its REST API
(v4) and WebSocket for real-time events. No external Mattermost library
required — uses aiohttp which is already a Hermes dependency.
Environment variables:
MATTERMOST_URL Server URL (e.g. https://mm.example.com)
MATTERMOST_TOKEN Bot token or personal-access token
MATTERMOST_ALLOWED_USERS Comma-separated user IDs
MATTERMOST_HOME_CHANNEL Channel ID for cron/notification delivery
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import re
import time
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import (
BasePlatformAdapter,
MessageEvent,
MessageType,
SendResult,
)
logger = logging.getLogger(__name__)
# Mattermost post size limit (server default is 16383, but 4000 is the
# practical limit for readable messages — matching OpenClaw's choice).
MAX_POST_LENGTH = 4000
# Channel type codes returned by the Mattermost API.
_CHANNEL_TYPE_MAP = {
"D": "dm",
"G": "group",
"P": "group", # private channel → treat as group
"O": "channel",
}
# Reconnect parameters (exponential backoff).
_RECONNECT_BASE_DELAY = 2.0
_RECONNECT_MAX_DELAY = 60.0
_RECONNECT_JITTER = 0.2
def check_mattermost_requirements() -> bool:
"""Return True if the Mattermost adapter can be used."""
token = os.getenv("MATTERMOST_TOKEN", "")
url = os.getenv("MATTERMOST_URL", "")
if not token:
logger.debug("Mattermost: MATTERMOST_TOKEN not set")
return False
if not url:
logger.warning("Mattermost: MATTERMOST_URL not set")
return False
try:
import aiohttp # noqa: F401
return True
except ImportError:
logger.warning("Mattermost: aiohttp not installed")
return False
class MattermostAdapter(BasePlatformAdapter):
"""Gateway adapter for Mattermost (self-hosted or cloud)."""
def __init__(self, config: PlatformConfig):
super().__init__(config, Platform.MATTERMOST)
self._base_url: str = (
config.extra.get("url", "")
or os.getenv("MATTERMOST_URL", "")
).rstrip("/")
self._token: str = config.token or os.getenv("MATTERMOST_TOKEN", "")
self._bot_user_id: str = ""
self._bot_username: str = ""
# aiohttp session + websocket handle
self._session: Any = None # aiohttp.ClientSession
self._ws: Any = None # aiohttp.ClientWebSocketResponse
self._ws_task: Optional[asyncio.Task] = None
self._reconnect_task: Optional[asyncio.Task] = None
self._closing = False
# Reply mode: "thread" to nest replies, "off" for flat messages.
self._reply_mode: str = (
config.extra.get("reply_mode", "")
or os.getenv("MATTERMOST_REPLY_MODE", "off")
).lower()
# Dedup cache: post_id → timestamp (prevent reprocessing)
self._seen_posts: Dict[str, float] = {}
self._SEEN_MAX = 2000
self._SEEN_TTL = 300 # 5 minutes
# ------------------------------------------------------------------
# HTTP helpers
# ------------------------------------------------------------------
def _headers(self) -> Dict[str, str]:
return {
"Authorization": f"Bearer {self._token}",
"Content-Type": "application/json",
}
async def _api_get(self, path: str) -> Dict[str, Any]:
"""GET /api/v4/{path}."""
import aiohttp
url = f"{self._base_url}/api/v4/{path.lstrip('/')}"
try:
async with self._session.get(url, headers=self._headers()) as resp:
if resp.status >= 400:
body = await resp.text()
logger.error("MM API GET %s%s: %s", path, resp.status, body[:200])
return {}
return await resp.json()
except aiohttp.ClientError as exc:
logger.error("MM API GET %s network error: %s", path, exc)
return {}
async def _api_post(
self, path: str, payload: Dict[str, Any]
) -> Dict[str, Any]:
"""POST /api/v4/{path} with JSON body."""
import aiohttp
url = f"{self._base_url}/api/v4/{path.lstrip('/')}"
try:
async with self._session.post(
url, headers=self._headers(), json=payload
) as resp:
if resp.status >= 400:
body = await resp.text()
logger.error("MM API POST %s%s: %s", path, resp.status, body[:200])
return {}
return await resp.json()
except aiohttp.ClientError as exc:
logger.error("MM API POST %s network error: %s", path, exc)
return {}
async def _api_put(
self, path: str, payload: Dict[str, Any]
) -> Dict[str, Any]:
"""PUT /api/v4/{path} with JSON body."""
import aiohttp
url = f"{self._base_url}/api/v4/{path.lstrip('/')}"
try:
async with self._session.put(
url, headers=self._headers(), json=payload
) as resp:
if resp.status >= 400:
body = await resp.text()
logger.error("MM API PUT %s%s: %s", path, resp.status, body[:200])
return {}
return await resp.json()
except aiohttp.ClientError as exc:
logger.error("MM API PUT %s network error: %s", path, exc)
return {}
async def _upload_file(
self, channel_id: str, file_data: bytes, filename: str, content_type: str = "application/octet-stream"
) -> Optional[str]:
"""Upload a file and return its file ID, or None on failure."""
import aiohttp
url = f"{self._base_url}/api/v4/files"
form = aiohttp.FormData()
form.add_field("channel_id", channel_id)
form.add_field(
"files",
file_data,
filename=filename,
content_type=content_type,
)
headers = {"Authorization": f"Bearer {self._token}"}
async with self._session.post(url, headers=headers, data=form) as resp:
if resp.status >= 400:
body = await resp.text()
logger.error("MM file upload → %s: %s", resp.status, body[:200])
return None
data = await resp.json()
infos = data.get("file_infos", [])
return infos[0]["id"] if infos else None
# ------------------------------------------------------------------
# Required overrides
# ------------------------------------------------------------------
async def connect(self) -> bool:
"""Connect to Mattermost and start the WebSocket listener."""
import aiohttp
if not self._base_url or not self._token:
logger.error("Mattermost: URL or token not configured")
return False
self._session = aiohttp.ClientSession()
self._closing = False
# Verify credentials and fetch bot identity.
me = await self._api_get("users/me")
if not me or "id" not in me:
logger.error("Mattermost: failed to authenticate — check MATTERMOST_TOKEN and MATTERMOST_URL")
await self._session.close()
return False
self._bot_user_id = me["id"]
self._bot_username = me.get("username", "")
logger.info(
"Mattermost: authenticated as @%s (%s) on %s",
self._bot_username,
self._bot_user_id,
self._base_url,
)
# Start WebSocket in background.
self._ws_task = asyncio.create_task(self._ws_loop())
return True
async def disconnect(self) -> None:
"""Disconnect from Mattermost."""
self._closing = True
if self._ws_task and not self._ws_task.done():
self._ws_task.cancel()
try:
await self._ws_task
except (asyncio.CancelledError, Exception):
pass
if self._reconnect_task and not self._reconnect_task.done():
self._reconnect_task.cancel()
if self._ws:
await self._ws.close()
self._ws = None
if self._session and not self._session.closed:
await self._session.close()
logger.info("Mattermost: disconnected")
async def send(
self,
chat_id: str,
content: str,
reply_to: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> SendResult:
"""Send a message (or multiple chunks) to a channel."""
if not content:
return SendResult(success=True)
formatted = self.format_message(content)
chunks = self.truncate_message(formatted, MAX_POST_LENGTH)
last_id = None
for chunk in chunks:
payload: Dict[str, Any] = {
"channel_id": chat_id,
"message": chunk,
}
# Thread support: reply_to is the root post ID.
if reply_to and self._reply_mode == "thread":
payload["root_id"] = reply_to
data = await self._api_post("posts", payload)
if not data or "id" not in data:
return SendResult(success=False, error="Failed to create post")
last_id = data["id"]
return SendResult(success=True, message_id=last_id)
async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
"""Return channel name and type."""
data = await self._api_get(f"channels/{chat_id}")
if not data:
return {"name": chat_id, "type": "channel"}
ch_type = _CHANNEL_TYPE_MAP.get(data.get("type", "O"), "channel")
display_name = data.get("display_name") or data.get("name") or chat_id
return {"name": display_name, "type": ch_type}
# ------------------------------------------------------------------
# Optional overrides
# ------------------------------------------------------------------
async def send_typing(
self, chat_id: str, metadata: Optional[Dict[str, Any]] = None
) -> None:
"""Send a typing indicator."""
await self._api_post(
f"users/{self._bot_user_id}/typing",
{"channel_id": chat_id},
)
async def edit_message(
self, chat_id: str, message_id: str, content: str
) -> SendResult:
"""Edit an existing post."""
formatted = self.format_message(content)
data = await self._api_put(
f"posts/{message_id}/patch",
{"message": formatted},
)
if not data or "id" not in data:
return SendResult(success=False, error="Failed to edit post")
return SendResult(success=True, message_id=data["id"])
async def send_image(
self,
chat_id: str,
image_url: str,
caption: Optional[str] = None,
reply_to: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> SendResult:
"""Download an image and upload it as a file attachment."""
return await self._send_url_as_file(
chat_id, image_url, caption, reply_to, "image"
)
async def send_image_file(
self,
chat_id: str,
image_path: str,
caption: Optional[str] = None,
reply_to: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> SendResult:
"""Upload a local image file."""
return await self._send_local_file(
chat_id, image_path, caption, reply_to
)
async def send_document(
self,
chat_id: str,
file_path: str,
caption: Optional[str] = None,
file_name: Optional[str] = None,
reply_to: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> SendResult:
"""Upload a local file as a document."""
return await self._send_local_file(
chat_id, file_path, caption, reply_to, file_name
)
async def send_voice(
self,
chat_id: str,
audio_path: str,
caption: Optional[str] = None,
reply_to: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> SendResult:
"""Upload an audio file."""
return await self._send_local_file(
chat_id, audio_path, caption, reply_to
)
async def send_video(
self,
chat_id: str,
video_path: str,
caption: Optional[str] = None,
reply_to: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> SendResult:
"""Upload a video file."""
return await self._send_local_file(
chat_id, video_path, caption, reply_to
)
def format_message(self, content: str) -> str:
"""Mattermost uses standard Markdown — mostly pass through.
Strip image markdown into plain links (files are uploaded separately).
"""
# Convert ![alt](url) to just the URL — Mattermost renders
# image URLs as inline previews automatically.
content = re.sub(r"!\[([^\]]*)\]\(([^)]+)\)", r"\2", content)
return content
# ------------------------------------------------------------------
# File helpers
# ------------------------------------------------------------------
async def _send_url_as_file(
self,
chat_id: str,
url: str,
caption: Optional[str],
reply_to: Optional[str],
kind: str = "file",
) -> SendResult:
"""Download a URL and upload it as a file attachment."""
import aiohttp
try:
async with self._session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as resp:
if resp.status >= 400:
# Fall back to sending the URL as text.
return await self.send(chat_id, f"{caption or ''}\n{url}".strip(), reply_to)
file_data = await resp.read()
ct = resp.content_type or "application/octet-stream"
# Derive filename from URL.
fname = url.rsplit("/", 1)[-1].split("?")[0] or f"{kind}.png"
except Exception as exc:
logger.warning("Mattermost: failed to download %s: %s", url, exc)
return await self.send(chat_id, f"{caption or ''}\n{url}".strip(), reply_to)
file_id = await self._upload_file(chat_id, file_data, fname, ct)
if not file_id:
return await self.send(chat_id, f"{caption or ''}\n{url}".strip(), reply_to)
payload: Dict[str, Any] = {
"channel_id": chat_id,
"message": caption or "",
"file_ids": [file_id],
}
if reply_to and self._reply_mode == "thread":
payload["root_id"] = reply_to
data = await self._api_post("posts", payload)
if not data or "id" not in data:
return SendResult(success=False, error="Failed to post with file")
return SendResult(success=True, message_id=data["id"])
async def _send_local_file(
self,
chat_id: str,
file_path: str,
caption: Optional[str],
reply_to: Optional[str],
file_name: Optional[str] = None,
) -> SendResult:
"""Upload a local file and attach it to a post."""
import mimetypes
p = Path(file_path)
if not p.exists():
return await self.send(
chat_id, f"{caption or ''}\n(file not found: {file_path})", reply_to
)
fname = file_name or p.name
ct = mimetypes.guess_type(fname)[0] or "application/octet-stream"
file_data = p.read_bytes()
file_id = await self._upload_file(chat_id, file_data, fname, ct)
if not file_id:
return SendResult(success=False, error="File upload failed")
payload: Dict[str, Any] = {
"channel_id": chat_id,
"message": caption or "",
"file_ids": [file_id],
}
if reply_to and self._reply_mode == "thread":
payload["root_id"] = reply_to
data = await self._api_post("posts", payload)
if not data or "id" not in data:
return SendResult(success=False, error="Failed to post with file")
return SendResult(success=True, message_id=data["id"])
# ------------------------------------------------------------------
# WebSocket
# ------------------------------------------------------------------
async def _ws_loop(self) -> None:
"""Connect to the WebSocket and listen for events, reconnecting on failure."""
delay = _RECONNECT_BASE_DELAY
while not self._closing:
try:
await self._ws_connect_and_listen()
# Clean disconnect — reset delay.
delay = _RECONNECT_BASE_DELAY
except asyncio.CancelledError:
return
except Exception as exc:
if self._closing:
return
logger.warning("Mattermost WS error: %s — reconnecting in %.0fs", exc, delay)
if self._closing:
return
# Exponential backoff with jitter.
import random
jitter = delay * _RECONNECT_JITTER * random.random()
await asyncio.sleep(delay + jitter)
delay = min(delay * 2, _RECONNECT_MAX_DELAY)
async def _ws_connect_and_listen(self) -> None:
"""Single WebSocket session: connect, authenticate, process events."""
# Build WS URL: https:// → wss://, http:// → ws://
ws_url = re.sub(r"^http", "ws", self._base_url) + "/api/v4/websocket"
logger.info("Mattermost: connecting to %s", ws_url)
self._ws = await self._session.ws_connect(ws_url, heartbeat=30.0)
# Authenticate via the WebSocket.
auth_msg = {
"seq": 1,
"action": "authentication_challenge",
"data": {"token": self._token},
}
await self._ws.send_json(auth_msg)
logger.info("Mattermost: WebSocket connected and authenticated")
async for raw_msg in self._ws:
if self._closing:
return
if raw_msg.type in (
raw_msg.type.TEXT,
raw_msg.type.BINARY,
):
try:
event = json.loads(raw_msg.data)
except (json.JSONDecodeError, TypeError):
continue
await self._handle_ws_event(event)
elif raw_msg.type in (
raw_msg.type.ERROR,
raw_msg.type.CLOSE,
raw_msg.type.CLOSING,
raw_msg.type.CLOSED,
):
logger.info("Mattermost: WebSocket closed (%s)", raw_msg.type)
break
async def _handle_ws_event(self, event: Dict[str, Any]) -> None:
"""Process a single WebSocket event."""
event_type = event.get("event")
if event_type != "posted":
return
data = event.get("data", {})
raw_post_str = data.get("post")
if not raw_post_str:
return
try:
post = json.loads(raw_post_str)
except (json.JSONDecodeError, TypeError):
return
# Ignore own messages.
if post.get("user_id") == self._bot_user_id:
return
# Ignore system posts.
if post.get("type"):
return
post_id = post.get("id", "")
# Dedup.
self._prune_seen()
if post_id in self._seen_posts:
return
self._seen_posts[post_id] = time.time()
# Build message event.
channel_id = post.get("channel_id", "")
channel_type_raw = data.get("channel_type", "O")
chat_type = _CHANNEL_TYPE_MAP.get(channel_type_raw, "channel")
# For DMs, user_id is sufficient. For channels, check for @mention.
message_text = post.get("message", "")
# Resolve sender info.
sender_id = post.get("user_id", "")
sender_name = data.get("sender_name", "").lstrip("@") or sender_id
# Thread support: if the post is in a thread, use root_id.
thread_id = post.get("root_id") or None
# Determine message type.
file_ids = post.get("file_ids") or []
msg_type = MessageType.TEXT
if message_text.startswith("/"):
msg_type = MessageType.COMMAND
# Download file attachments immediately (URLs require auth headers
# that downstream tools won't have).
media_urls: List[str] = []
media_types: List[str] = []
for fid in file_ids:
try:
file_info = await self._api_get(f"files/{fid}/info")
fname = file_info.get("name", f"file_{fid}")
ext = Path(fname).suffix or ""
mime = file_info.get("mime_type", "application/octet-stream")
import aiohttp
dl_url = f"{self._base_url}/api/v4/files/{fid}"
async with self._session.get(
dl_url,
headers={"Authorization": f"Bearer {self._token}"},
timeout=aiohttp.ClientTimeout(total=30),
) as resp:
if resp.status < 400:
file_data = await resp.read()
from gateway.platforms.base import cache_image_from_bytes, cache_document_from_bytes
if mime.startswith("image/"):
local_path = cache_image_from_bytes(file_data, ext or ".png")
media_urls.append(local_path)
media_types.append("image")
elif mime.startswith("audio/"):
from gateway.platforms.base import cache_audio_from_bytes
local_path = cache_audio_from_bytes(file_data, ext or ".ogg")
media_urls.append(local_path)
media_types.append("audio")
else:
local_path = cache_document_from_bytes(file_data, fname)
media_urls.append(local_path)
media_types.append("document")
else:
logger.warning("Mattermost: failed to download file %s: HTTP %s", fid, resp.status)
except Exception as exc:
logger.warning("Mattermost: error downloading file %s: %s", fid, exc)
source = self.build_source(
chat_id=channel_id,
chat_type=chat_type,
user_id=sender_id,
user_name=sender_name,
thread_id=thread_id,
)
msg_event = MessageEvent(
text=message_text,
message_type=msg_type,
source=source,
raw_message=post,
message_id=post_id,
media_urls=media_urls if media_urls else None,
media_types=media_types if media_types else None,
)
await self.handle_message(msg_event)
def _prune_seen(self) -> None:
"""Remove expired entries from the dedup cache."""
if len(self._seen_posts) < self._SEEN_MAX:
return
now = time.time()
self._seen_posts = {
pid: ts
for pid, ts in self._seen_posts.items()
if now - ts < self._SEEN_TTL
}

View File

@@ -1147,6 +1147,20 @@ class GatewayRunner:
return None
return DingTalkAdapter(config)
elif platform == Platform.MATTERMOST:
from gateway.platforms.mattermost import MattermostAdapter, check_mattermost_requirements
if not check_mattermost_requirements():
logger.warning("Mattermost: MATTERMOST_TOKEN or MATTERMOST_URL not set, or aiohttp missing")
return None
return MattermostAdapter(config)
elif platform == Platform.MATRIX:
from gateway.platforms.matrix import MatrixAdapter, check_matrix_requirements
if not check_matrix_requirements():
logger.warning("Matrix: matrix-nio not installed or credentials not set. Run: pip install 'matrix-nio[e2e]'")
return None
return MatrixAdapter(config)
return None
def _is_user_authorized(self, source: SessionSource) -> bool:
@@ -1178,6 +1192,8 @@ class GatewayRunner:
Platform.SIGNAL: "SIGNAL_ALLOWED_USERS",
Platform.EMAIL: "EMAIL_ALLOWED_USERS",
Platform.SMS: "SMS_ALLOWED_USERS",
Platform.MATTERMOST: "MATTERMOST_ALLOWED_USERS",
Platform.MATRIX: "MATRIX_ALLOWED_USERS",
}
platform_allow_all_map = {
Platform.TELEGRAM: "TELEGRAM_ALLOW_ALL_USERS",
@@ -1187,6 +1203,8 @@ class GatewayRunner:
Platform.SIGNAL: "SIGNAL_ALLOW_ALL_USERS",
Platform.EMAIL: "EMAIL_ALLOW_ALL_USERS",
Platform.SMS: "SMS_ALLOW_ALL_USERS",
Platform.MATTERMOST: "MATTERMOST_ALLOW_ALL_USERS",
Platform.MATRIX: "MATRIX_ALLOW_ALL_USERS",
}
# Per-platform allow-all flag (e.g., DISCORD_ALLOW_ALL_USERS=true)

View File

@@ -37,6 +37,8 @@ _EXTRA_ENV_KEYS = frozenset({
"DINGTALK_CLIENT_ID", "DINGTALK_CLIENT_SECRET",
"TERMINAL_ENV", "TERMINAL_SSH_KEY", "TERMINAL_SSH_PORT",
"WHATSAPP_MODE", "WHATSAPP_ENABLED",
"MATTERMOST_HOME_CHANNEL", "MATTERMOST_REPLY_MODE",
"MATRIX_PASSWORD", "MATRIX_ENCRYPTION", "MATRIX_HOME_ROOM",
})
import yaml
@@ -692,6 +694,55 @@ OPTIONAL_ENV_VARS = {
"password": True,
"category": "messaging",
},
"MATTERMOST_URL": {
"description": "Mattermost server URL (e.g. https://mm.example.com)",
"prompt": "Mattermost server URL",
"url": "https://mattermost.com/deploy/",
"password": False,
"category": "messaging",
},
"MATTERMOST_TOKEN": {
"description": "Mattermost bot token or personal access token",
"prompt": "Mattermost bot token",
"url": None,
"password": True,
"category": "messaging",
},
"MATTERMOST_ALLOWED_USERS": {
"description": "Comma-separated Mattermost user IDs allowed to use the bot",
"prompt": "Allowed Mattermost user IDs (comma-separated)",
"url": None,
"password": False,
"category": "messaging",
},
"MATRIX_HOMESERVER": {
"description": "Matrix homeserver URL (e.g. https://matrix.example.org)",
"prompt": "Matrix homeserver URL",
"url": "https://matrix.org/ecosystem/servers/",
"password": False,
"category": "messaging",
},
"MATRIX_ACCESS_TOKEN": {
"description": "Matrix access token (preferred over password login)",
"prompt": "Matrix access token",
"url": None,
"password": True,
"category": "messaging",
},
"MATRIX_USER_ID": {
"description": "Matrix user ID (e.g. @hermes:example.org)",
"prompt": "Matrix user ID (@user:server)",
"url": None,
"password": False,
"category": "messaging",
},
"MATRIX_ALLOWED_USERS": {
"description": "Comma-separated Matrix user IDs allowed to use the bot (@user:server format)",
"prompt": "Allowed Matrix user IDs (comma-separated)",
"url": None,
"password": False,
"category": "messaging",
},
"GATEWAY_ALLOW_ALL_USERS": {
"description": "Allow all users to interact with messaging bots (true/false). Default: false.",
"prompt": "Allow all users (true/false)",

View File

@@ -1001,6 +1001,64 @@ _PLATFORMS = [
"help": "Paste your member ID from step 7 above."},
],
},
{
"key": "matrix",
"label": "Matrix",
"emoji": "🔐",
"token_var": "MATRIX_ACCESS_TOKEN",
"setup_instructions": [
"1. Works with any Matrix homeserver (self-hosted Synapse/Conduit/Dendrite or matrix.org)",
"2. Create a bot user on your homeserver, or use your own account",
"3. Get an access token: Element → Settings → Help & About → Access Token",
" Or via API: curl -X POST https://your-server/_matrix/client/v3/login \\",
" -d '{\"type\":\"m.login.password\",\"user\":\"@bot:server\",\"password\":\"...\"}'",
"4. Alternatively, provide user ID + password and Hermes will log in directly",
"5. For E2EE: set MATRIX_ENCRYPTION=true (requires pip install 'matrix-nio[e2e]')",
"6. To find your user ID: it's @username:your-server (shown in Element profile)",
],
"vars": [
{"name": "MATRIX_HOMESERVER", "prompt": "Homeserver URL (e.g. https://matrix.example.org)", "password": False,
"help": "Your Matrix homeserver URL. Works with any self-hosted instance."},
{"name": "MATRIX_ACCESS_TOKEN", "prompt": "Access token (leave empty to use password login instead)", "password": True,
"help": "Paste your access token, or leave empty and provide user ID + password below."},
{"name": "MATRIX_USER_ID", "prompt": "User ID (@bot:server — required for password login)", "password": False,
"help": "Full Matrix user ID, e.g. @hermes:matrix.example.org"},
{"name": "MATRIX_ALLOWED_USERS", "prompt": "Allowed user IDs (comma-separated, e.g. @you:server)", "password": False,
"is_allowlist": True,
"help": "Matrix user IDs who can interact with the bot."},
{"name": "MATRIX_HOME_ROOM", "prompt": "Home room ID (for cron/notification delivery, or empty to set later with /set-home)", "password": False,
"help": "Room ID (e.g. !abc123:server) for delivering cron results and notifications."},
],
},
{
"key": "mattermost",
"label": "Mattermost",
"emoji": "💬",
"token_var": "MATTERMOST_TOKEN",
"setup_instructions": [
"1. In Mattermost: Integrations → Bot Accounts → Add Bot Account",
" (System Console → Integrations → Bot Accounts must be enabled)",
"2. Give it a username (e.g. hermes) and copy the bot token",
"3. Works with any self-hosted Mattermost instance — enter your server URL",
"4. To find your user ID: click your avatar (top-left) → Profile",
" Your user ID is displayed there — click it to copy.",
" ⚠ This is NOT your username — it's a 26-character alphanumeric ID.",
"5. To get a channel ID: click the channel name → View Info → copy the ID",
],
"vars": [
{"name": "MATTERMOST_URL", "prompt": "Server URL (e.g. https://mm.example.com)", "password": False,
"help": "Your Mattermost server URL. Works with any self-hosted instance."},
{"name": "MATTERMOST_TOKEN", "prompt": "Bot token", "password": True,
"help": "Paste the bot token from step 2 above."},
{"name": "MATTERMOST_ALLOWED_USERS", "prompt": "Allowed user IDs (comma-separated)", "password": False,
"is_allowlist": True,
"help": "Your Mattermost user ID from step 4 above."},
{"name": "MATTERMOST_HOME_CHANNEL", "prompt": "Home channel ID (for cron/notification delivery, or empty to set later with /set-home)", "password": False,
"help": "Channel ID where Hermes delivers cron results and notifications."},
{"name": "MATTERMOST_REPLY_MODE", "prompt": "Reply mode — 'off' for flat messages, 'thread' for threaded replies (default: off)", "password": False,
"help": "off = flat channel messages, thread = replies nest under your message."},
],
},
{
"key": "whatsapp",
"label": "WhatsApp",
@@ -1100,6 +1158,16 @@ def _platform_status(platform: dict) -> str:
if any([val, pwd, imap, smtp]):
return "partially configured"
return "not configured"
if platform.get("key") == "matrix":
homeserver = get_env_value("MATRIX_HOMESERVER")
password = get_env_value("MATRIX_PASSWORD")
if (val or password) and homeserver:
e2ee = get_env_value("MATRIX_ENCRYPTION")
suffix = " + E2EE" if e2ee and e2ee.lower() in ("true", "1", "yes") else ""
return f"configured{suffix}"
if val or password or homeserver:
return "partially configured"
return "not configured"
if val:
return "configured"
return "not configured"

View File

@@ -2518,6 +2518,119 @@ def setup_gateway(config: dict):
" Set SLACK_ALLOW_ALL_USERS=true or GATEWAY_ALLOW_ALL_USERS=true only if you intentionally want open workspace access."
)
# ── Matrix ──
existing_matrix = get_env_value("MATRIX_ACCESS_TOKEN") or get_env_value("MATRIX_PASSWORD")
if existing_matrix:
print_info("Matrix: already configured")
if prompt_yes_no("Reconfigure Matrix?", False):
existing_matrix = None
if not existing_matrix and prompt_yes_no("Set up Matrix?", False):
print_info("Works with any Matrix homeserver (Synapse, Conduit, Dendrite, or matrix.org).")
print_info(" 1. Create a bot user on your homeserver, or use your own account")
print_info(" 2. Get an access token from Element, or provide user ID + password")
print()
homeserver = prompt("Homeserver URL (e.g. https://matrix.example.org)")
if homeserver:
save_env_value("MATRIX_HOMESERVER", homeserver.rstrip("/"))
print()
print_info("Auth: provide an access token (recommended), or user ID + password.")
token = prompt("Access token (leave empty for password login)", password=True)
if token:
save_env_value("MATRIX_ACCESS_TOKEN", token)
user_id = prompt("User ID (@bot:server — optional, will be auto-detected)")
if user_id:
save_env_value("MATRIX_USER_ID", user_id)
print_success("Matrix access token saved")
else:
user_id = prompt("User ID (@bot:server)")
if user_id:
save_env_value("MATRIX_USER_ID", user_id)
password = prompt("Password", password=True)
if password:
save_env_value("MATRIX_PASSWORD", password)
print_success("Matrix credentials saved")
if token or get_env_value("MATRIX_PASSWORD"):
# E2EE
print()
if prompt_yes_no("Enable end-to-end encryption (E2EE)?", False):
save_env_value("MATRIX_ENCRYPTION", "true")
print_success("E2EE enabled")
print_info(" Requires: pip install 'matrix-nio[e2e]'")
# Allowed users
print()
print_info("🔒 Security: Restrict who can use your bot")
print_info(" Matrix user IDs look like @username:server")
print()
allowed_users = prompt(
"Allowed user IDs (comma-separated, leave empty for open access)"
)
if allowed_users:
save_env_value("MATRIX_ALLOWED_USERS", allowed_users.replace(" ", ""))
print_success("Matrix allowlist configured")
else:
print_info(
"⚠️ No allowlist set - anyone who can message the bot can use it!"
)
# Home room
print()
print_info("📬 Home Room: where Hermes delivers cron job results and notifications.")
print_info(" Room IDs look like !abc123:server (shown in Element room settings)")
print_info(" You can also set this later by typing /set-home in a Matrix room.")
home_room = prompt("Home room ID (leave empty to set later with /set-home)")
if home_room:
save_env_value("MATRIX_HOME_ROOM", home_room)
# ── Mattermost ──
existing_mattermost = get_env_value("MATTERMOST_TOKEN")
if existing_mattermost:
print_info("Mattermost: already configured")
if prompt_yes_no("Reconfigure Mattermost?", False):
existing_mattermost = None
if not existing_mattermost and prompt_yes_no("Set up Mattermost?", False):
print_info("Works with any self-hosted Mattermost instance.")
print_info(" 1. In Mattermost: Integrations → Bot Accounts → Add Bot Account")
print_info(" 2. Copy the bot token")
print()
mm_url = prompt("Mattermost server URL (e.g. https://mm.example.com)")
if mm_url:
save_env_value("MATTERMOST_URL", mm_url.rstrip("/"))
token = prompt("Bot token", password=True)
if token:
save_env_value("MATTERMOST_TOKEN", token)
print_success("Mattermost token saved")
# Allowed users
print()
print_info("🔒 Security: Restrict who can use your bot")
print_info(" To find your user ID: click your avatar → Profile")
print_info(" or use the API: GET /api/v4/users/me")
print()
allowed_users = prompt(
"Allowed user IDs (comma-separated, leave empty for open access)"
)
if allowed_users:
save_env_value("MATTERMOST_ALLOWED_USERS", allowed_users.replace(" ", ""))
print_success("Mattermost allowlist configured")
else:
print_info(
"⚠️ No allowlist set - anyone who can message the bot can use it!"
)
# Home channel
print()
print_info("📬 Home Channel: where Hermes delivers cron job results and notifications.")
print_info(" To get a channel ID: click channel name → View Info → copy the ID")
print_info(" You can also set this later by typing /set-home in a Mattermost channel.")
home_channel = prompt("Home channel ID (leave empty to set later with /set-home)")
if home_channel:
save_env_value("MATTERMOST_HOME_CHANNEL", home_channel)
# ── WhatsApp ──
existing_whatsapp = get_env_value("WHATSAPP_ENABLED")
if not existing_whatsapp and prompt_yes_no("Set up WhatsApp?", False):
@@ -2535,6 +2648,9 @@ def setup_gateway(config: dict):
get_env_value("TELEGRAM_BOT_TOKEN")
or get_env_value("DISCORD_BOT_TOKEN")
or get_env_value("SLACK_BOT_TOKEN")
or get_env_value("MATTERMOST_TOKEN")
or get_env_value("MATRIX_ACCESS_TOKEN")
or get_env_value("MATRIX_PASSWORD")
or get_env_value("WHATSAPP_ENABLED")
)
if any_messaging:

View File

@@ -46,6 +46,7 @@ dev = ["pytest", "pytest-asyncio", "pytest-xdist", "mcp>=1.2.0"]
messaging = ["python-telegram-bot>=20.0", "discord.py[voice]>=2.0", "aiohttp>=3.9.0", "slack-bolt>=1.18.0", "slack-sdk>=3.27.0"]
cron = ["croniter"]
slack = ["slack-bolt>=1.18.0", "slack-sdk>=3.27.0"]
matrix = ["matrix-nio[e2e]>=0.24.0"]
cli = ["simple-term-menu"]
tts-premium = ["elevenlabs"]
voice = ["sounddevice>=0.4.6", "numpy>=1.24.0"]

View File

@@ -0,0 +1,448 @@
"""Tests for Matrix platform adapter."""
import json
import re
import pytest
from unittest.mock import MagicMock, patch, AsyncMock
from gateway.config import Platform, PlatformConfig
# ---------------------------------------------------------------------------
# Platform & Config
# ---------------------------------------------------------------------------
class TestMatrixPlatformEnum:
def test_matrix_enum_exists(self):
assert Platform.MATRIX.value == "matrix"
def test_matrix_in_platform_list(self):
platforms = [p.value for p in Platform]
assert "matrix" in platforms
class TestMatrixConfigLoading:
def test_apply_env_overrides_with_access_token(self, monkeypatch):
monkeypatch.setenv("MATRIX_ACCESS_TOKEN", "syt_abc123")
monkeypatch.setenv("MATRIX_HOMESERVER", "https://matrix.example.org")
from gateway.config import GatewayConfig, _apply_env_overrides
config = GatewayConfig()
_apply_env_overrides(config)
assert Platform.MATRIX in config.platforms
mc = config.platforms[Platform.MATRIX]
assert mc.enabled is True
assert mc.token == "syt_abc123"
assert mc.extra.get("homeserver") == "https://matrix.example.org"
def test_apply_env_overrides_with_password(self, monkeypatch):
monkeypatch.delenv("MATRIX_ACCESS_TOKEN", raising=False)
monkeypatch.setenv("MATRIX_PASSWORD", "secret123")
monkeypatch.setenv("MATRIX_HOMESERVER", "https://matrix.example.org")
monkeypatch.setenv("MATRIX_USER_ID", "@bot:example.org")
from gateway.config import GatewayConfig, _apply_env_overrides
config = GatewayConfig()
_apply_env_overrides(config)
assert Platform.MATRIX in config.platforms
mc = config.platforms[Platform.MATRIX]
assert mc.enabled is True
assert mc.extra.get("password") == "secret123"
assert mc.extra.get("user_id") == "@bot:example.org"
def test_matrix_not_loaded_without_creds(self, monkeypatch):
monkeypatch.delenv("MATRIX_ACCESS_TOKEN", raising=False)
monkeypatch.delenv("MATRIX_PASSWORD", raising=False)
monkeypatch.delenv("MATRIX_HOMESERVER", raising=False)
from gateway.config import GatewayConfig, _apply_env_overrides
config = GatewayConfig()
_apply_env_overrides(config)
assert Platform.MATRIX not in config.platforms
def test_matrix_encryption_flag(self, monkeypatch):
monkeypatch.setenv("MATRIX_ACCESS_TOKEN", "syt_abc123")
monkeypatch.setenv("MATRIX_HOMESERVER", "https://matrix.example.org")
monkeypatch.setenv("MATRIX_ENCRYPTION", "true")
from gateway.config import GatewayConfig, _apply_env_overrides
config = GatewayConfig()
_apply_env_overrides(config)
mc = config.platforms[Platform.MATRIX]
assert mc.extra.get("encryption") is True
def test_matrix_encryption_default_off(self, monkeypatch):
monkeypatch.setenv("MATRIX_ACCESS_TOKEN", "syt_abc123")
monkeypatch.setenv("MATRIX_HOMESERVER", "https://matrix.example.org")
monkeypatch.delenv("MATRIX_ENCRYPTION", raising=False)
from gateway.config import GatewayConfig, _apply_env_overrides
config = GatewayConfig()
_apply_env_overrides(config)
mc = config.platforms[Platform.MATRIX]
assert mc.extra.get("encryption") is False
def test_matrix_home_room(self, monkeypatch):
monkeypatch.setenv("MATRIX_ACCESS_TOKEN", "syt_abc123")
monkeypatch.setenv("MATRIX_HOMESERVER", "https://matrix.example.org")
monkeypatch.setenv("MATRIX_HOME_ROOM", "!room123:example.org")
monkeypatch.setenv("MATRIX_HOME_ROOM_NAME", "Bot Room")
from gateway.config import GatewayConfig, _apply_env_overrides
config = GatewayConfig()
_apply_env_overrides(config)
home = config.get_home_channel(Platform.MATRIX)
assert home is not None
assert home.chat_id == "!room123:example.org"
assert home.name == "Bot Room"
def test_matrix_user_id_stored_in_extra(self, monkeypatch):
monkeypatch.setenv("MATRIX_ACCESS_TOKEN", "syt_abc123")
monkeypatch.setenv("MATRIX_HOMESERVER", "https://matrix.example.org")
monkeypatch.setenv("MATRIX_USER_ID", "@hermes:example.org")
from gateway.config import GatewayConfig, _apply_env_overrides
config = GatewayConfig()
_apply_env_overrides(config)
mc = config.platforms[Platform.MATRIX]
assert mc.extra.get("user_id") == "@hermes:example.org"
# ---------------------------------------------------------------------------
# Adapter helpers
# ---------------------------------------------------------------------------
def _make_adapter():
"""Create a MatrixAdapter with mocked config."""
from gateway.platforms.matrix import MatrixAdapter
config = PlatformConfig(
enabled=True,
token="syt_test_token",
extra={
"homeserver": "https://matrix.example.org",
"user_id": "@bot:example.org",
},
)
adapter = MatrixAdapter(config)
return adapter
# ---------------------------------------------------------------------------
# mxc:// URL conversion
# ---------------------------------------------------------------------------
class TestMatrixMxcToHttp:
def setup_method(self):
self.adapter = _make_adapter()
def test_basic_mxc_conversion(self):
"""mxc://server/media_id should become an authenticated HTTP URL."""
mxc = "mxc://matrix.org/abc123"
result = self.adapter._mxc_to_http(mxc)
assert result == "https://matrix.example.org/_matrix/client/v1/media/download/matrix.org/abc123"
def test_mxc_with_different_server(self):
"""mxc:// from a different server should still use our homeserver."""
mxc = "mxc://other.server/media456"
result = self.adapter._mxc_to_http(mxc)
assert result.startswith("https://matrix.example.org/")
assert "other.server/media456" in result
def test_non_mxc_url_passthrough(self):
"""Non-mxc URLs should be returned unchanged."""
url = "https://example.com/image.png"
assert self.adapter._mxc_to_http(url) == url
def test_mxc_uses_client_v1_endpoint(self):
"""Should use /_matrix/client/v1/media/download/ not the deprecated path."""
mxc = "mxc://example.com/test123"
result = self.adapter._mxc_to_http(mxc)
assert "/_matrix/client/v1/media/download/" in result
assert "/_matrix/media/v3/download/" not in result
# ---------------------------------------------------------------------------
# DM detection
# ---------------------------------------------------------------------------
class TestMatrixDmDetection:
def setup_method(self):
self.adapter = _make_adapter()
def test_room_in_m_direct_is_dm(self):
"""A room listed in m.direct should be detected as DM."""
self.adapter._joined_rooms = {"!dm_room:ex.org", "!group_room:ex.org"}
self.adapter._dm_rooms = {
"!dm_room:ex.org": True,
"!group_room:ex.org": False,
}
assert self.adapter._dm_rooms.get("!dm_room:ex.org") is True
assert self.adapter._dm_rooms.get("!group_room:ex.org") is False
def test_unknown_room_not_in_cache(self):
"""Unknown rooms should not be in the DM cache."""
self.adapter._dm_rooms = {}
assert self.adapter._dm_rooms.get("!unknown:ex.org") is None
@pytest.mark.asyncio
async def test_refresh_dm_cache_with_m_direct(self):
"""_refresh_dm_cache should populate _dm_rooms from m.direct data."""
self.adapter._joined_rooms = {"!room_a:ex.org", "!room_b:ex.org", "!room_c:ex.org"}
mock_client = MagicMock()
mock_resp = MagicMock()
mock_resp.content = {
"@alice:ex.org": ["!room_a:ex.org"],
"@bob:ex.org": ["!room_b:ex.org"],
}
mock_client.get_account_data = AsyncMock(return_value=mock_resp)
self.adapter._client = mock_client
await self.adapter._refresh_dm_cache()
assert self.adapter._dm_rooms["!room_a:ex.org"] is True
assert self.adapter._dm_rooms["!room_b:ex.org"] is True
assert self.adapter._dm_rooms["!room_c:ex.org"] is False
# ---------------------------------------------------------------------------
# Reply fallback stripping
# ---------------------------------------------------------------------------
class TestMatrixReplyFallbackStripping:
"""Test that Matrix reply fallback lines ('> ' prefix) are stripped."""
def setup_method(self):
self.adapter = _make_adapter()
self.adapter._user_id = "@bot:example.org"
self.adapter._startup_ts = 0.0
self.adapter._dm_rooms = {}
self.adapter._message_handler = AsyncMock()
def _strip_fallback(self, body: str, has_reply: bool = True) -> str:
"""Simulate the reply fallback stripping logic from _on_room_message."""
reply_to = "some_event_id" if has_reply else None
if reply_to and body.startswith("> "):
lines = body.split("\n")
stripped = []
past_fallback = False
for line in lines:
if not past_fallback:
if line.startswith("> ") or line == ">":
continue
if line == "":
past_fallback = True
continue
past_fallback = True
stripped.append(line)
body = "\n".join(stripped) if stripped else body
return body
def test_simple_reply_fallback(self):
body = "> <@alice:ex.org> Original message\n\nActual reply"
result = self._strip_fallback(body)
assert result == "Actual reply"
def test_multiline_reply_fallback(self):
body = "> <@alice:ex.org> Line 1\n> Line 2\n\nMy response"
result = self._strip_fallback(body)
assert result == "My response"
def test_no_reply_fallback_preserved(self):
body = "Just a normal message"
result = self._strip_fallback(body, has_reply=False)
assert result == "Just a normal message"
def test_quote_without_reply_preserved(self):
"""'> ' lines without a reply_to context should be preserved."""
body = "> This is a blockquote"
result = self._strip_fallback(body, has_reply=False)
assert result == "> This is a blockquote"
def test_empty_fallback_separator(self):
"""The blank line between fallback and actual content should be stripped."""
body = "> <@alice:ex.org> hi\n>\n\nResponse"
result = self._strip_fallback(body)
assert result == "Response"
def test_multiline_response_after_fallback(self):
body = "> <@alice:ex.org> Original\n\nLine 1\nLine 2\nLine 3"
result = self._strip_fallback(body)
assert result == "Line 1\nLine 2\nLine 3"
# ---------------------------------------------------------------------------
# Thread detection
# ---------------------------------------------------------------------------
class TestMatrixThreadDetection:
def test_thread_id_from_m_relates_to(self):
"""m.relates_to with rel_type=m.thread should extract the event_id."""
relates_to = {
"rel_type": "m.thread",
"event_id": "$thread_root_event",
"is_falling_back": True,
"m.in_reply_to": {"event_id": "$some_event"},
}
# Simulate the extraction logic from _on_room_message
thread_id = None
if relates_to.get("rel_type") == "m.thread":
thread_id = relates_to.get("event_id")
assert thread_id == "$thread_root_event"
def test_no_thread_for_reply(self):
"""m.in_reply_to without m.thread should not set thread_id."""
relates_to = {
"m.in_reply_to": {"event_id": "$reply_event"},
}
thread_id = None
if relates_to.get("rel_type") == "m.thread":
thread_id = relates_to.get("event_id")
assert thread_id is None
def test_no_thread_for_edit(self):
"""m.replace relation should not set thread_id."""
relates_to = {
"rel_type": "m.replace",
"event_id": "$edited_event",
}
thread_id = None
if relates_to.get("rel_type") == "m.thread":
thread_id = relates_to.get("event_id")
assert thread_id is None
def test_empty_relates_to(self):
"""Empty m.relates_to should not set thread_id."""
relates_to = {}
thread_id = None
if relates_to.get("rel_type") == "m.thread":
thread_id = relates_to.get("event_id")
assert thread_id is None
# ---------------------------------------------------------------------------
# Format message
# ---------------------------------------------------------------------------
class TestMatrixFormatMessage:
def setup_method(self):
self.adapter = _make_adapter()
def test_image_markdown_stripped(self):
"""![alt](url) should be converted to just the URL."""
result = self.adapter.format_message("![cat](https://img.example.com/cat.png)")
assert result == "https://img.example.com/cat.png"
def test_regular_markdown_preserved(self):
"""Standard markdown should be preserved (Matrix supports it)."""
content = "**bold** and *italic* and `code`"
assert self.adapter.format_message(content) == content
def test_plain_text_unchanged(self):
content = "Hello, world!"
assert self.adapter.format_message(content) == content
def test_multiple_images_stripped(self):
content = "![a](http://a.com/1.png) and ![b](http://b.com/2.png)"
result = self.adapter.format_message(content)
assert "![" not in result
assert "http://a.com/1.png" in result
assert "http://b.com/2.png" in result
# ---------------------------------------------------------------------------
# Markdown to HTML conversion
# ---------------------------------------------------------------------------
class TestMatrixMarkdownToHtml:
def setup_method(self):
self.adapter = _make_adapter()
def test_bold_conversion(self):
"""**bold** should produce <strong> tags."""
result = self.adapter._markdown_to_html("**bold**")
assert "<strong>" in result or "<b>" in result
assert "bold" in result
def test_italic_conversion(self):
"""*italic* should produce <em> tags."""
result = self.adapter._markdown_to_html("*italic*")
assert "<em>" in result or "<i>" in result
def test_inline_code(self):
"""`code` should produce <code> tags."""
result = self.adapter._markdown_to_html("`code`")
assert "<code>" in result
def test_plain_text_returns_html(self):
"""Plain text should still be returned (possibly with <br> or <p>)."""
result = self.adapter._markdown_to_html("Hello world")
assert "Hello world" in result
# ---------------------------------------------------------------------------
# Helper: display name extraction
# ---------------------------------------------------------------------------
class TestMatrixDisplayName:
def setup_method(self):
self.adapter = _make_adapter()
def test_get_display_name_from_room_users(self):
"""Should get display name from room's users dict."""
mock_room = MagicMock()
mock_user = MagicMock()
mock_user.display_name = "Alice"
mock_room.users = {"@alice:ex.org": mock_user}
name = self.adapter._get_display_name(mock_room, "@alice:ex.org")
assert name == "Alice"
def test_get_display_name_fallback_to_localpart(self):
"""Should extract localpart from @user:server format."""
mock_room = MagicMock()
mock_room.users = {}
name = self.adapter._get_display_name(mock_room, "@bob:example.org")
assert name == "bob"
def test_get_display_name_no_room(self):
"""Should handle None room gracefully."""
name = self.adapter._get_display_name(None, "@charlie:ex.org")
assert name == "charlie"
# ---------------------------------------------------------------------------
# Requirements check
# ---------------------------------------------------------------------------
class TestMatrixRequirements:
def test_check_requirements_with_token(self, monkeypatch):
monkeypatch.setenv("MATRIX_ACCESS_TOKEN", "syt_test")
monkeypatch.setenv("MATRIX_HOMESERVER", "https://matrix.example.org")
from gateway.platforms.matrix import check_matrix_requirements
try:
import nio # noqa: F401
assert check_matrix_requirements() is True
except ImportError:
assert check_matrix_requirements() is False
def test_check_requirements_without_creds(self, monkeypatch):
monkeypatch.delenv("MATRIX_ACCESS_TOKEN", raising=False)
monkeypatch.delenv("MATRIX_PASSWORD", raising=False)
monkeypatch.delenv("MATRIX_HOMESERVER", raising=False)
from gateway.platforms.matrix import check_matrix_requirements
assert check_matrix_requirements() is False
def test_check_requirements_without_homeserver(self, monkeypatch):
monkeypatch.setenv("MATRIX_ACCESS_TOKEN", "syt_test")
monkeypatch.delenv("MATRIX_HOMESERVER", raising=False)
from gateway.platforms.matrix import check_matrix_requirements
assert check_matrix_requirements() is False

View File

@@ -0,0 +1,574 @@
"""Tests for Mattermost platform adapter."""
import json
import time
import pytest
from unittest.mock import MagicMock, patch, AsyncMock
from gateway.config import Platform, PlatformConfig
# ---------------------------------------------------------------------------
# Platform & Config
# ---------------------------------------------------------------------------
class TestMattermostPlatformEnum:
def test_mattermost_enum_exists(self):
assert Platform.MATTERMOST.value == "mattermost"
def test_mattermost_in_platform_list(self):
platforms = [p.value for p in Platform]
assert "mattermost" in platforms
class TestMattermostConfigLoading:
def test_apply_env_overrides_mattermost(self, monkeypatch):
monkeypatch.setenv("MATTERMOST_TOKEN", "mm-tok-abc123")
monkeypatch.setenv("MATTERMOST_URL", "https://mm.example.com")
from gateway.config import GatewayConfig, _apply_env_overrides
config = GatewayConfig()
_apply_env_overrides(config)
assert Platform.MATTERMOST in config.platforms
mc = config.platforms[Platform.MATTERMOST]
assert mc.enabled is True
assert mc.token == "mm-tok-abc123"
assert mc.extra.get("url") == "https://mm.example.com"
def test_mattermost_not_loaded_without_token(self, monkeypatch):
monkeypatch.delenv("MATTERMOST_TOKEN", raising=False)
monkeypatch.delenv("MATTERMOST_URL", raising=False)
from gateway.config import GatewayConfig, _apply_env_overrides
config = GatewayConfig()
_apply_env_overrides(config)
assert Platform.MATTERMOST not in config.platforms
def test_connected_platforms_includes_mattermost(self, monkeypatch):
monkeypatch.setenv("MATTERMOST_TOKEN", "mm-tok-abc123")
monkeypatch.setenv("MATTERMOST_URL", "https://mm.example.com")
from gateway.config import GatewayConfig, _apply_env_overrides
config = GatewayConfig()
_apply_env_overrides(config)
connected = config.get_connected_platforms()
assert Platform.MATTERMOST in connected
def test_mattermost_home_channel(self, monkeypatch):
monkeypatch.setenv("MATTERMOST_TOKEN", "mm-tok-abc123")
monkeypatch.setenv("MATTERMOST_URL", "https://mm.example.com")
monkeypatch.setenv("MATTERMOST_HOME_CHANNEL", "ch_abc123")
monkeypatch.setenv("MATTERMOST_HOME_CHANNEL_NAME", "General")
from gateway.config import GatewayConfig, _apply_env_overrides
config = GatewayConfig()
_apply_env_overrides(config)
home = config.get_home_channel(Platform.MATTERMOST)
assert home is not None
assert home.chat_id == "ch_abc123"
assert home.name == "General"
def test_mattermost_url_warning_without_url(self, monkeypatch):
"""MATTERMOST_TOKEN set but MATTERMOST_URL missing should still load."""
monkeypatch.setenv("MATTERMOST_TOKEN", "mm-tok-abc123")
monkeypatch.delenv("MATTERMOST_URL", raising=False)
from gateway.config import GatewayConfig, _apply_env_overrides
config = GatewayConfig()
_apply_env_overrides(config)
assert Platform.MATTERMOST in config.platforms
assert config.platforms[Platform.MATTERMOST].extra.get("url") == ""
# ---------------------------------------------------------------------------
# Adapter format / truncate
# ---------------------------------------------------------------------------
def _make_adapter():
"""Create a MattermostAdapter with mocked config."""
from gateway.platforms.mattermost import MattermostAdapter
config = PlatformConfig(
enabled=True,
token="test-token",
extra={"url": "https://mm.example.com"},
)
adapter = MattermostAdapter(config)
return adapter
class TestMattermostFormatMessage:
def setup_method(self):
self.adapter = _make_adapter()
def test_image_markdown_to_url(self):
"""![alt](url) should be converted to just the URL."""
result = self.adapter.format_message("![cat](https://img.example.com/cat.png)")
assert result == "https://img.example.com/cat.png"
def test_image_markdown_strips_alt_text(self):
result = self.adapter.format_message("Here: ![my image](https://x.com/a.jpg) done")
assert "![" not in result
assert "https://x.com/a.jpg" in result
def test_regular_markdown_preserved(self):
"""Regular markdown (bold, italic, code) should be kept as-is."""
content = "**bold** and *italic* and `code`"
assert self.adapter.format_message(content) == content
def test_regular_links_preserved(self):
"""Non-image links should be preserved."""
content = "[click](https://example.com)"
assert self.adapter.format_message(content) == content
def test_plain_text_unchanged(self):
content = "Hello, world!"
assert self.adapter.format_message(content) == content
def test_multiple_images(self):
content = "![a](http://a.com/1.png) text ![b](http://b.com/2.png)"
result = self.adapter.format_message(content)
assert "![" not in result
assert "http://a.com/1.png" in result
assert "http://b.com/2.png" in result
class TestMattermostTruncateMessage:
def setup_method(self):
self.adapter = _make_adapter()
def test_short_message_single_chunk(self):
msg = "Hello, world!"
chunks = self.adapter.truncate_message(msg, 4000)
assert len(chunks) == 1
assert chunks[0] == msg
def test_long_message_splits(self):
msg = "a " * 2500 # 5000 chars
chunks = self.adapter.truncate_message(msg, 4000)
assert len(chunks) >= 2
for chunk in chunks:
assert len(chunk) <= 4000
def test_custom_max_length(self):
msg = "Hello " * 20
chunks = self.adapter.truncate_message(msg, max_length=50)
assert all(len(c) <= 50 for c in chunks)
def test_exactly_at_limit(self):
msg = "x" * 4000
chunks = self.adapter.truncate_message(msg, 4000)
assert len(chunks) == 1
# ---------------------------------------------------------------------------
# Send
# ---------------------------------------------------------------------------
class TestMattermostSend:
def setup_method(self):
self.adapter = _make_adapter()
self.adapter._session = MagicMock()
@pytest.mark.asyncio
async def test_send_calls_api_post(self):
"""send() should POST to /api/v4/posts with channel_id and message."""
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.json = AsyncMock(return_value={"id": "post123"})
mock_resp.text = AsyncMock(return_value="")
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
self.adapter._session.post = MagicMock(return_value=mock_resp)
result = await self.adapter.send("channel_1", "Hello!")
assert result.success is True
assert result.message_id == "post123"
# Verify post was called with correct URL
call_args = self.adapter._session.post.call_args
assert "/api/v4/posts" in call_args[0][0]
# Verify payload
payload = call_args[1]["json"]
assert payload["channel_id"] == "channel_1"
assert payload["message"] == "Hello!"
@pytest.mark.asyncio
async def test_send_empty_content_succeeds(self):
"""Empty content should return success without calling the API."""
result = await self.adapter.send("channel_1", "")
assert result.success is True
@pytest.mark.asyncio
async def test_send_with_thread_reply(self):
"""When reply_mode is 'thread', reply_to should become root_id."""
self.adapter._reply_mode = "thread"
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.json = AsyncMock(return_value={"id": "post456"})
mock_resp.text = AsyncMock(return_value="")
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
self.adapter._session.post = MagicMock(return_value=mock_resp)
result = await self.adapter.send("channel_1", "Reply!", reply_to="root_post")
assert result.success is True
payload = self.adapter._session.post.call_args[1]["json"]
assert payload["root_id"] == "root_post"
@pytest.mark.asyncio
async def test_send_without_thread_no_root_id(self):
"""When reply_mode is 'off', reply_to should NOT set root_id."""
self.adapter._reply_mode = "off"
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.json = AsyncMock(return_value={"id": "post789"})
mock_resp.text = AsyncMock(return_value="")
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
self.adapter._session.post = MagicMock(return_value=mock_resp)
result = await self.adapter.send("channel_1", "Reply!", reply_to="root_post")
assert result.success is True
payload = self.adapter._session.post.call_args[1]["json"]
assert "root_id" not in payload
@pytest.mark.asyncio
async def test_send_api_failure(self):
"""When API returns error, send should return failure."""
mock_resp = AsyncMock()
mock_resp.status = 500
mock_resp.json = AsyncMock(return_value={})
mock_resp.text = AsyncMock(return_value="Internal Server Error")
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
self.adapter._session.post = MagicMock(return_value=mock_resp)
result = await self.adapter.send("channel_1", "Hello!")
assert result.success is False
# ---------------------------------------------------------------------------
# WebSocket event parsing
# ---------------------------------------------------------------------------
class TestMattermostWebSocketParsing:
def setup_method(self):
self.adapter = _make_adapter()
self.adapter._bot_user_id = "bot_user_id"
# Mock handle_message to capture the MessageEvent without processing
self.adapter.handle_message = AsyncMock()
@pytest.mark.asyncio
async def test_parse_posted_event(self):
"""'posted' events should extract message from double-encoded post JSON."""
post_data = {
"id": "post_abc",
"user_id": "user_123",
"channel_id": "chan_456",
"message": "Hello from Matrix!",
}
event = {
"event": "posted",
"data": {
"post": json.dumps(post_data), # double-encoded JSON string
"channel_type": "O",
"sender_name": "@alice",
},
}
await self.adapter._handle_ws_event(event)
assert self.adapter.handle_message.called
msg_event = self.adapter.handle_message.call_args[0][0]
assert msg_event.text == "Hello from Matrix!"
assert msg_event.message_id == "post_abc"
@pytest.mark.asyncio
async def test_ignore_own_messages(self):
"""Messages from the bot's own user_id should be ignored."""
post_data = {
"id": "post_self",
"user_id": "bot_user_id", # same as bot
"channel_id": "chan_456",
"message": "Bot echo",
}
event = {
"event": "posted",
"data": {
"post": json.dumps(post_data),
"channel_type": "O",
},
}
await self.adapter._handle_ws_event(event)
assert not self.adapter.handle_message.called
@pytest.mark.asyncio
async def test_ignore_non_posted_events(self):
"""Non-'posted' events should be ignored."""
event = {
"event": "typing",
"data": {"user_id": "user_123"},
}
await self.adapter._handle_ws_event(event)
assert not self.adapter.handle_message.called
@pytest.mark.asyncio
async def test_ignore_system_posts(self):
"""Posts with a 'type' field (system messages) should be ignored."""
post_data = {
"id": "sys_post",
"user_id": "user_123",
"channel_id": "chan_456",
"message": "user joined",
"type": "system_join_channel",
}
event = {
"event": "posted",
"data": {
"post": json.dumps(post_data),
"channel_type": "O",
},
}
await self.adapter._handle_ws_event(event)
assert not self.adapter.handle_message.called
@pytest.mark.asyncio
async def test_channel_type_mapping(self):
"""channel_type 'D' should map to 'dm'."""
post_data = {
"id": "post_dm",
"user_id": "user_123",
"channel_id": "chan_dm",
"message": "DM message",
}
event = {
"event": "posted",
"data": {
"post": json.dumps(post_data),
"channel_type": "D",
"sender_name": "@bob",
},
}
await self.adapter._handle_ws_event(event)
assert self.adapter.handle_message.called
msg_event = self.adapter.handle_message.call_args[0][0]
assert msg_event.source.chat_type == "dm"
@pytest.mark.asyncio
async def test_thread_id_from_root_id(self):
"""Post with root_id should have thread_id set."""
post_data = {
"id": "post_reply",
"user_id": "user_123",
"channel_id": "chan_456",
"message": "Thread reply",
"root_id": "root_post_123",
}
event = {
"event": "posted",
"data": {
"post": json.dumps(post_data),
"channel_type": "O",
"sender_name": "@alice",
},
}
await self.adapter._handle_ws_event(event)
assert self.adapter.handle_message.called
msg_event = self.adapter.handle_message.call_args[0][0]
assert msg_event.source.thread_id == "root_post_123"
@pytest.mark.asyncio
async def test_invalid_post_json_ignored(self):
"""Invalid JSON in data.post should be silently ignored."""
event = {
"event": "posted",
"data": {
"post": "not-valid-json{{{",
"channel_type": "O",
},
}
await self.adapter._handle_ws_event(event)
assert not self.adapter.handle_message.called
# ---------------------------------------------------------------------------
# File upload (send_image)
# ---------------------------------------------------------------------------
class TestMattermostFileUpload:
def setup_method(self):
self.adapter = _make_adapter()
self.adapter._session = MagicMock()
@pytest.mark.asyncio
async def test_send_image_downloads_and_uploads(self):
"""send_image should download the URL, upload via /api/v4/files, then post."""
# Mock the download (GET)
mock_dl_resp = AsyncMock()
mock_dl_resp.status = 200
mock_dl_resp.read = AsyncMock(return_value=b"\x89PNG\x00fake-image-data")
mock_dl_resp.content_type = "image/png"
mock_dl_resp.__aenter__ = AsyncMock(return_value=mock_dl_resp)
mock_dl_resp.__aexit__ = AsyncMock(return_value=False)
# Mock the upload (POST to /files)
mock_upload_resp = AsyncMock()
mock_upload_resp.status = 200
mock_upload_resp.json = AsyncMock(return_value={
"file_infos": [{"id": "file_abc123"}]
})
mock_upload_resp.text = AsyncMock(return_value="")
mock_upload_resp.__aenter__ = AsyncMock(return_value=mock_upload_resp)
mock_upload_resp.__aexit__ = AsyncMock(return_value=False)
# Mock the post (POST to /posts)
mock_post_resp = AsyncMock()
mock_post_resp.status = 200
mock_post_resp.json = AsyncMock(return_value={"id": "post_with_file"})
mock_post_resp.text = AsyncMock(return_value="")
mock_post_resp.__aenter__ = AsyncMock(return_value=mock_post_resp)
mock_post_resp.__aexit__ = AsyncMock(return_value=False)
# Route calls: first GET (download), then POST (upload), then POST (create post)
self.adapter._session.get = MagicMock(return_value=mock_dl_resp)
post_call_count = 0
original_post_returns = [mock_upload_resp, mock_post_resp]
def post_side_effect(*args, **kwargs):
nonlocal post_call_count
resp = original_post_returns[min(post_call_count, len(original_post_returns) - 1)]
post_call_count += 1
return resp
self.adapter._session.post = MagicMock(side_effect=post_side_effect)
result = await self.adapter.send_image(
"channel_1", "https://img.example.com/cat.png", caption="A cat"
)
assert result.success is True
assert result.message_id == "post_with_file"
# ---------------------------------------------------------------------------
# Dedup cache
# ---------------------------------------------------------------------------
class TestMattermostDedup:
def setup_method(self):
self.adapter = _make_adapter()
self.adapter._bot_user_id = "bot_user_id"
# Mock handle_message to capture calls without processing
self.adapter.handle_message = AsyncMock()
@pytest.mark.asyncio
async def test_duplicate_post_ignored(self):
"""The same post_id within the TTL window should be ignored."""
post_data = {
"id": "post_dup",
"user_id": "user_123",
"channel_id": "chan_456",
"message": "Hello!",
}
event = {
"event": "posted",
"data": {
"post": json.dumps(post_data),
"channel_type": "O",
"sender_name": "@alice",
},
}
# First time: should process
await self.adapter._handle_ws_event(event)
assert self.adapter.handle_message.call_count == 1
# Second time (same post_id): should be deduped
await self.adapter._handle_ws_event(event)
assert self.adapter.handle_message.call_count == 1 # still 1
@pytest.mark.asyncio
async def test_different_post_ids_both_processed(self):
"""Different post IDs should both be processed."""
for i, pid in enumerate(["post_a", "post_b"]):
post_data = {
"id": pid,
"user_id": "user_123",
"channel_id": "chan_456",
"message": f"Message {i}",
}
event = {
"event": "posted",
"data": {
"post": json.dumps(post_data),
"channel_type": "O",
"sender_name": "@alice",
},
}
await self.adapter._handle_ws_event(event)
assert self.adapter.handle_message.call_count == 2
def test_prune_seen_clears_expired(self):
"""_prune_seen should remove entries older than _SEEN_TTL."""
now = time.time()
# Fill with enough expired entries to trigger pruning
for i in range(self.adapter._SEEN_MAX + 10):
self.adapter._seen_posts[f"old_{i}"] = now - 600 # 10 min ago
# Add a fresh one
self.adapter._seen_posts["fresh"] = now
self.adapter._prune_seen()
# Old entries should be pruned, fresh one kept
assert "fresh" in self.adapter._seen_posts
assert len(self.adapter._seen_posts) < self.adapter._SEEN_MAX
def test_seen_cache_tracks_post_ids(self):
"""Posts are tracked in _seen_posts dict."""
self.adapter._seen_posts["test_post"] = time.time()
assert "test_post" in self.adapter._seen_posts
# ---------------------------------------------------------------------------
# Requirements check
# ---------------------------------------------------------------------------
class TestMattermostRequirements:
def test_check_requirements_with_token_and_url(self, monkeypatch):
monkeypatch.setenv("MATTERMOST_TOKEN", "test-token")
monkeypatch.setenv("MATTERMOST_URL", "https://mm.example.com")
from gateway.platforms.mattermost import check_mattermost_requirements
assert check_mattermost_requirements() is True
def test_check_requirements_without_token(self, monkeypatch):
monkeypatch.delenv("MATTERMOST_TOKEN", raising=False)
monkeypatch.delenv("MATTERMOST_URL", raising=False)
from gateway.platforms.mattermost import check_mattermost_requirements
assert check_mattermost_requirements() is False
def test_check_requirements_without_url(self, monkeypatch):
monkeypatch.setenv("MATTERMOST_TOKEN", "test-token")
monkeypatch.delenv("MATTERMOST_URL", raising=False)
from gateway.platforms.mattermost import check_mattermost_requirements
assert check_mattermost_requirements() is False

View File

@@ -0,0 +1,354 @@
---
sidebar_position: 9
title: "Matrix"
description: "Set up Hermes Agent as a Matrix bot"
---
# Matrix Setup
Hermes Agent integrates with Matrix, the open, federated messaging protocol. Matrix lets you run your own homeserver or use a public one like matrix.org — either way, you keep control of your communications. The bot connects via the `matrix-nio` Python SDK, processes messages through the Hermes Agent pipeline (including tool use, memory, and reasoning), and responds in real time. It supports text, file attachments, images, audio, video, and optional end-to-end encryption (E2EE).
Hermes works with any Matrix homeserver — Synapse, Conduit, Dendrite, or matrix.org.
Before setup, here's the part most people want to know: how Hermes behaves once it's connected.
## How Hermes Behaves
| Context | Behavior |
|---------|----------|
| **DMs** | Hermes responds to every message. No `@mention` needed. Each DM has its own session. |
| **Rooms** | Hermes responds to all messages in rooms it has joined. Room invites are auto-accepted. |
| **Threads** | Hermes supports Matrix threads (MSC3440). If you reply in a thread, Hermes keeps the thread context isolated from the main room timeline. |
| **Shared rooms with multiple users** | By default, Hermes isolates session history per user inside the room. Two people talking in the same room do not share one transcript unless you explicitly disable that. |
:::tip
The bot automatically joins rooms when invited. Just invite the bot's Matrix user to any room and it will join and start responding.
:::
### Session Model in Matrix
By default:
- each DM gets its own session
- each thread gets its own session namespace
- each user in a shared room gets their own session inside that room
This is controlled by `config.yaml`:
```yaml
group_sessions_per_user: true
```
Set it to `false` only if you explicitly want one shared conversation for the entire room:
```yaml
group_sessions_per_user: false
```
Shared sessions can be useful for a collaborative room, but they also mean:
- users share context growth and token costs
- one person's long tool-heavy task can bloat everyone else's context
- one person's in-flight run can interrupt another person's follow-up in the same room
This guide walks you through the full setup process — from creating your bot account to sending your first message.
## Step 1: Create a Bot Account
You need a Matrix user account for the bot. There are several ways to do this:
### Option A: Register on Your Homeserver (Recommended)
If you run your own homeserver (Synapse, Conduit, Dendrite):
1. Use the admin API or registration tool to create a new user:
```bash
# Synapse example
register_new_matrix_user -c /etc/synapse/homeserver.yaml http://localhost:8008
```
2. Choose a username like `hermes` — the full user ID will be `@hermes:your-server.org`.
### Option B: Use matrix.org or Another Public Homeserver
1. Go to [Element Web](https://app.element.io) and create a new account.
2. Pick a username for your bot (e.g., `hermes-bot`).
### Option C: Use Your Own Account
You can also run Hermes as your own user. This means the bot posts as you — useful for personal assistants.
## Step 2: Get an Access Token
Hermes needs an access token to authenticate with the homeserver. You have two options:
### Option A: Access Token (Recommended)
The most reliable way to get a token:
**Via Element:**
1. Log in to [Element](https://app.element.io) with the bot account.
2. Go to **Settings****Help & About**.
3. Scroll down and expand **Advanced** — the access token is displayed there.
4. **Copy it immediately.**
**Via the API:**
```bash
curl -X POST https://your-server/_matrix/client/v3/login \
-H "Content-Type: application/json" \
-d '{
"type": "m.login.password",
"user": "@hermes:your-server.org",
"password": "your-password"
}'
```
The response includes an `access_token` field — copy it.
:::warning[Keep your access token safe]
The access token gives full access to the bot's Matrix account. Never share it publicly or commit it to Git. If compromised, revoke it by logging out all sessions for that user.
:::
### Option B: Password Login
Instead of providing an access token, you can give Hermes the bot's user ID and password. Hermes will log in automatically on startup. This is simpler but means the password is stored in your `.env` file.
```bash
MATRIX_USER_ID=@hermes:your-server.org
MATRIX_PASSWORD=your-password
```
## Step 3: Find Your Matrix User ID
Hermes Agent uses your Matrix User ID to control who can interact with the bot. Matrix User IDs follow the format `@username:server`.
To find yours:
1. Open [Element](https://app.element.io) (or your preferred Matrix client).
2. Click your avatar → **Settings**.
3. Your User ID is displayed at the top of the profile (e.g., `@alice:matrix.org`).
:::tip
Matrix User IDs always start with `@` and contain a `:` followed by the server name. For example: `@alice:matrix.org`, `@bob:your-server.com`.
:::
## Step 4: Configure Hermes Agent
### Option A: Interactive Setup (Recommended)
Run the guided setup command:
```bash
hermes gateway setup
```
Select **Matrix** when prompted, then provide your homeserver URL, access token (or user ID + password), and allowed user IDs when asked.
### Option B: Manual Configuration
Add the following to your `~/.hermes/.env` file:
**Using an access token:**
```bash
# Required
MATRIX_HOMESERVER=https://matrix.example.org
MATRIX_ACCESS_TOKEN=***
# Optional: user ID (auto-detected from token if omitted)
# MATRIX_USER_ID=@hermes:matrix.example.org
# Security: restrict who can interact with the bot
MATRIX_ALLOWED_USERS=@alice:matrix.example.org
# Multiple allowed users (comma-separated)
# MATRIX_ALLOWED_USERS=@alice:matrix.example.org,@bob:matrix.example.org
```
**Using password login:**
```bash
# Required
MATRIX_HOMESERVER=https://matrix.example.org
MATRIX_USER_ID=@hermes:matrix.example.org
MATRIX_PASSWORD=***
# Security
MATRIX_ALLOWED_USERS=@alice:matrix.example.org
```
Optional behavior settings in `~/.hermes/config.yaml`:
```yaml
group_sessions_per_user: true
```
- `group_sessions_per_user: true` keeps each participant's context isolated inside shared rooms
### Start the Gateway
Once configured, start the Matrix gateway:
```bash
hermes gateway
```
The bot should connect to your homeserver and start syncing within a few seconds. Send it a message — either a DM or in a room it has joined — to test.
:::tip
You can run `hermes gateway` in the background or as a systemd service for persistent operation. See the deployment docs for details.
:::
## End-to-End Encryption (E2EE)
Hermes supports Matrix end-to-end encryption, so you can chat with your bot in encrypted rooms.
### Requirements
E2EE requires the `matrix-nio` library with encryption extras and the `libolm` C library:
```bash
# Install matrix-nio with E2EE support
pip install 'matrix-nio[e2e]'
# Or install with hermes extras
pip install 'hermes-agent[matrix]'
```
You also need `libolm` installed on your system:
```bash
# Debian/Ubuntu
sudo apt install libolm-dev
# macOS
brew install libolm
# Fedora
sudo dnf install libolm-devel
```
### Enable E2EE
Add to your `~/.hermes/.env`:
```bash
MATRIX_ENCRYPTION=true
```
When E2EE is enabled, Hermes:
- Stores encryption keys in `~/.hermes/matrix/store/`
- Uploads device keys on first connection
- Decrypts incoming messages and encrypts outgoing messages automatically
- Auto-joins encrypted rooms when invited
:::warning
If you delete the `~/.hermes/matrix/store/` directory, the bot loses its encryption keys. You'll need to verify the device again in your Matrix client. Back up this directory if you want to preserve encrypted sessions.
:::
:::info
If `matrix-nio[e2e]` is not installed or `libolm` is missing, the bot falls back to a plain (unencrypted) client automatically. You'll see a warning in the logs.
:::
## Home Room
You can designate a "home room" where the bot sends proactive messages (such as cron job output, reminders, and notifications). There are two ways to set it:
### Using the Slash Command
Type `/sethome` in any Matrix room where the bot is present. That room becomes the home room.
### Manual Configuration
Add this to your `~/.hermes/.env`:
```bash
MATRIX_HOME_ROOM=!abc123def456:matrix.example.org
```
:::tip
To find a Room ID: in Element, go to the room → **Settings****Advanced** → the **Internal room ID** is shown there (starts with `!`).
:::
## Troubleshooting
### Bot is not responding to messages
**Cause**: The bot hasn't joined the room, or `MATRIX_ALLOWED_USERS` doesn't include your User ID.
**Fix**: Invite the bot to the room — it auto-joins on invite. Verify your User ID is in `MATRIX_ALLOWED_USERS` (use the full `@user:server` format). Restart the gateway.
### "Failed to authenticate" / "whoami failed" on startup
**Cause**: The access token or homeserver URL is incorrect.
**Fix**: Verify `MATRIX_HOMESERVER` points to your homeserver (include `https://`, no trailing slash). Check that `MATRIX_ACCESS_TOKEN` is valid — try it with curl:
```bash
curl -H "Authorization: Bearer YOUR_TOKEN" \
https://your-server/_matrix/client/v3/account/whoami
```
If this returns your user info, the token is valid. If it returns an error, generate a new token.
### "matrix-nio not installed" error
**Cause**: The `matrix-nio` Python package is not installed.
**Fix**: Install it:
```bash
pip install 'matrix-nio[e2e]'
```
Or with Hermes extras:
```bash
pip install 'hermes-agent[matrix]'
```
### Encryption errors / "could not decrypt event"
**Cause**: Missing encryption keys, `libolm` not installed, or the bot's device isn't trusted.
**Fix**:
1. Verify `libolm` is installed on your system (see the E2EE section above).
2. Make sure `MATRIX_ENCRYPTION=true` is set in your `.env`.
3. In your Matrix client (Element), go to the bot's profile → **Sessions** → verify/trust the bot's device.
4. If the bot just joined an encrypted room, it can only decrypt messages sent *after* it joined. Older messages are inaccessible.
### Sync issues / bot falls behind
**Cause**: Long-running tool executions can delay the sync loop, or the homeserver is slow.
**Fix**: The sync loop automatically retries every 5 seconds on error. Check the Hermes logs for sync-related warnings. If the bot consistently falls behind, ensure your homeserver has adequate resources.
### Bot is offline
**Cause**: The Hermes gateway isn't running, or it failed to connect.
**Fix**: Check that `hermes gateway` is running. Look at the terminal output for error messages. Common issues: wrong homeserver URL, expired access token, homeserver unreachable.
### "User not allowed" / Bot ignores you
**Cause**: Your User ID isn't in `MATRIX_ALLOWED_USERS`.
**Fix**: Add your User ID to `MATRIX_ALLOWED_USERS` in `~/.hermes/.env` and restart the gateway. Use the full `@user:server` format.
## Security
:::warning
Always set `MATRIX_ALLOWED_USERS` to restrict who can interact with the bot. Without it, the gateway denies all users by default as a safety measure. Only add User IDs of people you trust — authorized users have full access to the agent's capabilities, including tool use and system access.
:::
For more information on securing your Hermes Agent deployment, see the [Security Guide](../security.md).
## Notes
- **Any homeserver**: Works with Synapse, Conduit, Dendrite, matrix.org, or any spec-compliant Matrix homeserver. No specific homeserver software required.
- **Federation**: If you're on a federated homeserver, the bot can communicate with users from other servers — just add their full `@user:server` IDs to `MATRIX_ALLOWED_USERS`.
- **Auto-join**: The bot automatically accepts room invites and joins. It starts responding immediately after joining.
- **Media support**: Hermes can send and receive images, audio, video, and file attachments. Media is uploaded to your homeserver using the Matrix content repository API.

View File

@@ -0,0 +1,277 @@
---
sidebar_position: 8
title: "Mattermost"
description: "Set up Hermes Agent as a Mattermost bot"
---
# Mattermost Setup
Hermes Agent integrates with Mattermost as a bot, letting you chat with your AI assistant through direct messages or team channels. Mattermost is a self-hosted, open-source Slack alternative — you run it on your own infrastructure, keeping full control of your data. The bot connects via Mattermost's REST API (v4) and WebSocket for real-time events, processes messages through the Hermes Agent pipeline (including tool use, memory, and reasoning), and responds in real time. It supports text, file attachments, images, and slash commands.
No external Mattermost library is required — the adapter uses `aiohttp`, which is already a Hermes dependency.
Before setup, here's the part most people want to know: how Hermes behaves once it's in your Mattermost instance.
## How Hermes Behaves
| Context | Behavior |
|---------|----------|
| **DMs** | Hermes responds to every message. No `@mention` needed. Each DM has its own session. |
| **Public/private channels** | Hermes responds when you `@mention` it. Without a mention, Hermes ignores the message. |
| **Threads** | If `MATTERMOST_REPLY_MODE=thread`, Hermes replies in a thread under your message. Thread context stays isolated from the parent channel. |
| **Shared channels with multiple users** | By default, Hermes isolates session history per user inside the channel. Two people talking in the same channel do not share one transcript unless you explicitly disable that. |
:::tip
If you want Hermes to reply as threaded conversations (nested under your original message), set `MATTERMOST_REPLY_MODE=thread`. The default is `off`, which sends flat messages in the channel.
:::
### Session Model in Mattermost
By default:
- each DM gets its own session
- each thread gets its own session namespace
- each user in a shared channel gets their own session inside that channel
This is controlled by `config.yaml`:
```yaml
group_sessions_per_user: true
```
Set it to `false` only if you explicitly want one shared conversation for the entire channel:
```yaml
group_sessions_per_user: false
```
Shared sessions can be useful for a collaborative channel, but they also mean:
- users share context growth and token costs
- one person's long tool-heavy task can bloat everyone else's context
- one person's in-flight run can interrupt another person's follow-up in the same channel
This guide walks you through the full setup process — from creating your bot on Mattermost to sending your first message.
## Step 1: Enable Bot Accounts
Bot accounts must be enabled on your Mattermost server before you can create one.
1. Log in to Mattermost as a **System Admin**.
2. Go to **System Console****Integrations****Bot Accounts**.
3. Set **Enable Bot Account Creation** to **true**.
4. Click **Save**.
:::info
If you don't have System Admin access, ask your Mattermost administrator to enable bot accounts and create one for you.
:::
## Step 2: Create a Bot Account
1. In Mattermost, click the **☰** menu (top-left) → **Integrations****Bot Accounts**.
2. Click **Add Bot Account**.
3. Fill in the details:
- **Username**: e.g., `hermes`
- **Display Name**: e.g., `Hermes Agent`
- **Description**: optional
- **Role**: `Member` is sufficient
4. Click **Create Bot Account**.
5. Mattermost will display the **bot token**. **Copy it immediately.**
:::warning[Token shown only once]
The bot token is only displayed once when you create the bot account. If you lose it, you'll need to regenerate it from the bot account settings. Never share your token publicly or commit it to Git — anyone with this token has full control of the bot.
:::
Store the token somewhere safe (a password manager, for example). You'll need it in Step 5.
:::tip
You can also use a **personal access token** instead of a bot account. Go to **Profile****Security****Personal Access Tokens****Create Token**. This is useful if you want Hermes to post as your own user rather than a separate bot user.
:::
## Step 3: Add the Bot to Channels
The bot needs to be a member of any channel where you want it to respond:
1. Open the channel where you want the bot.
2. Click the channel name → **Add Members**.
3. Search for your bot username (e.g., `hermes`) and add it.
For DMs, simply open a direct message with the bot — it will be able to respond immediately.
## Step 4: Find Your Mattermost User ID
Hermes Agent uses your Mattermost User ID to control who can interact with the bot. To find it:
1. Click your **avatar** (top-left corner) → **Profile**.
2. Your User ID is displayed in the profile dialog — click it to copy.
Your User ID is a 26-character alphanumeric string like `3uo8dkh1p7g1mfk49ear5fzs5c`.
:::warning
Your User ID is **not** your username. The username is what appears after `@` (e.g., `@alice`). The User ID is a long alphanumeric identifier that Mattermost uses internally.
:::
**Alternative**: You can also get your User ID via the API:
```bash
curl -H "Authorization: Bearer YOUR_TOKEN" \
https://your-mattermost-server/api/v4/users/me | jq .id
```
:::tip
To get a **Channel ID**: click the channel name → **View Info**. The Channel ID is shown in the info panel. You'll need this if you want to set a home channel manually.
:::
## Step 5: Configure Hermes Agent
### Option A: Interactive Setup (Recommended)
Run the guided setup command:
```bash
hermes gateway setup
```
Select **Mattermost** when prompted, then paste your server URL, bot token, and user ID when asked.
### Option B: Manual Configuration
Add the following to your `~/.hermes/.env` file:
```bash
# Required
MATTERMOST_URL=https://mm.example.com
MATTERMOST_TOKEN=***
MATTERMOST_ALLOWED_USERS=3uo8dkh1p7g1mfk49ear5fzs5c
# Multiple allowed users (comma-separated)
# MATTERMOST_ALLOWED_USERS=3uo8dkh1p7g1mfk49ear5fzs5c,8fk2jd9s0a7bncm1xqw4tp6r3e
# Optional: reply mode (thread or off, default: off)
# MATTERMOST_REPLY_MODE=thread
```
Optional behavior settings in `~/.hermes/config.yaml`:
```yaml
group_sessions_per_user: true
```
- `group_sessions_per_user: true` keeps each participant's context isolated inside shared channels and threads
### Start the Gateway
Once configured, start the Mattermost gateway:
```bash
hermes gateway
```
The bot should connect to your Mattermost server within a few seconds. Send it a message — either a DM or in a channel where it's been added — to test.
:::tip
You can run `hermes gateway` in the background or as a systemd service for persistent operation. See the deployment docs for details.
:::
## Home Channel
You can designate a "home channel" where the bot sends proactive messages (such as cron job output, reminders, and notifications). There are two ways to set it:
### Using the Slash Command
Type `/sethome` in any Mattermost channel where the bot is present. That channel becomes the home channel.
### Manual Configuration
Add this to your `~/.hermes/.env`:
```bash
MATTERMOST_HOME_CHANNEL=abc123def456ghi789jkl012mn
```
Replace the ID with the actual channel ID (click the channel name → View Info → copy the ID).
## Reply Mode
The `MATTERMOST_REPLY_MODE` setting controls how Hermes posts responses:
| Mode | Behavior |
|------|----------|
| `off` (default) | Hermes posts flat messages in the channel, like a normal user. |
| `thread` | Hermes replies in a thread under your original message. Keeps channels clean when there's lots of back-and-forth. |
Set it in your `~/.hermes/.env`:
```bash
MATTERMOST_REPLY_MODE=thread
```
## Troubleshooting
### Bot is not responding to messages
**Cause**: The bot is not a member of the channel, or `MATTERMOST_ALLOWED_USERS` doesn't include your User ID.
**Fix**: Add the bot to the channel (channel name → Add Members → search for the bot). Verify your User ID is in `MATTERMOST_ALLOWED_USERS`. Restart the gateway.
### 403 Forbidden errors
**Cause**: The bot token is invalid, or the bot doesn't have permission to post in the channel.
**Fix**: Check that `MATTERMOST_TOKEN` in your `.env` file is correct. Make sure the bot account hasn't been deactivated. Verify the bot has been added to the channel. If using a personal access token, ensure your account has the required permissions.
### WebSocket disconnects / reconnection loops
**Cause**: Network instability, Mattermost server restarts, or firewall/proxy issues with WebSocket connections.
**Fix**: The adapter automatically reconnects with exponential backoff (2s → 60s). Check your server's WebSocket configuration — reverse proxies (nginx, Apache) need WebSocket upgrade headers configured. Verify no firewall is blocking WebSocket connections on your Mattermost server.
For nginx, ensure your config includes:
```nginx
location /api/v4/websocket {
proxy_pass http://mattermost-backend;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_read_timeout 600s;
}
```
### "Failed to authenticate" on startup
**Cause**: The token or server URL is incorrect.
**Fix**: Verify `MATTERMOST_URL` points to your Mattermost server (include `https://`, no trailing slash). Check that `MATTERMOST_TOKEN` is valid — try it with curl:
```bash
curl -H "Authorization: Bearer YOUR_TOKEN" \
https://your-server/api/v4/users/me
```
If this returns your bot's user info, the token is valid. If it returns an error, regenerate the token.
### Bot is offline
**Cause**: The Hermes gateway isn't running, or it failed to connect.
**Fix**: Check that `hermes gateway` is running. Look at the terminal output for error messages. Common issues: wrong URL, expired token, Mattermost server unreachable.
### "User not allowed" / Bot ignores you
**Cause**: Your User ID isn't in `MATTERMOST_ALLOWED_USERS`.
**Fix**: Add your User ID to `MATTERMOST_ALLOWED_USERS` in `~/.hermes/.env` and restart the gateway. Remember: the User ID is a 26-character alphanumeric string, not your `@username`.
## Security
:::warning
Always set `MATTERMOST_ALLOWED_USERS` to restrict who can interact with the bot. Without it, the gateway denies all users by default as a safety measure. Only add User IDs of people you trust — authorized users have full access to the agent's capabilities, including tool use and system access.
:::
For more information on securing your Hermes Agent deployment, see the [Security Guide](../security.md).
## Notes
- **Self-hosted friendly**: Works with any self-hosted Mattermost instance. No Mattermost Cloud account or subscription required.
- **No extra dependencies**: The adapter uses `aiohttp` for HTTP and WebSocket, which is already included with Hermes Agent.
- **Team Edition compatible**: Works with both Mattermost Team Edition (free) and Enterprise Edition.