feat: add Signal messenger gateway platform (#405)

Complete Signal adapter using signal-cli daemon HTTP API.
Based on PR #268 by ibhagwan, rebuilt on current main with bug fixes.

Architecture:
- SSE streaming for inbound messages with exponential backoff (2s→60s)
- JSON-RPC 2.0 for outbound (send, typing, attachments, contacts)
- Health monitor detects stale SSE connections (120s threshold)
- Phone number redaction in all logs and global redact.py

Features:
- DM and group message support with separate access policies
- DM policies: pairing (default), allowlist, open
- Group policies: disabled (default), allowlist, open
- Attachment download with magic-byte type detection
- Typing indicators (8s refresh interval)
- 100MB attachment size limit, 8000 char message limit
- E.164 phone + UUID allowlist support

Integration:
- Platform.SIGNAL enum in gateway/config.py
- Signal in _is_user_authorized() allowlist maps (gateway/run.py)
- Adapter factory in _create_adapter() (gateway/run.py)
- user_id_alt/chat_id_alt fields in SessionSource for UUIDs
- send_message tool support via httpx JSON-RPC (not aiohttp)
- Interactive setup wizard in 'hermes gateway setup'
- Connectivity testing during setup (pings /api/v1/check)
- signal-cli detection and install guidance

Bug fixes from PR #268:
- Timestamp reads from envelope_data (not outer wrapper)
- Uses httpx consistently (not aiohttp in send_message tool)
- SIGNAL_DEBUG scoped to signal logger (not root)
- extract_images regex NOT modified (preserves group numbering)
- pairing.py NOT modified (no cross-platform side effects)
- No dual authorization (adapter defers to run.py for user auth)
- Wildcard uses set membership ('*' in set, not list equality)
- .zip default for PK magic bytes (not .docx)

No new Python dependencies — uses httpx (already core).
External requirement: signal-cli daemon (user-installed).

Tests: 30 new tests covering config, init, helpers, session source,
phone redaction, authorization, and send_message integration.

Co-authored-by: ibhagwan <ibhagwan@users.noreply.github.com>
This commit is contained in:
teknium1
2026-03-08 20:20:35 -07:00
parent 7a8778ac73
commit 24f549a692
9 changed files with 1232 additions and 2 deletions

View File

@@ -52,6 +52,10 @@ _TELEGRAM_RE = re.compile(
r"(bot)?(\d{8,}):([-A-Za-z0-9_]{30,})",
)
# E.164 phone numbers: +<country><number>, 7-15 digits
# Negative lookahead prevents matching hex strings or identifiers
_SIGNAL_PHONE_RE = re.compile(r"(\+[1-9]\d{6,14})(?![A-Za-z0-9])")
# Compile known prefix patterns into one alternation
_PREFIX_RE = re.compile(
r"(?<![A-Za-z0-9_-])(" + "|".join(_PREFIX_PATTERNS) + r")(?![A-Za-z0-9_-])"
@@ -101,6 +105,14 @@ def redact_sensitive_text(text: str) -> str:
return f"{prefix}{digits}:***"
text = _TELEGRAM_RE.sub(_redact_telegram, text)
# E.164 phone numbers (Signal, WhatsApp)
def _redact_phone(m):
phone = m.group(1)
if len(phone) <= 8:
return phone[:2] + "****" + phone[-2:]
return phone[:4] + "****" + phone[-4:]
text = _SIGNAL_PHONE_RE.sub(_redact_phone, text)
return text

View File

@@ -26,6 +26,7 @@ class Platform(Enum):
DISCORD = "discord"
WHATSAPP = "whatsapp"
SLACK = "slack"
SIGNAL = "signal"
HOMEASSISTANT = "homeassistant"
@@ -155,7 +156,16 @@ class GatewayConfig:
"""Return list of platforms that are enabled and configured."""
connected = []
for platform, config in self.platforms.items():
if config.enabled and (config.token or config.api_key):
if not config.enabled:
continue
# Platforms that use token/api_key auth
if config.token or config.api_key:
connected.append(platform)
# WhatsApp uses enabled flag only (bridge handles auth)
elif platform == Platform.WHATSAPP:
connected.append(platform)
# Signal uses extra dict for config (http_url + account)
elif platform == Platform.SIGNAL and config.extra.get("http_url"):
connected.append(platform)
return connected
@@ -379,6 +389,28 @@ def _apply_env_overrides(config: GatewayConfig) -> None:
name=os.getenv("SLACK_HOME_CHANNEL_NAME", ""),
)
# Signal
signal_url = os.getenv("SIGNAL_HTTP_URL")
signal_account = os.getenv("SIGNAL_ACCOUNT")
if signal_url and signal_account:
if Platform.SIGNAL not in config.platforms:
config.platforms[Platform.SIGNAL] = PlatformConfig()
config.platforms[Platform.SIGNAL].enabled = True
config.platforms[Platform.SIGNAL].extra.update({
"http_url": signal_url,
"account": signal_account,
"dm_policy": os.getenv("SIGNAL_DM_POLICY", "pairing"),
"group_policy": os.getenv("SIGNAL_GROUP_POLICY", "disabled"),
"ignore_stories": os.getenv("SIGNAL_IGNORE_STORIES", "true").lower() in ("true", "1", "yes"),
})
signal_home = os.getenv("SIGNAL_HOME_CHANNEL")
if signal_home:
config.platforms[Platform.SIGNAL].home_channel = HomeChannel(
platform=Platform.SIGNAL,
chat_id=signal_home,
name=os.getenv("SIGNAL_HOME_CHANNEL_NAME", "Home"),
)
# Home Assistant
hass_token = os.getenv("HASS_TOKEN")
if hass_token:

View File

@@ -838,6 +838,8 @@ class BasePlatformAdapter(ABC):
user_name: Optional[str] = None,
thread_id: Optional[str] = None,
chat_topic: Optional[str] = None,
user_id_alt: Optional[str] = None,
chat_id_alt: Optional[str] = None,
) -> SessionSource:
"""Helper to build a SessionSource for this platform."""
# Normalize empty topic to None
@@ -852,6 +854,8 @@ class BasePlatformAdapter(ABC):
user_name=user_name,
thread_id=str(thread_id) if thread_id else None,
chat_topic=chat_topic.strip() if chat_topic else None,
user_id_alt=user_id_alt,
chat_id_alt=chat_id_alt,
)
@abstractmethod

688
gateway/platforms/signal.py Normal file
View File

@@ -0,0 +1,688 @@
"""Signal messenger platform adapter.
Connects to a signal-cli daemon running in HTTP mode.
Inbound messages arrive via SSE (Server-Sent Events) streaming.
Outbound messages and actions use JSON-RPC 2.0 over HTTP.
Based on PR #268 by ibhagwan, rebuilt with bug fixes.
Requires:
- signal-cli installed and running: signal-cli daemon --http 127.0.0.1:8080
- SIGNAL_HTTP_URL and SIGNAL_ACCOUNT environment variables set
"""
import asyncio
import base64
import json
import logging
import os
import re
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Any
from urllib.parse import unquote
import httpx
from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import (
BasePlatformAdapter,
MessageEvent,
MessageType,
SendResult,
cache_image_from_bytes,
cache_audio_from_bytes,
cache_document_from_bytes,
cache_image_from_url,
)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
SIGNAL_MAX_ATTACHMENT_SIZE = 100 * 1024 * 1024 # 100 MB
MAX_MESSAGE_LENGTH = 8000 # Signal message size limit
TYPING_INTERVAL = 8.0 # seconds between typing indicator refreshes
SSE_RETRY_DELAY_INITIAL = 2.0
SSE_RETRY_DELAY_MAX = 60.0
HEALTH_CHECK_INTERVAL = 30.0 # seconds between health checks
HEALTH_CHECK_STALE_THRESHOLD = 120.0 # seconds without SSE activity before concern
# E.164 phone number pattern for redaction
_PHONE_RE = re.compile(r"\+[1-9]\d{6,14}")
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _redact_phone(phone: str) -> str:
"""Redact a phone number for logging: +15551234567 -> +155****4567."""
if not phone:
return "<none>"
if len(phone) <= 8:
return phone[:2] + "****" + phone[-2:] if len(phone) > 4 else "****"
return phone[:4] + "****" + phone[-4:]
def _parse_comma_list(value: str) -> List[str]:
"""Split a comma-separated string into a list, stripping whitespace."""
return [v.strip() for v in value.split(",") if v.strip()]
def _guess_extension(data: bytes) -> str:
"""Guess file extension from magic bytes."""
if data[:4] == b"\x89PNG":
return ".png"
if data[:2] == b"\xff\xd8":
return ".jpg"
if data[:4] == b"GIF8":
return ".gif"
if len(data) >= 12 and data[:4] == b"RIFF" and data[8:12] == b"WEBP":
return ".webp"
if data[:4] == b"%PDF":
return ".pdf"
if len(data) >= 8 and data[4:8] == b"ftyp":
return ".mp4"
if data[:4] == b"OggS":
return ".ogg"
if len(data) >= 2 and data[0] == 0xFF and (data[1] & 0xE0) == 0xE0:
return ".mp3"
if data[:2] == b"PK":
return ".zip"
return ".bin"
def _is_image_ext(ext: str) -> bool:
return ext.lower() in (".jpg", ".jpeg", ".png", ".gif", ".webp")
def _is_audio_ext(ext: str) -> bool:
return ext.lower() in (".mp3", ".wav", ".ogg", ".m4a", ".aac")
def check_signal_requirements() -> bool:
"""Check if Signal is configured (has URL and account)."""
return bool(os.getenv("SIGNAL_HTTP_URL") and os.getenv("SIGNAL_ACCOUNT"))
# ---------------------------------------------------------------------------
# Signal Adapter
# ---------------------------------------------------------------------------
class SignalAdapter(BasePlatformAdapter):
"""Signal messenger adapter using signal-cli HTTP daemon."""
platform = Platform.SIGNAL
def __init__(self, config: PlatformConfig):
super().__init__(config, Platform.SIGNAL)
extra = config.extra or {}
self.http_url = extra.get("http_url", "http://127.0.0.1:8080").rstrip("/")
self.account = extra.get("account", "")
self.dm_policy = extra.get("dm_policy", "pairing")
self.group_policy = extra.get("group_policy", "disabled")
self.ignore_stories = extra.get("ignore_stories", True)
# Parse allowlists
allowed_str = os.getenv("SIGNAL_ALLOWED_USERS", "")
self.allowed_users = set(_parse_comma_list(allowed_str))
group_allowed_str = os.getenv("SIGNAL_GROUP_ALLOWED_USERS", "")
self.group_allow_from = set(_parse_comma_list(group_allowed_str))
# HTTP client
self.client: Optional[httpx.AsyncClient] = None
# Background tasks
self._sse_task: Optional[asyncio.Task] = None
self._health_monitor_task: Optional[asyncio.Task] = None
self._typing_tasks: Dict[str, asyncio.Task] = {}
self._running = False
self._last_sse_activity = 0.0
self._sse_response: Optional[httpx.Response] = None
# Pairing store (lazy import to avoid circular deps)
from gateway.pairing import PairingStore
self.pairing_store = PairingStore()
# Debug logging (scoped to this module, NOT root logger)
if os.getenv("SIGNAL_DEBUG", "").lower() in ("true", "1", "yes"):
logger.setLevel(logging.DEBUG)
logger.info("Signal adapter initialized: url=%s account=%s dm_policy=%s group_policy=%s",
self.http_url, _redact_phone(self.account), self.dm_policy, self.group_policy)
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
async def connect(self) -> bool:
"""Connect to signal-cli daemon and start SSE listener."""
if not self.http_url or not self.account:
logger.error("Signal: SIGNAL_HTTP_URL and SIGNAL_ACCOUNT are required")
return False
self.client = httpx.AsyncClient(timeout=30.0)
# Health check — verify signal-cli daemon is reachable
try:
resp = await self.client.get(f"{self.http_url}/api/v1/check", timeout=10.0)
if resp.status_code != 200:
logger.error("Signal: health check failed (status %d)", resp.status_code)
return False
except Exception as e:
logger.error("Signal: cannot reach signal-cli at %s: %s", self.http_url, e)
return False
self._running = True
self._last_sse_activity = time.time()
self._sse_task = asyncio.create_task(self._sse_listener())
self._health_monitor_task = asyncio.create_task(self._health_monitor())
logger.info("Signal: connected to %s", self.http_url)
return True
async def disconnect(self) -> None:
"""Stop SSE listener and clean up."""
self._running = False
if self._sse_task:
self._sse_task.cancel()
try:
await self._sse_task
except asyncio.CancelledError:
pass
if self._health_monitor_task:
self._health_monitor_task.cancel()
try:
await self._health_monitor_task
except asyncio.CancelledError:
pass
# Cancel all typing tasks
for task in self._typing_tasks.values():
task.cancel()
self._typing_tasks.clear()
if self.client:
await self.client.aclose()
self.client = None
logger.info("Signal: disconnected")
# ------------------------------------------------------------------
# SSE Streaming (inbound messages)
# ------------------------------------------------------------------
async def _sse_listener(self) -> None:
"""Listen for SSE events from signal-cli daemon."""
url = f"{self.http_url}/api/v1/events?account={self.account}"
backoff = SSE_RETRY_DELAY_INITIAL
while self._running:
try:
logger.debug("Signal SSE: connecting to %s", url)
async with self.client.stream(
"GET", url,
headers={"Accept": "text/event-stream"},
timeout=None,
) as response:
self._sse_response = response
backoff = SSE_RETRY_DELAY_INITIAL # Reset on successful connection
self._last_sse_activity = time.time()
logger.info("Signal SSE: connected")
buffer = ""
async for chunk in response.aiter_text():
if not self._running:
break
buffer += chunk
while "\n" in buffer:
line, buffer = buffer.split("\n", 1)
line = line.strip()
if not line:
continue
# Parse SSE data lines
if line.startswith("data:"):
data_str = line[5:].strip()
if not data_str:
continue
self._last_sse_activity = time.time()
try:
data = json.loads(data_str)
await self._handle_envelope(data)
except json.JSONDecodeError:
logger.debug("Signal SSE: invalid JSON: %s", data_str[:100])
except Exception:
logger.exception("Signal SSE: error handling event")
except asyncio.CancelledError:
break
except httpx.HTTPError as e:
if self._running:
logger.warning("Signal SSE: HTTP error: %s (reconnecting in %.0fs)", e, backoff)
except Exception as e:
if self._running:
logger.warning("Signal SSE: error: %s (reconnecting in %.0fs)", e, backoff)
if self._running:
await asyncio.sleep(backoff)
backoff = min(backoff * 2, SSE_RETRY_DELAY_MAX)
self._sse_response = None
# ------------------------------------------------------------------
# Health Monitor
# ------------------------------------------------------------------
async def _health_monitor(self) -> None:
"""Monitor SSE connection health and force reconnect if stale."""
while self._running:
await asyncio.sleep(HEALTH_CHECK_INTERVAL)
if not self._running:
break
elapsed = time.time() - self._last_sse_activity
if elapsed > HEALTH_CHECK_STALE_THRESHOLD:
logger.warning("Signal: SSE idle for %.0fs, checking daemon health", elapsed)
try:
resp = await self.client.get(
f"{self.http_url}/api/v1/check", timeout=10.0
)
if resp.status_code == 200:
# Daemon is alive but SSE is idle — update activity to
# avoid repeated warnings (connection may just be quiet)
self._last_sse_activity = time.time()
logger.debug("Signal: daemon healthy, SSE idle")
else:
logger.warning("Signal: health check failed (%d), forcing reconnect", resp.status_code)
self._force_reconnect()
except Exception as e:
logger.warning("Signal: health check error: %s, forcing reconnect", e)
self._force_reconnect()
def _force_reconnect(self) -> None:
"""Force SSE reconnection by closing the current response."""
if self._sse_response and not self._sse_response.is_stream_consumed:
try:
asyncio.create_task(self._sse_response.aclose())
except Exception:
pass
self._sse_response = None
# ------------------------------------------------------------------
# Message Handling
# ------------------------------------------------------------------
async def _handle_envelope(self, envelope: dict) -> None:
"""Process an incoming signal-cli envelope."""
# Unwrap nested envelope if present
envelope_data = envelope.get("envelope", envelope)
# Extract sender info
sender = (
envelope_data.get("sourceNumber")
or envelope_data.get("sourceUuid")
or envelope_data.get("source")
)
sender_name = envelope_data.get("sourceName", "")
sender_uuid = envelope_data.get("sourceUuid", "")
if not sender:
logger.debug("Signal: ignoring envelope with no sender")
return
# Filter stories
if self.ignore_stories and envelope_data.get("storyMessage"):
return
# Get data message (skip receipts, typing indicators, etc.)
data_message = envelope_data.get("dataMessage")
if not data_message:
return
# Check for group message
group_info = data_message.get("groupInfo")
group_id = group_info.get("groupId") if group_info else None
is_group = bool(group_id)
# Authorization check — delegated to run.py's _is_user_authorized()
# for DM allowlists. We only do group policy filtering here since
# that's Signal-specific and not in the base auth system.
if is_group:
if self.group_policy == "disabled":
logger.debug("Signal: ignoring group message (group_policy=disabled)")
return
if self.group_policy == "allowlist":
if "*" not in self.group_allow_from and group_id not in self.group_allow_from:
logger.debug("Signal: group %s not in allowlist", group_id[:8] if group_id else "?")
return
# group_policy == "open" — allow through
# DM policy "open" — for non-group, let all through to run.py auth
# (run.py will still check SIGNAL_ALLOWED_USERS / pairing)
# DM policy "pairing" / "allowlist" — handled by run.py
# Build chat info
chat_id = sender if not is_group else f"group:{group_id}"
chat_type = "group" if is_group else "dm"
# Extract text
text = data_message.get("message", "")
# Process attachments
attachments_data = data_message.get("attachments", [])
image_paths = []
audio_path = None
document_paths = []
if attachments_data and not getattr(self, "ignore_attachments", False):
for att in attachments_data:
att_id = att.get("id")
att_size = att.get("size", 0)
if not att_id:
continue
if att_size > SIGNAL_MAX_ATTACHMENT_SIZE:
logger.warning("Signal: attachment too large (%d bytes), skipping", att_size)
continue
try:
cached_path, ext = await self._fetch_attachment(att_id)
if cached_path:
if _is_image_ext(ext):
image_paths.append(cached_path)
elif _is_audio_ext(ext):
audio_path = cached_path
else:
document_paths.append(cached_path)
except Exception:
logger.exception("Signal: failed to fetch attachment %s", att_id)
# Build session source
source = self.build_source(
chat_id=chat_id,
chat_name=group_info.get("groupName") if group_info else sender_name,
chat_type=chat_type,
user_id=sender,
user_name=sender_name or sender,
user_id_alt=sender_uuid if sender_uuid else None,
chat_id_alt=group_id if is_group else None,
)
# Determine message type
msg_type = MessageType.TEXT
if audio_path:
msg_type = MessageType.VOICE
elif image_paths:
msg_type = MessageType.IMAGE
# Parse timestamp from envelope data (milliseconds since epoch)
ts_ms = envelope_data.get("timestamp", 0)
if ts_ms:
try:
timestamp = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
except (ValueError, OSError):
timestamp = datetime.now(tz=timezone.utc)
else:
timestamp = datetime.now(tz=timezone.utc)
# Build and dispatch event
event = MessageEvent(
source=source,
text=text or "",
message_type=msg_type,
image_paths=image_paths,
audio_path=audio_path,
document_paths=document_paths,
timestamp=timestamp,
)
logger.debug("Signal: message from %s in %s: %s",
_redact_phone(sender), chat_id[:20], (text or "")[:50])
await self.handle_message(event)
# ------------------------------------------------------------------
# Attachment Handling
# ------------------------------------------------------------------
async def _fetch_attachment(self, attachment_id: str) -> tuple:
"""Fetch an attachment via JSON-RPC and cache it. Returns (path, ext)."""
result = await self._rpc("getAttachment", {
"account": self.account,
"attachmentId": attachment_id,
})
if not result:
return None, ""
# Result is base64-encoded file content
raw_data = base64.b64decode(result)
ext = _guess_extension(raw_data)
if _is_image_ext(ext):
path = cache_image_from_bytes(raw_data, ext)
elif _is_audio_ext(ext):
path = cache_audio_from_bytes(raw_data, ext)
else:
path = cache_document_from_bytes(raw_data, ext)
return path, ext
# ------------------------------------------------------------------
# JSON-RPC Communication
# ------------------------------------------------------------------
async def _rpc(self, method: str, params: dict, rpc_id: str = None) -> Any:
"""Send a JSON-RPC 2.0 request to signal-cli daemon."""
if not self.client:
logger.warning("Signal: RPC called but client not connected")
return None
if rpc_id is None:
rpc_id = f"{method}_{int(time.time() * 1000)}"
payload = {
"jsonrpc": "2.0",
"method": method,
"params": params,
"id": rpc_id,
}
try:
resp = await self.client.post(
f"{self.http_url}/api/v1/rpc",
json=payload,
timeout=30.0,
)
resp.raise_for_status()
data = resp.json()
if "error" in data:
logger.warning("Signal RPC error (%s): %s", method, data["error"])
return None
return data.get("result")
except Exception as e:
logger.warning("Signal RPC %s failed: %s", method, e)
return None
# ------------------------------------------------------------------
# Sending
# ------------------------------------------------------------------
async def send(
self,
chat_id: str,
text: str,
reply_to_message_id: Optional[str] = None,
**kwargs,
) -> SendResult:
"""Send a text message."""
await self._stop_typing_indicator(chat_id)
params: Dict[str, Any] = {
"account": self.account,
"message": text,
}
if chat_id.startswith("group:"):
params["groupId"] = chat_id[6:]
else:
params["recipient"] = [chat_id]
result = await self._rpc("send", params)
if result is not None:
return SendResult(success=True)
return SendResult(success=False, error="RPC send failed")
async def send_typing(self, chat_id: str) -> None:
"""Send a typing indicator."""
params: Dict[str, Any] = {
"account": self.account,
}
if chat_id.startswith("group:"):
params["groupId"] = chat_id[6:]
else:
params["recipient"] = [chat_id]
await self._rpc("sendTyping", params, rpc_id="typing")
async def send_image(
self,
chat_id: str,
image_url: str,
caption: Optional[str] = None,
**kwargs,
) -> SendResult:
"""Send an image. Supports http(s):// and file:// URLs."""
await self._stop_typing_indicator(chat_id)
# Resolve image to local path
if image_url.startswith("file://"):
file_path = unquote(image_url[7:])
else:
# Download remote image to cache
try:
file_path = await cache_image_from_url(image_url)
except Exception as e:
logger.warning("Signal: failed to download image: %s", e)
return SendResult(success=False, error=str(e))
if not file_path or not Path(file_path).exists():
return SendResult(success=False, error="Image file not found")
# Validate size
file_size = Path(file_path).stat().st_size
if file_size > SIGNAL_MAX_ATTACHMENT_SIZE:
return SendResult(success=False, error=f"Image too large ({file_size} bytes)")
params: Dict[str, Any] = {
"account": self.account,
"message": caption or "",
"attachments": [file_path],
}
if chat_id.startswith("group:"):
params["groupId"] = chat_id[6:]
else:
params["recipient"] = [chat_id]
result = await self._rpc("send", params)
if result is not None:
return SendResult(success=True)
return SendResult(success=False, error="RPC send with attachment failed")
async def send_document(
self,
chat_id: str,
file_path: str,
caption: Optional[str] = None,
filename: Optional[str] = None,
**kwargs,
) -> SendResult:
"""Send a document/file attachment."""
await self._stop_typing_indicator(chat_id)
if not Path(file_path).exists():
return SendResult(success=False, error="File not found")
params: Dict[str, Any] = {
"account": self.account,
"message": caption or "",
"attachments": [file_path],
}
if chat_id.startswith("group:"):
params["groupId"] = chat_id[6:]
else:
params["recipient"] = [chat_id]
result = await self._rpc("send", params)
if result is not None:
return SendResult(success=True)
return SendResult(success=False, error="RPC send document failed")
# ------------------------------------------------------------------
# Typing Indicators
# ------------------------------------------------------------------
async def _start_typing_indicator(self, chat_id: str) -> None:
"""Start a typing indicator loop for a chat."""
if chat_id in self._typing_tasks:
return # Already running
async def _typing_loop():
try:
while True:
await self.send_typing(chat_id)
await asyncio.sleep(TYPING_INTERVAL)
except asyncio.CancelledError:
pass
self._typing_tasks[chat_id] = asyncio.create_task(_typing_loop())
async def _stop_typing_indicator(self, chat_id: str) -> None:
"""Stop a typing indicator loop for a chat."""
task = self._typing_tasks.pop(chat_id, None)
if task:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
# ------------------------------------------------------------------
# Chat Info
# ------------------------------------------------------------------
async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
"""Get information about a chat/contact."""
if chat_id.startswith("group:"):
return {
"name": chat_id,
"type": "group",
"chat_id": chat_id,
}
# Try to resolve contact name
result = await self._rpc("getContact", {
"account": self.account,
"contactAddress": chat_id,
})
name = chat_id
if result and isinstance(result, dict):
name = result.get("name") or result.get("profileName") or chat_id
return {
"name": name,
"type": "dm",
"chat_id": chat_id,
}

View File

@@ -591,6 +591,13 @@ class GatewayRunner:
return None
return SlackAdapter(config)
elif platform == Platform.SIGNAL:
from gateway.platforms.signal import SignalAdapter, check_signal_requirements
if not check_signal_requirements():
logger.warning("Signal: SIGNAL_HTTP_URL or SIGNAL_ACCOUNT not configured")
return None
return SignalAdapter(config)
elif platform == Platform.HOMEASSISTANT:
from gateway.platforms.homeassistant import HomeAssistantAdapter, check_ha_requirements
if not check_ha_requirements():
@@ -626,12 +633,14 @@ class GatewayRunner:
Platform.DISCORD: "DISCORD_ALLOWED_USERS",
Platform.WHATSAPP: "WHATSAPP_ALLOWED_USERS",
Platform.SLACK: "SLACK_ALLOWED_USERS",
Platform.SIGNAL: "SIGNAL_ALLOWED_USERS",
}
platform_allow_all_map = {
Platform.TELEGRAM: "TELEGRAM_ALLOW_ALL_USERS",
Platform.DISCORD: "DISCORD_ALLOW_ALL_USERS",
Platform.WHATSAPP: "WHATSAPP_ALLOW_ALL_USERS",
Platform.SLACK: "SLACK_ALLOW_ALL_USERS",
Platform.SIGNAL: "SIGNAL_ALLOW_ALL_USERS",
}
# Per-platform allow-all flag (e.g., DISCORD_ALLOW_ALL_USERS=true)

View File

@@ -45,6 +45,8 @@ class SessionSource:
user_name: Optional[str] = None
thread_id: Optional[str] = None # For forum topics, Discord threads, etc.
chat_topic: Optional[str] = None # Channel topic/description (Discord, Slack)
user_id_alt: Optional[str] = None # Signal UUID (alternative to phone number)
chat_id_alt: Optional[str] = None # Signal group internal ID
@property
def description(self) -> str:
@@ -68,7 +70,7 @@ class SessionSource:
return ", ".join(parts)
def to_dict(self) -> Dict[str, Any]:
return {
d = {
"platform": self.platform.value,
"chat_id": self.chat_id,
"chat_name": self.chat_name,
@@ -78,6 +80,11 @@ class SessionSource:
"thread_id": self.thread_id,
"chat_topic": self.chat_topic,
}
if self.user_id_alt:
d["user_id_alt"] = self.user_id_alt
if self.chat_id_alt:
d["chat_id_alt"] = self.chat_id_alt
return d
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "SessionSource":
@@ -90,6 +97,8 @@ class SessionSource:
user_name=data.get("user_name"),
thread_id=data.get("thread_id"),
chat_topic=data.get("chat_topic"),
user_id_alt=data.get("user_id_alt"),
chat_id_alt=data.get("chat_id_alt"),
)
@classmethod

View File

@@ -507,6 +507,12 @@ _PLATFORMS = [
"emoji": "📲",
"token_var": "WHATSAPP_ENABLED",
},
{
"key": "signal",
"label": "Signal",
"emoji": "📡",
"token_var": "SIGNAL_HTTP_URL",
},
]
@@ -525,6 +531,13 @@ def _platform_status(platform: dict) -> str:
return "configured + paired"
return "enabled, not paired"
return "not configured"
if platform.get("key") == "signal":
account = get_env_value("SIGNAL_ACCOUNT")
if val and account:
return "configured"
if val or account:
return "partially configured"
return "not configured"
if val:
return "configured"
return "not configured"
@@ -650,6 +663,138 @@ def _is_service_running() -> bool:
return len(find_gateway_pids()) > 0
def _setup_signal():
"""Interactive setup for Signal messenger."""
import shutil
print()
print(color(" ─── 📡 Signal Setup ───", Colors.CYAN))
existing_url = get_env_value("SIGNAL_HTTP_URL")
existing_account = get_env_value("SIGNAL_ACCOUNT")
if existing_url and existing_account:
print()
print_success("Signal is already configured.")
if not prompt_yes_no(" Reconfigure Signal?", False):
return
# Check if signal-cli is available
print()
if shutil.which("signal-cli"):
print_success("signal-cli found on PATH.")
else:
print_warning("signal-cli not found on PATH.")
print_info(" Signal requires signal-cli running as an HTTP daemon.")
print_info(" Install options:")
print_info(" Linux: sudo apt install signal-cli")
print_info(" or download from https://github.com/AsamK/signal-cli")
print_info(" macOS: brew install signal-cli")
print_info(" Docker: bbernhard/signal-cli-rest-api")
print()
print_info(" After installing, link your account and start the daemon:")
print_info(" signal-cli link -n \"HermesAgent\"")
print_info(" signal-cli --account +YOURNUMBER daemon --http 127.0.0.1:8080")
print()
# HTTP URL
print()
print_info(" Enter the URL where signal-cli HTTP daemon is running.")
default_url = existing_url or "http://127.0.0.1:8080"
try:
url = input(f" HTTP URL [{default_url}]: ").strip() or default_url
except (EOFError, KeyboardInterrupt):
print("\n Setup cancelled.")
return
# Test connectivity
print_info(" Testing connection...")
try:
import httpx
resp = httpx.get(f"{url.rstrip('/')}/api/v1/check", timeout=10.0)
if resp.status_code == 200:
print_success(" signal-cli daemon is reachable!")
else:
print_warning(f" signal-cli responded with status {resp.status_code}.")
if not prompt_yes_no(" Continue anyway?", False):
return
except Exception as e:
print_warning(f" Could not reach signal-cli at {url}: {e}")
if not prompt_yes_no(" Save this URL anyway? (you can start signal-cli later)", True):
return
save_env_value("SIGNAL_HTTP_URL", url)
# Account phone number
print()
print_info(" Enter your Signal account phone number in E.164 format.")
print_info(" Example: +15551234567")
default_account = existing_account or ""
try:
account = input(f" Account number{f' [{default_account}]' if default_account else ''}: ").strip()
if not account:
account = default_account
except (EOFError, KeyboardInterrupt):
print("\n Setup cancelled.")
return
if not account:
print_error(" Account number is required.")
return
save_env_value("SIGNAL_ACCOUNT", account)
# Allowed users
print()
print_info(" The gateway DENIES all users by default for security.")
print_info(" Enter phone numbers or UUIDs of allowed users (comma-separated).")
existing_allowed = get_env_value("SIGNAL_ALLOWED_USERS") or ""
default_allowed = existing_allowed or account
try:
allowed = input(f" Allowed users [{default_allowed}]: ").strip() or default_allowed
except (EOFError, KeyboardInterrupt):
print("\n Setup cancelled.")
return
save_env_value("SIGNAL_ALLOWED_USERS", allowed)
# DM policy
print()
policies = ["pairing (default — new users get a pairing code to approve)",
"allowlist (only explicitly listed users)",
"open (anyone can message)"]
dm_choice = prompt_choice(" DM access policy:", policies, 0)
dm_policy = ["pairing", "allowlist", "open"][dm_choice]
save_env_value("SIGNAL_DM_POLICY", dm_policy)
# Group policy
print()
group_policies = ["disabled (default — ignore group messages)",
"allowlist (only specific groups)",
"open (respond in any group the bot is in)"]
group_choice = prompt_choice(" Group message policy:", group_policies, 0)
group_policy = ["disabled", "allowlist", "open"][group_choice]
save_env_value("SIGNAL_GROUP_POLICY", group_policy)
if group_policy == "allowlist":
print()
print_info(" Enter group IDs to allow (comma-separated).")
existing_groups = get_env_value("SIGNAL_GROUP_ALLOWED_USERS") or ""
try:
groups = input(f" Group IDs [{existing_groups}]: ").strip() or existing_groups
except (EOFError, KeyboardInterrupt):
print("\n Setup cancelled.")
return
if groups:
save_env_value("SIGNAL_GROUP_ALLOWED_USERS", groups)
print()
print_success("Signal configured!")
print_info(f" URL: {url}")
print_info(f" Account: {account}")
print_info(f" DM policy: {dm_policy}")
print_info(f" Group policy: {group_policy}")
def gateway_setup():
"""Interactive setup for messaging platforms + gateway service."""
@@ -702,6 +847,8 @@ def gateway_setup():
if platform["key"] == "whatsapp":
_setup_whatsapp()
elif platform["key"] == "signal":
_setup_signal()
else:
_setup_standard_platform(platform)

View File

@@ -0,0 +1,289 @@
"""Tests for Signal messenger platform adapter."""
import json
import pytest
from unittest.mock import MagicMock, patch, AsyncMock
from gateway.config import Platform, PlatformConfig
# ---------------------------------------------------------------------------
# Platform & Config
# ---------------------------------------------------------------------------
class TestSignalPlatformEnum:
def test_signal_enum_exists(self):
assert Platform.SIGNAL.value == "signal"
def test_signal_in_platform_list(self):
platforms = [p.value for p in Platform]
assert "signal" in platforms
class TestSignalConfigLoading:
def test_apply_env_overrides_signal(self, monkeypatch):
monkeypatch.setenv("SIGNAL_HTTP_URL", "http://localhost:9090")
monkeypatch.setenv("SIGNAL_ACCOUNT", "+15551234567")
monkeypatch.setenv("SIGNAL_DM_POLICY", "open")
monkeypatch.setenv("SIGNAL_GROUP_POLICY", "allowlist")
from gateway.config import GatewayConfig, _apply_env_overrides
config = GatewayConfig()
_apply_env_overrides(config)
assert Platform.SIGNAL in config.platforms
sc = config.platforms[Platform.SIGNAL]
assert sc.enabled is True
assert sc.extra["http_url"] == "http://localhost:9090"
assert sc.extra["account"] == "+15551234567"
assert sc.extra["dm_policy"] == "open"
assert sc.extra["group_policy"] == "allowlist"
def test_signal_not_loaded_without_both_vars(self, monkeypatch):
monkeypatch.setenv("SIGNAL_HTTP_URL", "http://localhost:9090")
# No SIGNAL_ACCOUNT
from gateway.config import GatewayConfig, _apply_env_overrides
config = GatewayConfig()
_apply_env_overrides(config)
assert Platform.SIGNAL not in config.platforms
def test_connected_platforms_includes_signal(self, monkeypatch):
monkeypatch.setenv("SIGNAL_HTTP_URL", "http://localhost:8080")
monkeypatch.setenv("SIGNAL_ACCOUNT", "+15551234567")
from gateway.config import GatewayConfig, _apply_env_overrides
config = GatewayConfig()
_apply_env_overrides(config)
connected = config.get_connected_platforms()
assert Platform.SIGNAL in connected
# ---------------------------------------------------------------------------
# Adapter Init & Helpers
# ---------------------------------------------------------------------------
class TestSignalAdapterInit:
def _make_config(self, **extra):
config = PlatformConfig()
config.enabled = True
config.extra = {
"http_url": "http://localhost:8080",
"account": "+15551234567",
"dm_policy": "pairing",
"group_policy": "disabled",
**extra,
}
return config
def test_init_parses_config(self, monkeypatch):
monkeypatch.setenv("SIGNAL_ALLOWED_USERS", "+15559876543,+15551111111")
monkeypatch.setenv("SIGNAL_GROUP_ALLOWED_USERS", "group123,group456")
monkeypatch.delenv("SIGNAL_DEBUG", raising=False)
from gateway.platforms.signal import SignalAdapter
adapter = SignalAdapter(self._make_config())
assert adapter.http_url == "http://localhost:8080"
assert adapter.account == "+15551234567"
assert adapter.dm_policy == "pairing"
assert adapter.group_policy == "disabled"
assert "+15559876543" in adapter.allowed_users
assert "+15551111111" in adapter.allowed_users
assert "group123" in adapter.group_allow_from
def test_init_empty_allowlist(self, monkeypatch):
monkeypatch.setenv("SIGNAL_ALLOWED_USERS", "")
monkeypatch.setenv("SIGNAL_GROUP_ALLOWED_USERS", "")
monkeypatch.delenv("SIGNAL_DEBUG", raising=False)
from gateway.platforms.signal import SignalAdapter
adapter = SignalAdapter(self._make_config())
assert len(adapter.allowed_users) == 0
assert len(adapter.group_allow_from) == 0
def test_init_strips_trailing_slash(self, monkeypatch):
monkeypatch.setenv("SIGNAL_ALLOWED_USERS", "")
monkeypatch.setenv("SIGNAL_GROUP_ALLOWED_USERS", "")
monkeypatch.delenv("SIGNAL_DEBUG", raising=False)
from gateway.platforms.signal import SignalAdapter
adapter = SignalAdapter(self._make_config(http_url="http://localhost:8080/"))
assert adapter.http_url == "http://localhost:8080"
class TestSignalHelpers:
def test_redact_phone_long(self):
from gateway.platforms.signal import _redact_phone
assert _redact_phone("+15551234567") == "+155****4567"
def test_redact_phone_short(self):
from gateway.platforms.signal import _redact_phone
assert _redact_phone("+12345") == "+1****45"
def test_redact_phone_empty(self):
from gateway.platforms.signal import _redact_phone
assert _redact_phone("") == "<none>"
def test_parse_comma_list(self):
from gateway.platforms.signal import _parse_comma_list
assert _parse_comma_list("+1234, +5678 , +9012") == ["+1234", "+5678", "+9012"]
assert _parse_comma_list("") == []
assert _parse_comma_list(" , , ") == []
def test_guess_extension_png(self):
from gateway.platforms.signal import _guess_extension
assert _guess_extension(b"\x89PNG\r\n\x1a\n" + b"\x00" * 100) == ".png"
def test_guess_extension_jpeg(self):
from gateway.platforms.signal import _guess_extension
assert _guess_extension(b"\xff\xd8\xff\xe0" + b"\x00" * 100) == ".jpg"
def test_guess_extension_pdf(self):
from gateway.platforms.signal import _guess_extension
assert _guess_extension(b"%PDF-1.4" + b"\x00" * 100) == ".pdf"
def test_guess_extension_zip(self):
from gateway.platforms.signal import _guess_extension
assert _guess_extension(b"PK\x03\x04" + b"\x00" * 100) == ".zip"
def test_guess_extension_mp4(self):
from gateway.platforms.signal import _guess_extension
assert _guess_extension(b"\x00\x00\x00\x18ftypisom" + b"\x00" * 100) == ".mp4"
def test_guess_extension_unknown(self):
from gateway.platforms.signal import _guess_extension
assert _guess_extension(b"\x00\x01\x02\x03" * 10) == ".bin"
def test_is_image_ext(self):
from gateway.platforms.signal import _is_image_ext
assert _is_image_ext(".png") is True
assert _is_image_ext(".jpg") is True
assert _is_image_ext(".gif") is True
assert _is_image_ext(".pdf") is False
def test_is_audio_ext(self):
from gateway.platforms.signal import _is_audio_ext
assert _is_audio_ext(".mp3") is True
assert _is_audio_ext(".ogg") is True
assert _is_audio_ext(".png") is False
def test_check_requirements(self, monkeypatch):
from gateway.platforms.signal import check_signal_requirements
monkeypatch.setenv("SIGNAL_HTTP_URL", "http://localhost:8080")
monkeypatch.setenv("SIGNAL_ACCOUNT", "+15551234567")
assert check_signal_requirements() is True
def test_check_requirements_missing(self, monkeypatch):
from gateway.platforms.signal import check_signal_requirements
monkeypatch.delenv("SIGNAL_HTTP_URL", raising=False)
monkeypatch.delenv("SIGNAL_ACCOUNT", raising=False)
assert check_signal_requirements() is False
# ---------------------------------------------------------------------------
# Session Source
# ---------------------------------------------------------------------------
class TestSignalSessionSource:
def test_session_source_alt_fields(self):
from gateway.session import SessionSource
source = SessionSource(
platform=Platform.SIGNAL,
chat_id="+15551234567",
user_id="+15551234567",
user_id_alt="uuid:abc-123",
chat_id_alt=None,
)
d = source.to_dict()
assert d["user_id_alt"] == "uuid:abc-123"
assert "chat_id_alt" not in d # None fields excluded
def test_session_source_roundtrip(self):
from gateway.session import SessionSource
source = SessionSource(
platform=Platform.SIGNAL,
chat_id="group:xyz",
chat_type="group",
user_id="+15551234567",
user_id_alt="uuid:abc",
chat_id_alt="xyz",
)
d = source.to_dict()
restored = SessionSource.from_dict(d)
assert restored.user_id_alt == "uuid:abc"
assert restored.chat_id_alt == "xyz"
assert restored.platform == Platform.SIGNAL
# ---------------------------------------------------------------------------
# Phone Redaction in agent/redact.py
# ---------------------------------------------------------------------------
class TestSignalPhoneRedaction:
def test_us_number(self):
from agent.redact import redact_sensitive_text
result = redact_sensitive_text("Call +15551234567 now")
assert "+15551234567" not in result
assert "+155" in result # Prefix preserved
assert "4567" in result # Suffix preserved
def test_uk_number(self):
from agent.redact import redact_sensitive_text
result = redact_sensitive_text("UK: +442071838750")
assert "+442071838750" not in result
assert "****" in result
def test_multiple_numbers(self):
from agent.redact import redact_sensitive_text
text = "From +15551234567 to +442071838750"
result = redact_sensitive_text(text)
assert "+15551234567" not in result
assert "+442071838750" not in result
def test_short_number_not_matched(self):
from agent.redact import redact_sensitive_text
result = redact_sensitive_text("Code: +12345")
# 5 digits after + is below the 7-digit minimum
assert "+12345" in result # Too short to redact
# ---------------------------------------------------------------------------
# Authorization in run.py
# ---------------------------------------------------------------------------
class TestSignalAuthorization:
def test_signal_in_allowlist_maps(self):
"""Signal should be in the platform auth maps."""
from gateway.run import GatewayRunner
from gateway.config import GatewayConfig
gw = GatewayRunner.__new__(GatewayRunner)
gw.config = GatewayConfig()
gw.pairing_store = MagicMock()
gw.pairing_store.is_approved.return_value = False
source = MagicMock()
source.platform = Platform.SIGNAL
source.user_id = "+15559999999"
# No allowlists set — should check GATEWAY_ALLOW_ALL_USERS
with patch.dict("os.environ", {}, clear=True):
result = gw._is_user_authorized(source)
assert result is False
# ---------------------------------------------------------------------------
# Send Message Tool
# ---------------------------------------------------------------------------
class TestSignalSendMessage:
def test_signal_in_platform_map(self):
"""Signal should be in the send_message tool's platform map."""
from tools.send_message_tool import send_message_tool
# Just verify the import works and Signal is a valid platform
from gateway.config import Platform
assert Platform.SIGNAL.value == "signal"

View File

@@ -8,6 +8,7 @@ human-friendly channel names to IDs. Works in both CLI and gateway contexts.
import json
import logging
import os
import time
logger = logging.getLogger(__name__)
@@ -107,6 +108,7 @@ def _handle_send(args):
"discord": Platform.DISCORD,
"slack": Platform.SLACK,
"whatsapp": Platform.WHATSAPP,
"signal": Platform.SIGNAL,
}
platform = platform_map.get(platform_name)
if not platform:
@@ -160,6 +162,8 @@ async def _send_to_platform(platform, pconfig, chat_id, message):
return await _send_discord(pconfig.token, chat_id, message)
elif platform == Platform.SLACK:
return await _send_slack(pconfig.token, chat_id, message)
elif platform == Platform.SIGNAL:
return await _send_signal(pconfig.extra, chat_id, message)
return {"error": f"Direct sending not yet implemented for {platform.value}"}
@@ -219,6 +223,42 @@ async def _send_slack(token, chat_id, message):
return {"error": f"Slack send failed: {e}"}
async def _send_signal(extra, chat_id, message):
"""Send via signal-cli JSON-RPC API."""
try:
import httpx
except ImportError:
return {"error": "httpx not installed"}
try:
http_url = extra.get("http_url", "http://127.0.0.1:8080").rstrip("/")
account = extra.get("account", "")
if not account:
return {"error": "Signal account not configured"}
params = {"account": account, "message": message}
if chat_id.startswith("group:"):
params["groupId"] = chat_id[6:]
else:
params["recipient"] = [chat_id]
payload = {
"jsonrpc": "2.0",
"method": "send",
"params": params,
"id": f"send_{int(time.time() * 1000)}",
}
async with httpx.AsyncClient(timeout=30.0) as client:
resp = await client.post(f"{http_url}/api/v1/rpc", json=payload)
resp.raise_for_status()
data = resp.json()
if "error" in data:
return {"error": f"Signal RPC error: {data['error']}"}
return {"success": True, "platform": "signal", "chat_id": chat_id}
except Exception as e:
return {"error": f"Signal send failed: {e}"}
def _check_send_message():
"""Gate send_message on gateway running (always available on messaging platforms)."""
platform = os.getenv("HERMES_SESSION_PLATFORM", "")