When TELEGRAM_WEBHOOK_URL is set, the adapter starts an HTTP webhook server (via python-telegram-bot's start_webhook()) instead of long polling. This enables cloud platforms like Fly.io and Railway to auto-wake suspended machines on inbound HTTP traffic. Polling remains the default — no behavior change unless the env var is set. Env vars: TELEGRAM_WEBHOOK_URL Public HTTPS URL for Telegram to push to TELEGRAM_WEBHOOK_PORT Local listen port (default 8443) TELEGRAM_WEBHOOK_SECRET Secret token for update verification Cherry-picked and adapted from PR #2022 by SHL0MS. Preserved all current main enhancements (network error recovery, polling conflict detection, DM topics setup). Co-authored-by: SHL0MS <SHL0MS@users.noreply.github.com>
2116 lines
90 KiB
Python
2116 lines
90 KiB
Python
"""
|
||
Telegram platform adapter.
|
||
|
||
Uses python-telegram-bot library for:
|
||
- Receiving messages from users/groups
|
||
- Sending responses back
|
||
- Handling media and commands
|
||
"""
|
||
|
||
import asyncio
|
||
import json
|
||
import logging
|
||
import os
|
||
import re
|
||
from typing import Dict, List, Optional, Any
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
try:
|
||
from telegram import Update, Bot, Message
|
||
from telegram.ext import (
|
||
Application,
|
||
CommandHandler,
|
||
MessageHandler as TelegramMessageHandler,
|
||
ContextTypes,
|
||
filters,
|
||
)
|
||
from telegram.constants import ParseMode, ChatType
|
||
from telegram.request import HTTPXRequest
|
||
TELEGRAM_AVAILABLE = True
|
||
except ImportError:
|
||
TELEGRAM_AVAILABLE = False
|
||
Update = Any
|
||
Bot = Any
|
||
Message = Any
|
||
Application = Any
|
||
CommandHandler = Any
|
||
TelegramMessageHandler = Any
|
||
HTTPXRequest = Any
|
||
filters = None
|
||
ParseMode = None
|
||
ChatType = None
|
||
|
||
# Mock ContextTypes so type annotations using ContextTypes.DEFAULT_TYPE
|
||
# don't crash during class definition when the library isn't installed.
|
||
class _MockContextTypes:
|
||
DEFAULT_TYPE = Any
|
||
ContextTypes = _MockContextTypes
|
||
|
||
import sys
|
||
from pathlib import Path as _Path
|
||
sys.path.insert(0, str(_Path(__file__).resolve().parents[2]))
|
||
|
||
from gateway.config import Platform, PlatformConfig
|
||
from gateway.platforms.base import (
|
||
BasePlatformAdapter,
|
||
MessageEvent,
|
||
MessageType,
|
||
SendResult,
|
||
cache_image_from_bytes,
|
||
cache_audio_from_bytes,
|
||
cache_document_from_bytes,
|
||
SUPPORTED_DOCUMENT_TYPES,
|
||
)
|
||
from gateway.platforms.telegram_network import (
|
||
TelegramFallbackTransport,
|
||
discover_fallback_ips,
|
||
parse_fallback_ip_env,
|
||
)
|
||
|
||
|
||
def check_telegram_requirements() -> bool:
|
||
"""Check if Telegram dependencies are available."""
|
||
return TELEGRAM_AVAILABLE
|
||
|
||
|
||
# Matches every character that MarkdownV2 requires to be backslash-escaped
|
||
# when it appears outside a code span or fenced code block.
|
||
_MDV2_ESCAPE_RE = re.compile(r'([_*\[\]()~`>#\+\-=|{}.!\\])')
|
||
|
||
|
||
def _escape_mdv2(text: str) -> str:
|
||
"""Escape Telegram MarkdownV2 special characters with a preceding backslash."""
|
||
return _MDV2_ESCAPE_RE.sub(r'\\\1', text)
|
||
|
||
|
||
def _strip_mdv2(text: str) -> str:
|
||
"""Strip MarkdownV2 escape backslashes to produce clean plain text.
|
||
|
||
Also removes MarkdownV2 formatting markers so the fallback
|
||
doesn't show stray syntax characters from format_message conversion.
|
||
"""
|
||
# Remove escape backslashes before special characters
|
||
cleaned = re.sub(r'\\([_*\[\]()~`>#\+\-=|{}.!\\])', r'\1', text)
|
||
# Remove MarkdownV2 bold markers that format_message converted from **bold**
|
||
cleaned = re.sub(r'\*([^*]+)\*', r'\1', cleaned)
|
||
# Remove MarkdownV2 italic markers that format_message converted from *italic*
|
||
# Use word boundary (\b) to avoid breaking snake_case like my_variable_name
|
||
cleaned = re.sub(r'(?<!\w)_([^_]+)_(?!\w)', r'\1', cleaned)
|
||
# Remove MarkdownV2 strikethrough markers (~text~ → text)
|
||
cleaned = re.sub(r'~([^~]+)~', r'\1', cleaned)
|
||
# Remove MarkdownV2 spoiler markers (||text|| → text)
|
||
cleaned = re.sub(r'\|\|([^|]+)\|\|', r'\1', cleaned)
|
||
return cleaned
|
||
|
||
|
||
class TelegramAdapter(BasePlatformAdapter):
|
||
"""
|
||
Telegram bot adapter.
|
||
|
||
Handles:
|
||
- Receiving messages from users and groups
|
||
- Sending responses with Telegram markdown
|
||
- Forum topics (thread_id support)
|
||
- Media messages
|
||
"""
|
||
|
||
# Telegram message limits
|
||
MAX_MESSAGE_LENGTH = 4096
|
||
MEDIA_GROUP_WAIT_SECONDS = 0.8
|
||
|
||
def __init__(self, config: PlatformConfig):
|
||
super().__init__(config, Platform.TELEGRAM)
|
||
self._app: Optional[Application] = None
|
||
self._bot: Optional[Bot] = None
|
||
self._webhook_mode: bool = False
|
||
self._mention_patterns = self._compile_mention_patterns()
|
||
self._reply_to_mode: str = getattr(config, 'reply_to_mode', 'first') or 'first'
|
||
# Buffer rapid/album photo updates so Telegram image bursts are handled
|
||
# as a single MessageEvent instead of self-interrupting multiple turns.
|
||
self._media_batch_delay_seconds = float(os.getenv("HERMES_TELEGRAM_MEDIA_BATCH_DELAY_SECONDS", "0.8"))
|
||
self._pending_photo_batches: Dict[str, MessageEvent] = {}
|
||
self._pending_photo_batch_tasks: Dict[str, asyncio.Task] = {}
|
||
self._media_group_events: Dict[str, MessageEvent] = {}
|
||
self._media_group_tasks: Dict[str, asyncio.Task] = {}
|
||
# Buffer rapid text messages so Telegram client-side splits of long
|
||
# messages are aggregated into a single MessageEvent.
|
||
self._text_batch_delay_seconds = float(os.getenv("HERMES_TELEGRAM_TEXT_BATCH_DELAY_SECONDS", "0.6"))
|
||
self._pending_text_batches: Dict[str, MessageEvent] = {}
|
||
self._pending_text_batch_tasks: Dict[str, asyncio.Task] = {}
|
||
self._token_lock_identity: Optional[str] = None
|
||
self._polling_error_task: Optional[asyncio.Task] = None
|
||
self._polling_conflict_count: int = 0
|
||
self._polling_network_error_count: int = 0
|
||
self._polling_error_callback_ref = None
|
||
# DM Topics: map of topic_name -> message_thread_id (populated at startup)
|
||
self._dm_topics: Dict[str, int] = {}
|
||
# DM Topics config from extra.dm_topics
|
||
self._dm_topics_config: List[Dict[str, Any]] = self.config.extra.get("dm_topics", [])
|
||
|
||
def _fallback_ips(self) -> list[str]:
|
||
"""Return validated fallback IPs from config (populated by _apply_env_overrides)."""
|
||
configured = self.config.extra.get("fallback_ips", []) if getattr(self.config, "extra", None) else []
|
||
if isinstance(configured, str):
|
||
configured = configured.split(",")
|
||
return parse_fallback_ip_env(",".join(str(v) for v in configured) if configured else None)
|
||
|
||
@staticmethod
|
||
def _looks_like_polling_conflict(error: Exception) -> bool:
|
||
text = str(error).lower()
|
||
return (
|
||
error.__class__.__name__.lower() == "conflict"
|
||
or "terminated by other getupdates request" in text
|
||
or "another bot instance is running" in text
|
||
)
|
||
|
||
@staticmethod
|
||
def _looks_like_network_error(error: Exception) -> bool:
|
||
"""Return True for transient network errors that warrant a reconnect attempt."""
|
||
name = error.__class__.__name__.lower()
|
||
if name in ("networkerror", "timedout", "connectionerror"):
|
||
return True
|
||
try:
|
||
from telegram.error import NetworkError, TimedOut
|
||
if isinstance(error, (NetworkError, TimedOut)):
|
||
return True
|
||
except ImportError:
|
||
pass
|
||
return isinstance(error, OSError)
|
||
|
||
async def _handle_polling_network_error(self, error: Exception) -> None:
|
||
"""Reconnect polling after a transient network interruption.
|
||
|
||
Triggered by NetworkError/TimedOut in the polling error callback, which
|
||
happen when the host loses connectivity (Mac sleep, WiFi switch, VPN
|
||
reconnect, etc.). The gateway process stays alive but the long-poll
|
||
connection silently dies; without this handler the bot never recovers.
|
||
|
||
Strategy: exponential back-off (5s, 10s, 20s, 40s, 60s cap) up to
|
||
MAX_NETWORK_RETRIES attempts, then mark the adapter retryable-fatal so
|
||
the supervisor restarts the gateway process.
|
||
"""
|
||
if self.has_fatal_error:
|
||
return
|
||
|
||
MAX_NETWORK_RETRIES = 10
|
||
BASE_DELAY = 5
|
||
MAX_DELAY = 60
|
||
|
||
self._polling_network_error_count += 1
|
||
attempt = self._polling_network_error_count
|
||
|
||
if attempt > MAX_NETWORK_RETRIES:
|
||
message = (
|
||
"Telegram polling could not reconnect after %d network error retries. "
|
||
"Restarting gateway." % MAX_NETWORK_RETRIES
|
||
)
|
||
logger.error("[%s] %s Last error: %s", self.name, message, error)
|
||
self._set_fatal_error("telegram_network_error", message, retryable=True)
|
||
await self._notify_fatal_error()
|
||
return
|
||
|
||
delay = min(BASE_DELAY * (2 ** (attempt - 1)), MAX_DELAY)
|
||
logger.warning(
|
||
"[%s] Telegram network error (attempt %d/%d), reconnecting in %ds. Error: %s",
|
||
self.name, attempt, MAX_NETWORK_RETRIES, delay, error,
|
||
)
|
||
await asyncio.sleep(delay)
|
||
|
||
try:
|
||
if self._app and self._app.updater and self._app.updater.running:
|
||
await self._app.updater.stop()
|
||
except Exception:
|
||
pass
|
||
|
||
try:
|
||
await self._app.updater.start_polling(
|
||
allowed_updates=Update.ALL_TYPES,
|
||
drop_pending_updates=False,
|
||
error_callback=self._polling_error_callback_ref,
|
||
)
|
||
logger.info(
|
||
"[%s] Telegram polling resumed after network error (attempt %d)",
|
||
self.name, attempt,
|
||
)
|
||
self._polling_network_error_count = 0
|
||
except Exception as retry_err:
|
||
logger.warning("[%s] Telegram polling reconnect failed: %s", self.name, retry_err)
|
||
# start_polling failed — polling is dead and no further error
|
||
# callbacks will fire, so schedule the next retry ourselves.
|
||
if not self.has_fatal_error:
|
||
task = asyncio.ensure_future(
|
||
self._handle_polling_network_error(retry_err)
|
||
)
|
||
self._background_tasks.add(task)
|
||
task.add_done_callback(self._background_tasks.discard)
|
||
|
||
async def _handle_polling_conflict(self, error: Exception) -> None:
|
||
if self.has_fatal_error and self.fatal_error_code == "telegram_polling_conflict":
|
||
return
|
||
# Track consecutive conflicts — transient 409s can occur when a
|
||
# previous gateway instance hasn't fully released its long-poll
|
||
# session on Telegram's server (e.g. during --replace handoffs or
|
||
# systemd Restart=on-failure respawns). Retry a few times before
|
||
# giving up, so the old session has time to expire.
|
||
self._polling_conflict_count += 1
|
||
|
||
MAX_CONFLICT_RETRIES = 3
|
||
RETRY_DELAY = 10 # seconds
|
||
|
||
if self._polling_conflict_count <= MAX_CONFLICT_RETRIES:
|
||
logger.warning(
|
||
"[%s] Telegram polling conflict (%d/%d), will retry in %ds. Error: %s",
|
||
self.name, self._polling_conflict_count, MAX_CONFLICT_RETRIES,
|
||
RETRY_DELAY, error,
|
||
)
|
||
try:
|
||
if self._app and self._app.updater and self._app.updater.running:
|
||
await self._app.updater.stop()
|
||
except Exception:
|
||
pass
|
||
await asyncio.sleep(RETRY_DELAY)
|
||
try:
|
||
await self._app.updater.start_polling(
|
||
allowed_updates=Update.ALL_TYPES,
|
||
drop_pending_updates=False,
|
||
error_callback=self._polling_error_callback_ref,
|
||
)
|
||
logger.info("[%s] Telegram polling resumed after conflict retry %d", self.name, self._polling_conflict_count)
|
||
self._polling_conflict_count = 0 # reset on success
|
||
return
|
||
except Exception as retry_err:
|
||
logger.warning("[%s] Telegram polling retry failed: %s", self.name, retry_err)
|
||
# Don't fall through to fatal yet — wait for the next conflict
|
||
# to trigger another retry attempt (up to MAX_CONFLICT_RETRIES).
|
||
return
|
||
|
||
# Exhausted retries — fatal
|
||
message = (
|
||
"Another Telegram bot poller is already using this token. "
|
||
"Hermes stopped Telegram polling after %d retries. "
|
||
"Make sure only one gateway instance is running for this bot token."
|
||
% MAX_CONFLICT_RETRIES
|
||
)
|
||
logger.error("[%s] %s Original error: %s", self.name, message, error)
|
||
self._set_fatal_error("telegram_polling_conflict", message, retryable=False)
|
||
try:
|
||
if self._app and self._app.updater:
|
||
await self._app.updater.stop()
|
||
except Exception as stop_error:
|
||
logger.warning("[%s] Failed stopping Telegram polling after conflict: %s", self.name, stop_error, exc_info=True)
|
||
await self._notify_fatal_error()
|
||
|
||
async def _create_dm_topic(
|
||
self,
|
||
chat_id: int,
|
||
name: str,
|
||
icon_color: Optional[int] = None,
|
||
icon_custom_emoji_id: Optional[str] = None,
|
||
) -> Optional[int]:
|
||
"""Create a forum topic in a private (DM) chat.
|
||
|
||
Uses Bot API 9.4's createForumTopic which now works for 1-on-1 chats.
|
||
Returns the message_thread_id on success, None on failure.
|
||
"""
|
||
if not self._bot:
|
||
return None
|
||
try:
|
||
kwargs: Dict[str, Any] = {"chat_id": chat_id, "name": name}
|
||
if icon_color is not None:
|
||
kwargs["icon_color"] = icon_color
|
||
if icon_custom_emoji_id:
|
||
kwargs["icon_custom_emoji_id"] = icon_custom_emoji_id
|
||
|
||
topic = await self._bot.create_forum_topic(**kwargs)
|
||
thread_id = topic.message_thread_id
|
||
logger.info(
|
||
"[%s] Created DM topic '%s' in chat %s -> thread_id=%s",
|
||
self.name, name, chat_id, thread_id,
|
||
)
|
||
return thread_id
|
||
except Exception as e:
|
||
error_text = str(e).lower()
|
||
# If topic already exists, try to find it via getForumTopicIconStickers
|
||
# or we just log and skip — Telegram doesn't provide a "list topics" API
|
||
if "topic_name_duplicate" in error_text or "already" in error_text:
|
||
logger.info(
|
||
"[%s] DM topic '%s' already exists in chat %s (will be mapped from incoming messages)",
|
||
self.name, name, chat_id,
|
||
)
|
||
else:
|
||
logger.warning(
|
||
"[%s] Failed to create DM topic '%s' in chat %s: %s",
|
||
self.name, name, chat_id, e,
|
||
)
|
||
return None
|
||
|
||
def _persist_dm_topic_thread_id(self, chat_id: int, topic_name: str, thread_id: int) -> None:
|
||
"""Save a newly created thread_id back into config.yaml so it persists across restarts."""
|
||
try:
|
||
from hermes_constants import get_hermes_home
|
||
config_path = get_hermes_home() / "config.yaml"
|
||
if not config_path.exists():
|
||
logger.warning("[%s] Config file not found at %s, cannot persist thread_id", self.name, config_path)
|
||
return
|
||
|
||
import yaml as _yaml
|
||
with open(config_path, "r") as f:
|
||
config = _yaml.safe_load(f) or {}
|
||
|
||
# Navigate to platforms.telegram.extra.dm_topics
|
||
dm_topics = (
|
||
config.get("platforms", {})
|
||
.get("telegram", {})
|
||
.get("extra", {})
|
||
.get("dm_topics", [])
|
||
)
|
||
if not dm_topics:
|
||
return
|
||
|
||
changed = False
|
||
for chat_entry in dm_topics:
|
||
if int(chat_entry.get("chat_id", 0)) != int(chat_id):
|
||
continue
|
||
for t in chat_entry.get("topics", []):
|
||
if t.get("name") == topic_name and not t.get("thread_id"):
|
||
t["thread_id"] = thread_id
|
||
changed = True
|
||
break
|
||
|
||
if changed:
|
||
with open(config_path, "w") as f:
|
||
_yaml.dump(config, f, default_flow_style=False, sort_keys=False)
|
||
logger.info(
|
||
"[%s] Persisted thread_id=%s for topic '%s' in config.yaml",
|
||
self.name, thread_id, topic_name,
|
||
)
|
||
except Exception as e:
|
||
logger.warning("[%s] Failed to persist thread_id to config: %s", self.name, e, exc_info=True)
|
||
|
||
async def _setup_dm_topics(self) -> None:
|
||
"""Load or create configured DM topics for specified chats.
|
||
|
||
Reads config.extra['dm_topics'] — a list of dicts:
|
||
[
|
||
{
|
||
"chat_id": 123456789,
|
||
"topics": [
|
||
{"name": "General", "icon_color": 7322096, "thread_id": 100},
|
||
{"name": "Accessibility Auditor", "icon_color": 9367192, "skill": "accessibility-auditor"}
|
||
]
|
||
}
|
||
]
|
||
|
||
If a topic already has a thread_id in the config (persisted from a previous
|
||
creation), it is loaded into the cache without calling createForumTopic.
|
||
Only topics without a thread_id are created via the API, and their thread_id
|
||
is then saved back to config.yaml for future restarts.
|
||
"""
|
||
if not self._dm_topics_config:
|
||
return
|
||
|
||
for chat_entry in self._dm_topics_config:
|
||
chat_id = chat_entry.get("chat_id")
|
||
topics = chat_entry.get("topics", [])
|
||
if not chat_id or not topics:
|
||
continue
|
||
|
||
logger.info(
|
||
"[%s] Setting up %d DM topic(s) for chat %s",
|
||
self.name, len(topics), chat_id,
|
||
)
|
||
|
||
for topic_conf in topics:
|
||
topic_name = topic_conf.get("name")
|
||
if not topic_name:
|
||
continue
|
||
|
||
cache_key = f"{chat_id}:{topic_name}"
|
||
|
||
# If thread_id is already persisted in config, just load into cache
|
||
existing_thread_id = topic_conf.get("thread_id")
|
||
if existing_thread_id:
|
||
self._dm_topics[cache_key] = int(existing_thread_id)
|
||
logger.info(
|
||
"[%s] DM topic loaded from config: %s -> thread_id=%s",
|
||
self.name, cache_key, existing_thread_id,
|
||
)
|
||
continue
|
||
|
||
# No persisted thread_id — create the topic via API
|
||
icon_color = topic_conf.get("icon_color")
|
||
icon_emoji = topic_conf.get("icon_custom_emoji_id")
|
||
|
||
thread_id = await self._create_dm_topic(
|
||
chat_id=int(chat_id),
|
||
name=topic_name,
|
||
icon_color=icon_color,
|
||
icon_custom_emoji_id=icon_emoji,
|
||
)
|
||
|
||
if thread_id:
|
||
self._dm_topics[cache_key] = thread_id
|
||
logger.info(
|
||
"[%s] DM topic cached: %s -> thread_id=%s",
|
||
self.name, cache_key, thread_id,
|
||
)
|
||
# Persist thread_id to config so we don't recreate on next restart
|
||
self._persist_dm_topic_thread_id(int(chat_id), topic_name, thread_id)
|
||
|
||
async def connect(self) -> bool:
|
||
"""Connect to Telegram via polling or webhook.
|
||
|
||
By default, uses long polling (outbound connection to Telegram).
|
||
If ``TELEGRAM_WEBHOOK_URL`` is set, starts an HTTP webhook server
|
||
instead. Webhook mode is useful for cloud deployments (Fly.io,
|
||
Railway) where inbound HTTP can wake a suspended machine.
|
||
|
||
Env vars for webhook mode::
|
||
|
||
TELEGRAM_WEBHOOK_URL Public HTTPS URL (e.g. https://app.fly.dev/telegram)
|
||
TELEGRAM_WEBHOOK_PORT Local listen port (default 8443)
|
||
TELEGRAM_WEBHOOK_SECRET Secret token for update verification
|
||
"""
|
||
if not TELEGRAM_AVAILABLE:
|
||
logger.error(
|
||
"[%s] python-telegram-bot not installed. Run: pip install python-telegram-bot",
|
||
self.name,
|
||
)
|
||
return False
|
||
|
||
if not self.config.token:
|
||
logger.error("[%s] No bot token configured", self.name)
|
||
return False
|
||
|
||
try:
|
||
from gateway.status import acquire_scoped_lock
|
||
|
||
self._token_lock_identity = self.config.token
|
||
acquired, existing = acquire_scoped_lock(
|
||
"telegram-bot-token",
|
||
self._token_lock_identity,
|
||
metadata={"platform": self.platform.value},
|
||
)
|
||
if not acquired:
|
||
owner_pid = existing.get("pid") if isinstance(existing, dict) else None
|
||
message = (
|
||
"Another local Hermes gateway is already using this Telegram bot token"
|
||
+ (f" (PID {owner_pid})." if owner_pid else ".")
|
||
+ " Stop the other gateway before starting a second Telegram poller."
|
||
)
|
||
logger.error("[%s] %s", self.name, message)
|
||
self._set_fatal_error("telegram_token_lock", message, retryable=False)
|
||
return False
|
||
|
||
# Build the application
|
||
builder = Application.builder().token(self.config.token)
|
||
fallback_ips = self._fallback_ips()
|
||
if not fallback_ips:
|
||
fallback_ips = await discover_fallback_ips()
|
||
logger.info(
|
||
"[%s] Auto-discovered Telegram fallback IPs: %s",
|
||
self.name,
|
||
", ".join(fallback_ips),
|
||
)
|
||
if fallback_ips:
|
||
logger.warning(
|
||
"[%s] Telegram fallback IPs active: %s",
|
||
self.name,
|
||
", ".join(fallback_ips),
|
||
)
|
||
transport = TelegramFallbackTransport(fallback_ips)
|
||
request = HTTPXRequest(httpx_kwargs={"transport": transport})
|
||
get_updates_request = HTTPXRequest(httpx_kwargs={"transport": transport})
|
||
builder = builder.request(request).get_updates_request(get_updates_request)
|
||
self._app = builder.build()
|
||
self._bot = self._app.bot
|
||
|
||
# Register handlers
|
||
self._app.add_handler(TelegramMessageHandler(
|
||
filters.TEXT & ~filters.COMMAND,
|
||
self._handle_text_message
|
||
))
|
||
self._app.add_handler(TelegramMessageHandler(
|
||
filters.COMMAND,
|
||
self._handle_command
|
||
))
|
||
self._app.add_handler(TelegramMessageHandler(
|
||
filters.LOCATION | getattr(filters, "VENUE", filters.LOCATION),
|
||
self._handle_location_message
|
||
))
|
||
self._app.add_handler(TelegramMessageHandler(
|
||
filters.PHOTO | filters.VIDEO | filters.AUDIO | filters.VOICE | filters.Document.ALL | filters.Sticker.ALL,
|
||
self._handle_media_message
|
||
))
|
||
|
||
# Start polling — retry initialize() for transient TLS resets
|
||
try:
|
||
from telegram.error import NetworkError, TimedOut
|
||
except ImportError:
|
||
NetworkError = TimedOut = OSError # type: ignore[misc,assignment]
|
||
_max_connect = 3
|
||
for _attempt in range(_max_connect):
|
||
try:
|
||
await self._app.initialize()
|
||
break
|
||
except (NetworkError, TimedOut, OSError) as init_err:
|
||
if _attempt < _max_connect - 1:
|
||
wait = 2 ** _attempt
|
||
logger.warning(
|
||
"[%s] Connect attempt %d/%d failed: %s — retrying in %ds",
|
||
self.name, _attempt + 1, _max_connect, init_err, wait,
|
||
)
|
||
await asyncio.sleep(wait)
|
||
else:
|
||
raise
|
||
await self._app.start()
|
||
|
||
# Decide between webhook and polling mode
|
||
webhook_url = os.getenv("TELEGRAM_WEBHOOK_URL", "").strip()
|
||
|
||
if webhook_url:
|
||
# ── Webhook mode ─────────────────────────────────────
|
||
# Telegram pushes updates to our HTTP endpoint. This
|
||
# enables cloud platforms (Fly.io, Railway) to auto-wake
|
||
# suspended machines on inbound HTTP traffic.
|
||
webhook_port = int(os.getenv("TELEGRAM_WEBHOOK_PORT", "8443"))
|
||
webhook_secret = os.getenv("TELEGRAM_WEBHOOK_SECRET", "").strip() or None
|
||
from urllib.parse import urlparse
|
||
webhook_path = urlparse(webhook_url).path or "/telegram"
|
||
|
||
await self._app.updater.start_webhook(
|
||
listen="0.0.0.0",
|
||
port=webhook_port,
|
||
url_path=webhook_path,
|
||
webhook_url=webhook_url,
|
||
secret_token=webhook_secret,
|
||
allowed_updates=Update.ALL_TYPES,
|
||
drop_pending_updates=True,
|
||
)
|
||
self._webhook_mode = True
|
||
logger.info(
|
||
"[%s] Webhook server listening on 0.0.0.0:%d%s",
|
||
self.name, webhook_port, webhook_path,
|
||
)
|
||
else:
|
||
# ── Polling mode (default) ───────────────────────────
|
||
loop = asyncio.get_running_loop()
|
||
|
||
def _polling_error_callback(error: Exception) -> None:
|
||
if self._polling_error_task and not self._polling_error_task.done():
|
||
return
|
||
if self._looks_like_polling_conflict(error):
|
||
self._polling_error_task = loop.create_task(self._handle_polling_conflict(error))
|
||
elif self._looks_like_network_error(error):
|
||
logger.warning("[%s] Telegram network error, scheduling reconnect: %s", self.name, error)
|
||
self._polling_error_task = loop.create_task(self._handle_polling_network_error(error))
|
||
else:
|
||
logger.error("[%s] Telegram polling error: %s", self.name, error, exc_info=True)
|
||
|
||
# Store reference for retry use in _handle_polling_conflict
|
||
self._polling_error_callback_ref = _polling_error_callback
|
||
|
||
await self._app.updater.start_polling(
|
||
allowed_updates=Update.ALL_TYPES,
|
||
drop_pending_updates=True,
|
||
error_callback=_polling_error_callback,
|
||
)
|
||
|
||
# Register bot commands so Telegram shows a hint menu when users type /
|
||
# List is derived from the central COMMAND_REGISTRY — adding a new
|
||
# gateway command there automatically adds it to the Telegram menu.
|
||
try:
|
||
from telegram import BotCommand
|
||
from hermes_cli.commands import telegram_bot_commands
|
||
await self._bot.set_my_commands([
|
||
BotCommand(name, desc) for name, desc in telegram_bot_commands()
|
||
])
|
||
except Exception as e:
|
||
logger.warning(
|
||
"[%s] Could not register Telegram command menu: %s",
|
||
self.name,
|
||
e,
|
||
exc_info=True,
|
||
)
|
||
|
||
self._mark_connected()
|
||
mode = "webhook" if self._webhook_mode else "polling"
|
||
logger.info("[%s] Connected to Telegram (%s mode)", self.name, mode)
|
||
|
||
# Set up DM topics (Bot API 9.4 — Private Chat Topics)
|
||
# Runs after connection is established so the bot can call createForumTopic.
|
||
# Failures here are non-fatal — the bot works fine without topics.
|
||
try:
|
||
await self._setup_dm_topics()
|
||
except Exception as topics_err:
|
||
logger.warning(
|
||
"[%s] DM topics setup failed (non-fatal): %s",
|
||
self.name, topics_err, exc_info=True,
|
||
)
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
if self._token_lock_identity:
|
||
try:
|
||
from gateway.status import release_scoped_lock
|
||
release_scoped_lock("telegram-bot-token", self._token_lock_identity)
|
||
except Exception:
|
||
pass
|
||
message = f"Telegram startup failed: {e}"
|
||
self._set_fatal_error("telegram_connect_error", message, retryable=True)
|
||
logger.error("[%s] Failed to connect to Telegram: %s", self.name, e, exc_info=True)
|
||
return False
|
||
|
||
async def disconnect(self) -> None:
|
||
"""Stop polling/webhook, cancel pending album flushes, and disconnect."""
|
||
pending_media_group_tasks = list(self._media_group_tasks.values())
|
||
for task in pending_media_group_tasks:
|
||
task.cancel()
|
||
if pending_media_group_tasks:
|
||
await asyncio.gather(*pending_media_group_tasks, return_exceptions=True)
|
||
self._media_group_tasks.clear()
|
||
self._media_group_events.clear()
|
||
|
||
if self._app:
|
||
try:
|
||
# Only stop the updater if it's running
|
||
if self._app.updater and self._app.updater.running:
|
||
await self._app.updater.stop()
|
||
if self._app.running:
|
||
await self._app.stop()
|
||
await self._app.shutdown()
|
||
except Exception as e:
|
||
logger.warning("[%s] Error during Telegram disconnect: %s", self.name, e, exc_info=True)
|
||
if self._token_lock_identity:
|
||
try:
|
||
from gateway.status import release_scoped_lock
|
||
release_scoped_lock("telegram-bot-token", self._token_lock_identity)
|
||
except Exception as e:
|
||
logger.warning("[%s] Error releasing Telegram token lock: %s", self.name, e, exc_info=True)
|
||
|
||
for task in self._pending_photo_batch_tasks.values():
|
||
if task and not task.done():
|
||
task.cancel()
|
||
self._pending_photo_batch_tasks.clear()
|
||
self._pending_photo_batches.clear()
|
||
|
||
self._mark_disconnected()
|
||
self._app = None
|
||
self._bot = None
|
||
self._token_lock_identity = None
|
||
logger.info("[%s] Disconnected from Telegram", self.name)
|
||
|
||
def _should_thread_reply(self, reply_to: Optional[str], chunk_index: int) -> bool:
|
||
"""Determine if this message chunk should thread to the original message.
|
||
|
||
Args:
|
||
reply_to: The original message ID to reply to
|
||
chunk_index: Index of this chunk (0 = first chunk)
|
||
|
||
Returns:
|
||
True if this chunk should be threaded to the original message
|
||
"""
|
||
if not reply_to:
|
||
return False
|
||
mode = self._reply_to_mode
|
||
if mode == "off":
|
||
return False
|
||
elif mode == "all":
|
||
return True
|
||
else: # "first" (default)
|
||
return chunk_index == 0
|
||
|
||
async def send(
|
||
self,
|
||
chat_id: str,
|
||
content: str,
|
||
reply_to: Optional[str] = None,
|
||
metadata: Optional[Dict[str, Any]] = None
|
||
) -> SendResult:
|
||
"""Send a message to a Telegram chat."""
|
||
if not self._bot:
|
||
return SendResult(success=False, error="Not connected")
|
||
|
||
try:
|
||
# Format and split message if needed
|
||
formatted = self.format_message(content)
|
||
chunks = self.truncate_message(formatted, self.MAX_MESSAGE_LENGTH)
|
||
if len(chunks) > 1:
|
||
# truncate_message appends a raw " (1/2)" suffix. Escape the
|
||
# MarkdownV2-special parentheses so Telegram doesn't reject the
|
||
# chunk and fall back to plain text.
|
||
chunks = [
|
||
re.sub(r" \((\d+)/(\d+)\)$", r" \\(\1/\2\\)", chunk)
|
||
for chunk in chunks
|
||
]
|
||
|
||
message_ids = []
|
||
thread_id = metadata.get("thread_id") if metadata else None
|
||
|
||
try:
|
||
from telegram.error import NetworkError as _NetErr
|
||
except ImportError:
|
||
_NetErr = OSError # type: ignore[misc,assignment]
|
||
|
||
try:
|
||
from telegram.error import BadRequest as _BadReq
|
||
except ImportError:
|
||
_BadReq = None # type: ignore[assignment,misc]
|
||
|
||
for i, chunk in enumerate(chunks):
|
||
should_thread = self._should_thread_reply(reply_to, i)
|
||
reply_to_id = int(reply_to) if should_thread else None
|
||
effective_thread_id = int(thread_id) if thread_id else None
|
||
|
||
msg = None
|
||
for _send_attempt in range(3):
|
||
try:
|
||
# Try Markdown first, fall back to plain text if it fails
|
||
try:
|
||
msg = await self._bot.send_message(
|
||
chat_id=int(chat_id),
|
||
text=chunk,
|
||
parse_mode=ParseMode.MARKDOWN_V2,
|
||
reply_to_message_id=reply_to_id,
|
||
message_thread_id=effective_thread_id,
|
||
)
|
||
except Exception as md_error:
|
||
# Markdown parsing failed, try plain text
|
||
if "parse" in str(md_error).lower() or "markdown" in str(md_error).lower():
|
||
logger.warning("[%s] MarkdownV2 parse failed, falling back to plain text: %s", self.name, md_error)
|
||
plain_chunk = _strip_mdv2(chunk)
|
||
msg = await self._bot.send_message(
|
||
chat_id=int(chat_id),
|
||
text=plain_chunk,
|
||
parse_mode=None,
|
||
reply_to_message_id=reply_to_id,
|
||
message_thread_id=effective_thread_id,
|
||
)
|
||
else:
|
||
raise
|
||
break # success
|
||
except _NetErr as send_err:
|
||
# BadRequest is a subclass of NetworkError in
|
||
# python-telegram-bot but represents permanent errors
|
||
# (not transient network issues). Detect and handle
|
||
# specific cases instead of blindly retrying.
|
||
if _BadReq and isinstance(send_err, _BadReq):
|
||
err_lower = str(send_err).lower()
|
||
if "thread not found" in err_lower and effective_thread_id is not None:
|
||
# Thread doesn't exist — retry without
|
||
# message_thread_id so the message still
|
||
# reaches the chat.
|
||
logger.warning(
|
||
"[%s] Thread %s not found, retrying without message_thread_id",
|
||
self.name, effective_thread_id,
|
||
)
|
||
effective_thread_id = None
|
||
continue
|
||
if "message to be replied not found" in err_lower and reply_to_id is not None:
|
||
# Original message was deleted before we
|
||
# could reply — clear reply target and retry
|
||
# so the response is still delivered.
|
||
logger.warning(
|
||
"[%s] Reply target deleted, retrying without reply_to: %s",
|
||
self.name, send_err,
|
||
)
|
||
reply_to_id = None
|
||
continue
|
||
# Other BadRequest errors are permanent — don't retry
|
||
raise
|
||
if _send_attempt < 2:
|
||
wait = 2 ** _send_attempt
|
||
logger.warning("[%s] Network error on send (attempt %d/3), retrying in %ds: %s",
|
||
self.name, _send_attempt + 1, wait, send_err)
|
||
await asyncio.sleep(wait)
|
||
else:
|
||
raise
|
||
message_ids.append(str(msg.message_id))
|
||
|
||
return SendResult(
|
||
success=True,
|
||
message_id=message_ids[0] if message_ids else None,
|
||
raw_response={"message_ids": message_ids}
|
||
)
|
||
|
||
except Exception as e:
|
||
logger.error("[%s] Failed to send Telegram message: %s", self.name, e, exc_info=True)
|
||
return SendResult(success=False, error=str(e))
|
||
|
||
async def edit_message(
|
||
self,
|
||
chat_id: str,
|
||
message_id: str,
|
||
content: str,
|
||
) -> SendResult:
|
||
"""Edit a previously sent Telegram message."""
|
||
if not self._bot:
|
||
return SendResult(success=False, error="Not connected")
|
||
try:
|
||
formatted = self.format_message(content)
|
||
try:
|
||
await self._bot.edit_message_text(
|
||
chat_id=int(chat_id),
|
||
message_id=int(message_id),
|
||
text=formatted,
|
||
parse_mode=ParseMode.MARKDOWN_V2,
|
||
)
|
||
except Exception as fmt_err:
|
||
# "Message is not modified" is a no-op, not an error
|
||
if "not modified" in str(fmt_err).lower():
|
||
return SendResult(success=True, message_id=message_id)
|
||
# Fallback: retry without markdown formatting
|
||
await self._bot.edit_message_text(
|
||
chat_id=int(chat_id),
|
||
message_id=int(message_id),
|
||
text=content,
|
||
)
|
||
return SendResult(success=True, message_id=message_id)
|
||
except Exception as e:
|
||
err_str = str(e).lower()
|
||
# "Message is not modified" — content identical, treat as success
|
||
if "not modified" in err_str:
|
||
return SendResult(success=True, message_id=message_id)
|
||
# Message too long — content exceeded 4096 chars (e.g. during
|
||
# streaming). Truncate and succeed so the stream consumer can
|
||
# split the overflow into a new message instead of dying.
|
||
if "message_too_long" in err_str or "too long" in err_str:
|
||
truncated = content[: self.MAX_MESSAGE_LENGTH - 20] + "…"
|
||
try:
|
||
await self._bot.edit_message_text(
|
||
chat_id=int(chat_id),
|
||
message_id=int(message_id),
|
||
text=truncated,
|
||
)
|
||
except Exception:
|
||
pass # best-effort truncation
|
||
return SendResult(success=True, message_id=message_id)
|
||
# Flood control / RetryAfter — back off and retry once
|
||
retry_after = getattr(e, "retry_after", None)
|
||
if retry_after is not None or "retry after" in err_str:
|
||
wait = retry_after if retry_after else 1.0
|
||
logger.warning(
|
||
"[%s] Telegram flood control, waiting %.1fs",
|
||
self.name, wait,
|
||
)
|
||
await asyncio.sleep(wait)
|
||
try:
|
||
await self._bot.edit_message_text(
|
||
chat_id=int(chat_id),
|
||
message_id=int(message_id),
|
||
text=content,
|
||
)
|
||
return SendResult(success=True, message_id=message_id)
|
||
except Exception as retry_err:
|
||
logger.error(
|
||
"[%s] Edit retry failed after flood wait: %s",
|
||
self.name, retry_err,
|
||
)
|
||
return SendResult(success=False, error=str(retry_err))
|
||
logger.error(
|
||
"[%s] Failed to edit Telegram message %s: %s",
|
||
self.name,
|
||
message_id,
|
||
e,
|
||
exc_info=True,
|
||
)
|
||
return SendResult(success=False, error=str(e))
|
||
|
||
async def send_voice(
|
||
self,
|
||
chat_id: str,
|
||
audio_path: str,
|
||
caption: Optional[str] = None,
|
||
reply_to: Optional[str] = None,
|
||
metadata: Optional[Dict[str, Any]] = None,
|
||
**kwargs,
|
||
) -> SendResult:
|
||
"""Send audio as a native Telegram voice message or audio file."""
|
||
if not self._bot:
|
||
return SendResult(success=False, error="Not connected")
|
||
|
||
try:
|
||
import os
|
||
if not os.path.exists(audio_path):
|
||
return SendResult(success=False, error=f"Audio file not found: {audio_path}")
|
||
|
||
with open(audio_path, "rb") as audio_file:
|
||
# .ogg files -> send as voice (round playable bubble)
|
||
if audio_path.endswith(".ogg") or audio_path.endswith(".opus"):
|
||
_voice_thread = metadata.get("thread_id") if metadata else None
|
||
msg = await self._bot.send_voice(
|
||
chat_id=int(chat_id),
|
||
voice=audio_file,
|
||
caption=caption[:1024] if caption else None,
|
||
reply_to_message_id=int(reply_to) if reply_to else None,
|
||
message_thread_id=int(_voice_thread) if _voice_thread else None,
|
||
)
|
||
else:
|
||
# .mp3 and others -> send as audio file
|
||
_audio_thread = metadata.get("thread_id") if metadata else None
|
||
msg = await self._bot.send_audio(
|
||
chat_id=int(chat_id),
|
||
audio=audio_file,
|
||
caption=caption[:1024] if caption else None,
|
||
reply_to_message_id=int(reply_to) if reply_to else None,
|
||
message_thread_id=int(_audio_thread) if _audio_thread else None,
|
||
)
|
||
return SendResult(success=True, message_id=str(msg.message_id))
|
||
except Exception as e:
|
||
logger.error(
|
||
"[%s] Failed to send Telegram voice/audio, falling back to base adapter: %s",
|
||
self.name,
|
||
e,
|
||
exc_info=True,
|
||
)
|
||
return await super().send_voice(chat_id, audio_path, caption, reply_to)
|
||
|
||
async def send_image_file(
|
||
self,
|
||
chat_id: str,
|
||
image_path: str,
|
||
caption: Optional[str] = None,
|
||
reply_to: Optional[str] = None,
|
||
metadata: Optional[Dict[str, Any]] = None,
|
||
**kwargs,
|
||
) -> SendResult:
|
||
"""Send a local image file natively as a Telegram photo."""
|
||
if not self._bot:
|
||
return SendResult(success=False, error="Not connected")
|
||
|
||
try:
|
||
import os
|
||
if not os.path.exists(image_path):
|
||
return SendResult(success=False, error=f"Image file not found: {image_path}")
|
||
|
||
_thread = metadata.get("thread_id") if metadata else None
|
||
with open(image_path, "rb") as image_file:
|
||
msg = await self._bot.send_photo(
|
||
chat_id=int(chat_id),
|
||
photo=image_file,
|
||
caption=caption[:1024] if caption else None,
|
||
reply_to_message_id=int(reply_to) if reply_to else None,
|
||
message_thread_id=int(_thread) if _thread else None,
|
||
)
|
||
return SendResult(success=True, message_id=str(msg.message_id))
|
||
except Exception as e:
|
||
logger.error(
|
||
"[%s] Failed to send Telegram local image, falling back to base adapter: %s",
|
||
self.name,
|
||
e,
|
||
exc_info=True,
|
||
)
|
||
return await super().send_image_file(chat_id, image_path, caption, reply_to)
|
||
|
||
async def send_document(
|
||
self,
|
||
chat_id: str,
|
||
file_path: str,
|
||
caption: Optional[str] = None,
|
||
file_name: Optional[str] = None,
|
||
reply_to: Optional[str] = None,
|
||
metadata: Optional[Dict[str, Any]] = None,
|
||
**kwargs,
|
||
) -> SendResult:
|
||
"""Send a document/file natively as a Telegram file attachment."""
|
||
if not self._bot:
|
||
return SendResult(success=False, error="Not connected")
|
||
|
||
try:
|
||
if not os.path.exists(file_path):
|
||
return SendResult(success=False, error=f"File not found: {file_path}")
|
||
|
||
display_name = file_name or os.path.basename(file_path)
|
||
_thread = metadata.get("thread_id") if metadata else None
|
||
|
||
with open(file_path, "rb") as f:
|
||
msg = await self._bot.send_document(
|
||
chat_id=int(chat_id),
|
||
document=f,
|
||
filename=display_name,
|
||
caption=caption[:1024] if caption else None,
|
||
reply_to_message_id=int(reply_to) if reply_to else None,
|
||
message_thread_id=int(_thread) if _thread else None,
|
||
)
|
||
return SendResult(success=True, message_id=str(msg.message_id))
|
||
except Exception as e:
|
||
print(f"[{self.name}] Failed to send document: {e}")
|
||
return await super().send_document(chat_id, file_path, caption, file_name, reply_to)
|
||
|
||
async def send_video(
|
||
self,
|
||
chat_id: str,
|
||
video_path: str,
|
||
caption: Optional[str] = None,
|
||
reply_to: Optional[str] = None,
|
||
metadata: Optional[Dict[str, Any]] = None,
|
||
**kwargs,
|
||
) -> SendResult:
|
||
"""Send a video natively as a Telegram video message."""
|
||
if not self._bot:
|
||
return SendResult(success=False, error="Not connected")
|
||
|
||
try:
|
||
if not os.path.exists(video_path):
|
||
return SendResult(success=False, error=f"Video file not found: {video_path}")
|
||
|
||
_thread = metadata.get("thread_id") if metadata else None
|
||
with open(video_path, "rb") as f:
|
||
msg = await self._bot.send_video(
|
||
chat_id=int(chat_id),
|
||
video=f,
|
||
caption=caption[:1024] if caption else None,
|
||
reply_to_message_id=int(reply_to) if reply_to else None,
|
||
message_thread_id=int(_thread) if _thread else None,
|
||
)
|
||
return SendResult(success=True, message_id=str(msg.message_id))
|
||
except Exception as e:
|
||
print(f"[{self.name}] Failed to send video: {e}")
|
||
return await super().send_video(chat_id, video_path, caption, reply_to)
|
||
|
||
async def send_image(
|
||
self,
|
||
chat_id: str,
|
||
image_url: str,
|
||
caption: Optional[str] = None,
|
||
reply_to: Optional[str] = None,
|
||
metadata: Optional[Dict[str, Any]] = None,
|
||
) -> SendResult:
|
||
"""Send an image natively as a Telegram photo.
|
||
|
||
Tries URL-based send first (fast, works for <5MB images).
|
||
Falls back to downloading and uploading as file (supports up to 10MB).
|
||
"""
|
||
if not self._bot:
|
||
return SendResult(success=False, error="Not connected")
|
||
|
||
try:
|
||
# Telegram can send photos directly from URLs (up to ~5MB)
|
||
_photo_thread = metadata.get("thread_id") if metadata else None
|
||
msg = await self._bot.send_photo(
|
||
chat_id=int(chat_id),
|
||
photo=image_url,
|
||
caption=caption[:1024] if caption else None, # Telegram caption limit
|
||
reply_to_message_id=int(reply_to) if reply_to else None,
|
||
message_thread_id=int(_photo_thread) if _photo_thread else None,
|
||
)
|
||
return SendResult(success=True, message_id=str(msg.message_id))
|
||
except Exception as e:
|
||
logger.warning(
|
||
"[%s] URL-based send_photo failed, trying file upload: %s",
|
||
self.name,
|
||
e,
|
||
exc_info=True,
|
||
)
|
||
# Fallback: download and upload as file (supports up to 10MB)
|
||
try:
|
||
import httpx
|
||
async with httpx.AsyncClient(timeout=30.0) as client:
|
||
resp = await client.get(image_url)
|
||
resp.raise_for_status()
|
||
image_data = resp.content
|
||
|
||
msg = await self._bot.send_photo(
|
||
chat_id=int(chat_id),
|
||
photo=image_data,
|
||
caption=caption[:1024] if caption else None,
|
||
reply_to_message_id=int(reply_to) if reply_to else None,
|
||
)
|
||
return SendResult(success=True, message_id=str(msg.message_id))
|
||
except Exception as e2:
|
||
logger.error(
|
||
"[%s] File upload send_photo also failed: %s",
|
||
self.name,
|
||
e2,
|
||
exc_info=True,
|
||
)
|
||
# Final fallback: send URL as text
|
||
return await super().send_image(chat_id, image_url, caption, reply_to)
|
||
|
||
async def send_animation(
|
||
self,
|
||
chat_id: str,
|
||
animation_url: str,
|
||
caption: Optional[str] = None,
|
||
reply_to: Optional[str] = None,
|
||
metadata: Optional[Dict[str, Any]] = None,
|
||
) -> SendResult:
|
||
"""Send an animated GIF natively as a Telegram animation (auto-plays inline)."""
|
||
if not self._bot:
|
||
return SendResult(success=False, error="Not connected")
|
||
|
||
try:
|
||
_anim_thread = metadata.get("thread_id") if metadata else None
|
||
msg = await self._bot.send_animation(
|
||
chat_id=int(chat_id),
|
||
animation=animation_url,
|
||
caption=caption[:1024] if caption else None,
|
||
reply_to_message_id=int(reply_to) if reply_to else None,
|
||
message_thread_id=int(_anim_thread) if _anim_thread else None,
|
||
)
|
||
return SendResult(success=True, message_id=str(msg.message_id))
|
||
except Exception as e:
|
||
logger.error(
|
||
"[%s] Failed to send Telegram animation, falling back to photo: %s",
|
||
self.name,
|
||
e,
|
||
exc_info=True,
|
||
)
|
||
# Fallback: try as a regular photo
|
||
return await self.send_image(chat_id, animation_url, caption, reply_to)
|
||
|
||
async def send_typing(self, chat_id: str, metadata: Optional[Dict[str, Any]] = None) -> None:
|
||
"""Send typing indicator."""
|
||
if self._bot:
|
||
try:
|
||
_typing_thread = metadata.get("thread_id") if metadata else None
|
||
await self._bot.send_chat_action(
|
||
chat_id=int(chat_id),
|
||
action="typing",
|
||
message_thread_id=int(_typing_thread) if _typing_thread else None,
|
||
)
|
||
except Exception as e:
|
||
# Typing failures are non-fatal; log at debug level only.
|
||
logger.debug(
|
||
"[%s] Failed to send Telegram typing indicator: %s",
|
||
self.name,
|
||
e,
|
||
exc_info=True,
|
||
)
|
||
|
||
async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
|
||
"""Get information about a Telegram chat."""
|
||
if not self._bot:
|
||
return {"name": "Unknown", "type": "dm"}
|
||
|
||
try:
|
||
chat = await self._bot.get_chat(int(chat_id))
|
||
|
||
chat_type = "dm"
|
||
if chat.type == ChatType.GROUP:
|
||
chat_type = "group"
|
||
elif chat.type == ChatType.SUPERGROUP:
|
||
chat_type = "group"
|
||
if chat.is_forum:
|
||
chat_type = "forum"
|
||
elif chat.type == ChatType.CHANNEL:
|
||
chat_type = "channel"
|
||
|
||
return {
|
||
"name": chat.title or chat.full_name or str(chat_id),
|
||
"type": chat_type,
|
||
"username": chat.username,
|
||
"is_forum": getattr(chat, "is_forum", False),
|
||
}
|
||
except Exception as e:
|
||
logger.error(
|
||
"[%s] Failed to get Telegram chat info for %s: %s",
|
||
self.name,
|
||
chat_id,
|
||
e,
|
||
exc_info=True,
|
||
)
|
||
return {"name": str(chat_id), "type": "dm", "error": str(e)}
|
||
|
||
def format_message(self, content: str) -> str:
|
||
"""
|
||
Convert standard markdown to Telegram MarkdownV2 format.
|
||
|
||
Protected regions (code blocks, inline code) are extracted first so
|
||
their contents are never modified. Standard markdown constructs
|
||
(headers, bold, italic, links) are translated to MarkdownV2 syntax,
|
||
and all remaining special characters are escaped.
|
||
"""
|
||
if not content:
|
||
return content
|
||
|
||
placeholders: dict = {}
|
||
counter = [0]
|
||
|
||
def _ph(value: str) -> str:
|
||
"""Stash *value* behind a placeholder token that survives escaping."""
|
||
key = f"\x00PH{counter[0]}\x00"
|
||
counter[0] += 1
|
||
placeholders[key] = value
|
||
return key
|
||
|
||
text = content
|
||
|
||
# 1) Protect fenced code blocks (``` ... ```)
|
||
# Per MarkdownV2 spec, \ and ` inside pre/code must be escaped.
|
||
def _protect_fenced(m):
|
||
raw = m.group(0)
|
||
# Split off opening ``` (with optional language) and closing ```
|
||
open_end = raw.index('\n') + 1 if '\n' in raw[3:] else 3
|
||
opening = raw[:open_end]
|
||
body_and_close = raw[open_end:]
|
||
body = body_and_close[:-3]
|
||
body = body.replace('\\', '\\\\').replace('`', '\\`')
|
||
return _ph(opening + body + '```')
|
||
|
||
text = re.sub(
|
||
r'(```(?:[^\n]*\n)?[\s\S]*?```)',
|
||
_protect_fenced,
|
||
text,
|
||
)
|
||
|
||
# 2) Protect inline code (`...`)
|
||
# Escape \ inside inline code per MarkdownV2 spec.
|
||
text = re.sub(
|
||
r'(`[^`]+`)',
|
||
lambda m: _ph(m.group(0).replace('\\', '\\\\')),
|
||
text,
|
||
)
|
||
|
||
# 3) Convert markdown links – escape the display text; inside the URL
|
||
# only ')' and '\' need escaping per the MarkdownV2 spec.
|
||
def _convert_link(m):
|
||
display = _escape_mdv2(m.group(1))
|
||
url = m.group(2).replace('\\', '\\\\').replace(')', '\\)')
|
||
return _ph(f'[{display}]({url})')
|
||
|
||
text = re.sub(r'\[([^\]]+)\]\(([^)]+)\)', _convert_link, text)
|
||
|
||
# 4) Convert markdown headers (## Title) → bold *Title*
|
||
def _convert_header(m):
|
||
inner = m.group(1).strip()
|
||
# Strip redundant bold markers that may appear inside a header
|
||
inner = re.sub(r'\*\*(.+?)\*\*', r'\1', inner)
|
||
return _ph(f'*{_escape_mdv2(inner)}*')
|
||
|
||
text = re.sub(
|
||
r'^#{1,6}\s+(.+)$', _convert_header, text, flags=re.MULTILINE
|
||
)
|
||
|
||
# 5) Convert bold: **text** → *text* (MarkdownV2 bold)
|
||
text = re.sub(
|
||
r'\*\*(.+?)\*\*',
|
||
lambda m: _ph(f'*{_escape_mdv2(m.group(1))}*'),
|
||
text,
|
||
)
|
||
|
||
# 6) Convert italic: *text* (single asterisk) → _text_ (MarkdownV2 italic)
|
||
# [^*\n]+ prevents matching across newlines (which would corrupt
|
||
# bullet lists using * markers and multi-line content).
|
||
text = re.sub(
|
||
r'\*([^*\n]+)\*',
|
||
lambda m: _ph(f'_{_escape_mdv2(m.group(1))}_'),
|
||
text,
|
||
)
|
||
|
||
# 7) Convert strikethrough: ~~text~~ → ~text~ (MarkdownV2)
|
||
text = re.sub(
|
||
r'~~(.+?)~~',
|
||
lambda m: _ph(f'~{_escape_mdv2(m.group(1))}~'),
|
||
text,
|
||
)
|
||
|
||
# 8) Convert spoiler: ||text|| → ||text|| (protect from | escaping)
|
||
text = re.sub(
|
||
r'\|\|(.+?)\|\|',
|
||
lambda m: _ph(f'||{_escape_mdv2(m.group(1))}||'),
|
||
text,
|
||
)
|
||
|
||
# 9) Convert blockquotes: > at line start → protect > from escaping
|
||
text = re.sub(
|
||
r'^(>{1,3}) (.+)$',
|
||
lambda m: _ph(m.group(1) + ' ' + _escape_mdv2(m.group(2))),
|
||
text,
|
||
flags=re.MULTILINE,
|
||
)
|
||
|
||
# 10) Escape remaining special characters in plain text
|
||
text = _escape_mdv2(text)
|
||
|
||
# 11) Restore placeholders in reverse insertion order so that
|
||
# nested references (a placeholder inside another) resolve correctly.
|
||
for key in reversed(list(placeholders.keys())):
|
||
text = text.replace(key, placeholders[key])
|
||
|
||
# 12) Safety net: escape unescaped ( ) { } that slipped through
|
||
# placeholder processing. Split the text into code/non-code
|
||
# segments so we never touch content inside ``` or ` spans.
|
||
_code_split = re.split(r'(```[\s\S]*?```|`[^`]+`)', text)
|
||
_safe_parts = []
|
||
for _idx, _seg in enumerate(_code_split):
|
||
if _idx % 2 == 1:
|
||
# Inside code span/block — leave untouched
|
||
_safe_parts.append(_seg)
|
||
else:
|
||
# Outside code — escape bare ( ) { }
|
||
def _esc_bare(m, _seg=_seg):
|
||
s = m.start()
|
||
ch = m.group(0)
|
||
# Already escaped
|
||
if s > 0 and _seg[s - 1] == '\\':
|
||
return ch
|
||
# ( that opens a MarkdownV2 link [text](url)
|
||
if ch == '(' and s > 0 and _seg[s - 1] == ']':
|
||
return ch
|
||
# ) that closes a link URL
|
||
if ch == ')':
|
||
before = _seg[:s]
|
||
if '](http' in before or '](' in before:
|
||
# Check depth
|
||
depth = 0
|
||
for j in range(s - 1, max(s - 2000, -1), -1):
|
||
if _seg[j] == '(':
|
||
depth -= 1
|
||
if depth < 0:
|
||
if j > 0 and _seg[j - 1] == ']':
|
||
return ch
|
||
break
|
||
elif _seg[j] == ')':
|
||
depth += 1
|
||
return '\\' + ch
|
||
_safe_parts.append(re.sub(r'[(){}]', _esc_bare, _seg))
|
||
text = ''.join(_safe_parts)
|
||
|
||
return text
|
||
|
||
# ── Group mention gating ──────────────────────────────────────────────
|
||
|
||
def _telegram_require_mention(self) -> bool:
|
||
"""Return whether group chats should require an explicit bot trigger."""
|
||
configured = self.config.extra.get("require_mention")
|
||
if configured is not None:
|
||
if isinstance(configured, str):
|
||
return configured.lower() in ("true", "1", "yes", "on")
|
||
return bool(configured)
|
||
return os.getenv("TELEGRAM_REQUIRE_MENTION", "false").lower() in ("true", "1", "yes", "on")
|
||
|
||
def _telegram_free_response_chats(self) -> set[str]:
|
||
raw = self.config.extra.get("free_response_chats")
|
||
if raw is None:
|
||
raw = os.getenv("TELEGRAM_FREE_RESPONSE_CHATS", "")
|
||
if isinstance(raw, list):
|
||
return {str(part).strip() for part in raw if str(part).strip()}
|
||
return {part.strip() for part in str(raw).split(",") if part.strip()}
|
||
|
||
def _compile_mention_patterns(self) -> List[re.Pattern]:
|
||
"""Compile optional regex wake-word patterns for group triggers."""
|
||
patterns = self.config.extra.get("mention_patterns")
|
||
if patterns is None:
|
||
raw = os.getenv("TELEGRAM_MENTION_PATTERNS", "").strip()
|
||
if raw:
|
||
try:
|
||
loaded = json.loads(raw)
|
||
except Exception:
|
||
loaded = [part.strip() for part in raw.splitlines() if part.strip()]
|
||
if not loaded:
|
||
loaded = [part.strip() for part in raw.split(",") if part.strip()]
|
||
patterns = loaded
|
||
|
||
if patterns is None:
|
||
return []
|
||
if isinstance(patterns, str):
|
||
patterns = [patterns]
|
||
if not isinstance(patterns, list):
|
||
logger.warning(
|
||
"[%s] telegram mention_patterns must be a list or string; got %s",
|
||
self.name,
|
||
type(patterns).__name__,
|
||
)
|
||
return []
|
||
|
||
compiled: List[re.Pattern] = []
|
||
for pattern in patterns:
|
||
if not isinstance(pattern, str) or not pattern.strip():
|
||
continue
|
||
try:
|
||
compiled.append(re.compile(pattern, re.IGNORECASE))
|
||
except re.error as exc:
|
||
logger.warning("[%s] Invalid Telegram mention pattern %r: %s", self.name, pattern, exc)
|
||
if compiled:
|
||
logger.info("[%s] Loaded %d Telegram mention pattern(s)", self.name, len(compiled))
|
||
return compiled
|
||
|
||
def _is_group_chat(self, message: Message) -> bool:
|
||
chat = getattr(message, "chat", None)
|
||
if not chat:
|
||
return False
|
||
chat_type = str(getattr(chat, "type", "")).split(".")[-1].lower()
|
||
return chat_type in ("group", "supergroup")
|
||
|
||
def _is_reply_to_bot(self, message: Message) -> bool:
|
||
if not self._bot or not getattr(message, "reply_to_message", None):
|
||
return False
|
||
reply_user = getattr(message.reply_to_message, "from_user", None)
|
||
return bool(reply_user and getattr(reply_user, "id", None) == getattr(self._bot, "id", None))
|
||
|
||
def _message_mentions_bot(self, message: Message) -> bool:
|
||
if not self._bot:
|
||
return False
|
||
|
||
bot_username = (getattr(self._bot, "username", None) or "").lstrip("@").lower()
|
||
bot_id = getattr(self._bot, "id", None)
|
||
|
||
def _iter_sources():
|
||
yield getattr(message, "text", None) or "", getattr(message, "entities", None) or []
|
||
yield getattr(message, "caption", None) or "", getattr(message, "caption_entities", None) or []
|
||
|
||
for source_text, entities in _iter_sources():
|
||
if bot_username and f"@{bot_username}" in source_text.lower():
|
||
return True
|
||
for entity in entities:
|
||
entity_type = str(getattr(entity, "type", "")).split(".")[-1].lower()
|
||
if entity_type == "mention" and bot_username:
|
||
offset = int(getattr(entity, "offset", -1))
|
||
length = int(getattr(entity, "length", 0))
|
||
if offset < 0 or length <= 0:
|
||
continue
|
||
if source_text[offset:offset + length].strip().lower() == f"@{bot_username}":
|
||
return True
|
||
elif entity_type == "text_mention":
|
||
user = getattr(entity, "user", None)
|
||
if user and getattr(user, "id", None) == bot_id:
|
||
return True
|
||
return False
|
||
|
||
def _message_matches_mention_patterns(self, message: Message) -> bool:
|
||
if not self._mention_patterns:
|
||
return False
|
||
for candidate in (getattr(message, "text", None), getattr(message, "caption", None)):
|
||
if not candidate:
|
||
continue
|
||
for pattern in self._mention_patterns:
|
||
if pattern.search(candidate):
|
||
return True
|
||
return False
|
||
|
||
def _clean_bot_trigger_text(self, text: Optional[str]) -> Optional[str]:
|
||
if not text or not self._bot or not getattr(self._bot, "username", None):
|
||
return text
|
||
username = re.escape(self._bot.username)
|
||
cleaned = re.sub(rf"(?i)@{username}\b[,:\-]*\s*", "", text).strip()
|
||
return cleaned or text
|
||
|
||
def _should_process_message(self, message: Message, *, is_command: bool = False) -> bool:
|
||
"""Apply Telegram group trigger rules.
|
||
|
||
DMs remain unrestricted. Group/supergroup messages are accepted when:
|
||
- the chat is explicitly allowlisted in ``free_response_chats``
|
||
- ``require_mention`` is disabled
|
||
- the message is a command
|
||
- the message replies to the bot
|
||
- the bot is @mentioned
|
||
- the text/caption matches a configured regex wake-word pattern
|
||
"""
|
||
if not self._is_group_chat(message):
|
||
return True
|
||
if str(getattr(getattr(message, "chat", None), "id", "")) in self._telegram_free_response_chats():
|
||
return True
|
||
if not self._telegram_require_mention():
|
||
return True
|
||
if is_command:
|
||
return True
|
||
if self._is_reply_to_bot(message):
|
||
return True
|
||
if self._message_mentions_bot(message):
|
||
return True
|
||
return self._message_matches_mention_patterns(message)
|
||
|
||
async def _handle_text_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||
"""Handle incoming text messages.
|
||
|
||
Telegram clients split long messages into multiple updates. Buffer
|
||
rapid successive text messages from the same user/chat and aggregate
|
||
them into a single MessageEvent before dispatching.
|
||
"""
|
||
if not update.message or not update.message.text:
|
||
return
|
||
if not self._should_process_message(update.message):
|
||
return
|
||
|
||
event = self._build_message_event(update.message, MessageType.TEXT)
|
||
event.text = self._clean_bot_trigger_text(event.text)
|
||
self._enqueue_text_event(event)
|
||
|
||
async def _handle_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||
"""Handle incoming command messages."""
|
||
if not update.message or not update.message.text:
|
||
return
|
||
if not self._should_process_message(update.message, is_command=True):
|
||
return
|
||
|
||
event = self._build_message_event(update.message, MessageType.COMMAND)
|
||
await self.handle_message(event)
|
||
|
||
async def _handle_location_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||
"""Handle incoming location/venue pin messages."""
|
||
if not update.message:
|
||
return
|
||
if not self._should_process_message(update.message):
|
||
return
|
||
|
||
msg = update.message
|
||
venue = getattr(msg, "venue", None)
|
||
location = getattr(venue, "location", None) if venue else getattr(msg, "location", None)
|
||
|
||
if not location:
|
||
return
|
||
|
||
lat = getattr(location, "latitude", None)
|
||
lon = getattr(location, "longitude", None)
|
||
if lat is None or lon is None:
|
||
return
|
||
|
||
# Build a text message with coordinates and context
|
||
parts = ["[The user shared a location pin.]"]
|
||
if venue:
|
||
title = getattr(venue, "title", None)
|
||
address = getattr(venue, "address", None)
|
||
if title:
|
||
parts.append(f"Venue: {title}")
|
||
if address:
|
||
parts.append(f"Address: {address}")
|
||
parts.append(f"latitude: {lat}")
|
||
parts.append(f"longitude: {lon}")
|
||
parts.append(f"Map: https://www.google.com/maps/search/?api=1&query={lat},{lon}")
|
||
parts.append("Ask what they'd like to find nearby (restaurants, cafes, etc.) and any preferences.")
|
||
|
||
event = self._build_message_event(msg, MessageType.LOCATION)
|
||
event.text = "\n".join(parts)
|
||
await self.handle_message(event)
|
||
|
||
# ------------------------------------------------------------------
|
||
# Text message aggregation (handles Telegram client-side splits)
|
||
# ------------------------------------------------------------------
|
||
|
||
def _text_batch_key(self, event: MessageEvent) -> str:
|
||
"""Session-scoped key for text message batching."""
|
||
from gateway.session import build_session_key
|
||
return build_session_key(
|
||
event.source,
|
||
group_sessions_per_user=self.config.extra.get("group_sessions_per_user", True),
|
||
)
|
||
|
||
def _enqueue_text_event(self, event: MessageEvent) -> None:
|
||
"""Buffer a text event and reset the flush timer.
|
||
|
||
When Telegram splits a long user message into multiple updates,
|
||
they arrive within a few hundred milliseconds. This method
|
||
concatenates them and waits for a short quiet period before
|
||
dispatching the combined message.
|
||
"""
|
||
key = self._text_batch_key(event)
|
||
existing = self._pending_text_batches.get(key)
|
||
if existing is None:
|
||
self._pending_text_batches[key] = event
|
||
else:
|
||
# Append text from the follow-up chunk
|
||
if event.text:
|
||
existing.text = f"{existing.text}\n{event.text}" if existing.text else event.text
|
||
# Merge any media that might be attached
|
||
if event.media_urls:
|
||
existing.media_urls.extend(event.media_urls)
|
||
existing.media_types.extend(event.media_types)
|
||
|
||
# Cancel any pending flush and restart the timer
|
||
prior_task = self._pending_text_batch_tasks.get(key)
|
||
if prior_task and not prior_task.done():
|
||
prior_task.cancel()
|
||
self._pending_text_batch_tasks[key] = asyncio.create_task(
|
||
self._flush_text_batch(key)
|
||
)
|
||
|
||
async def _flush_text_batch(self, key: str) -> None:
|
||
"""Wait for the quiet period then dispatch the aggregated text."""
|
||
current_task = asyncio.current_task()
|
||
try:
|
||
await asyncio.sleep(self._text_batch_delay_seconds)
|
||
event = self._pending_text_batches.pop(key, None)
|
||
if not event:
|
||
return
|
||
logger.info(
|
||
"[Telegram] Flushing text batch %s (%d chars)",
|
||
key, len(event.text or ""),
|
||
)
|
||
await self.handle_message(event)
|
||
finally:
|
||
if self._pending_text_batch_tasks.get(key) is current_task:
|
||
self._pending_text_batch_tasks.pop(key, None)
|
||
|
||
# ------------------------------------------------------------------
|
||
# Photo batching
|
||
# ------------------------------------------------------------------
|
||
|
||
def _photo_batch_key(self, event: MessageEvent, msg: Message) -> str:
|
||
"""Return a batching key for Telegram photos/albums."""
|
||
from gateway.session import build_session_key
|
||
session_key = build_session_key(
|
||
event.source,
|
||
group_sessions_per_user=self.config.extra.get("group_sessions_per_user", True),
|
||
)
|
||
media_group_id = getattr(msg, "media_group_id", None)
|
||
if media_group_id:
|
||
return f"{session_key}:album:{media_group_id}"
|
||
return f"{session_key}:photo-burst"
|
||
|
||
async def _flush_photo_batch(self, batch_key: str) -> None:
|
||
"""Send a buffered photo burst/album as a single MessageEvent."""
|
||
current_task = asyncio.current_task()
|
||
try:
|
||
await asyncio.sleep(self._media_batch_delay_seconds)
|
||
event = self._pending_photo_batches.pop(batch_key, None)
|
||
if not event:
|
||
return
|
||
logger.info("[Telegram] Flushing photo batch %s with %d image(s)", batch_key, len(event.media_urls))
|
||
await self.handle_message(event)
|
||
finally:
|
||
if self._pending_photo_batch_tasks.get(batch_key) is current_task:
|
||
self._pending_photo_batch_tasks.pop(batch_key, None)
|
||
|
||
def _enqueue_photo_event(self, batch_key: str, event: MessageEvent) -> None:
|
||
"""Merge photo events into a pending batch and schedule flush."""
|
||
existing = self._pending_photo_batches.get(batch_key)
|
||
if existing is None:
|
||
self._pending_photo_batches[batch_key] = event
|
||
else:
|
||
existing.media_urls.extend(event.media_urls)
|
||
existing.media_types.extend(event.media_types)
|
||
if event.text:
|
||
if not existing.text:
|
||
existing.text = event.text
|
||
elif event.text not in existing.text:
|
||
existing.text = f"{existing.text}\n\n{event.text}".strip()
|
||
|
||
prior_task = self._pending_photo_batch_tasks.get(batch_key)
|
||
if prior_task and not prior_task.done():
|
||
prior_task.cancel()
|
||
|
||
self._pending_photo_batch_tasks[batch_key] = asyncio.create_task(self._flush_photo_batch(batch_key))
|
||
|
||
async def _handle_media_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||
"""Handle incoming media messages, downloading images to local cache."""
|
||
if not update.message:
|
||
return
|
||
if not self._should_process_message(update.message):
|
||
return
|
||
|
||
msg = update.message
|
||
|
||
# Determine media type
|
||
if msg.sticker:
|
||
msg_type = MessageType.STICKER
|
||
elif msg.photo:
|
||
msg_type = MessageType.PHOTO
|
||
elif msg.video:
|
||
msg_type = MessageType.VIDEO
|
||
elif msg.audio:
|
||
msg_type = MessageType.AUDIO
|
||
elif msg.voice:
|
||
msg_type = MessageType.VOICE
|
||
elif msg.document:
|
||
msg_type = MessageType.DOCUMENT
|
||
else:
|
||
msg_type = MessageType.DOCUMENT
|
||
|
||
event = self._build_message_event(msg, msg_type)
|
||
|
||
# Add caption as text
|
||
if msg.caption:
|
||
event.text = self._clean_bot_trigger_text(msg.caption)
|
||
|
||
# Handle stickers: describe via vision tool with caching
|
||
if msg.sticker:
|
||
await self._handle_sticker(msg, event)
|
||
await self.handle_message(event)
|
||
return
|
||
|
||
# Download photo to local image cache so the vision tool can access it
|
||
# even after Telegram's ephemeral file URLs expire (~1 hour).
|
||
if msg.photo:
|
||
try:
|
||
# msg.photo is a list of PhotoSize sorted by size; take the largest
|
||
photo = msg.photo[-1]
|
||
file_obj = await photo.get_file()
|
||
# Download the image bytes directly into memory
|
||
image_bytes = await file_obj.download_as_bytearray()
|
||
# Determine extension from the file path if available
|
||
ext = ".jpg"
|
||
if file_obj.file_path:
|
||
for candidate in [".png", ".webp", ".gif", ".jpeg", ".jpg"]:
|
||
if file_obj.file_path.lower().endswith(candidate):
|
||
ext = candidate
|
||
break
|
||
# Save to local cache (for vision tool access)
|
||
cached_path = cache_image_from_bytes(bytes(image_bytes), ext=ext)
|
||
event.media_urls = [cached_path]
|
||
event.media_types = [f"image/{ext.lstrip('.')}" ]
|
||
logger.info("[Telegram] Cached user photo at %s", cached_path)
|
||
media_group_id = getattr(msg, "media_group_id", None)
|
||
if media_group_id:
|
||
await self._queue_media_group_event(str(media_group_id), event)
|
||
else:
|
||
batch_key = self._photo_batch_key(event, msg)
|
||
self._enqueue_photo_event(batch_key, event)
|
||
return
|
||
|
||
except Exception as e:
|
||
logger.warning("[Telegram] Failed to cache photo: %s", e, exc_info=True)
|
||
|
||
# Download voice/audio messages to cache for STT transcription
|
||
if msg.voice:
|
||
try:
|
||
file_obj = await msg.voice.get_file()
|
||
audio_bytes = await file_obj.download_as_bytearray()
|
||
cached_path = cache_audio_from_bytes(bytes(audio_bytes), ext=".ogg")
|
||
event.media_urls = [cached_path]
|
||
event.media_types = ["audio/ogg"]
|
||
logger.info("[Telegram] Cached user voice at %s", cached_path)
|
||
except Exception as e:
|
||
logger.warning("[Telegram] Failed to cache voice: %s", e, exc_info=True)
|
||
elif msg.audio:
|
||
try:
|
||
file_obj = await msg.audio.get_file()
|
||
audio_bytes = await file_obj.download_as_bytearray()
|
||
cached_path = cache_audio_from_bytes(bytes(audio_bytes), ext=".mp3")
|
||
event.media_urls = [cached_path]
|
||
event.media_types = ["audio/mp3"]
|
||
logger.info("[Telegram] Cached user audio at %s", cached_path)
|
||
except Exception as e:
|
||
logger.warning("[Telegram] Failed to cache audio: %s", e, exc_info=True)
|
||
|
||
# Download document files to cache for agent processing
|
||
elif msg.document:
|
||
doc = msg.document
|
||
try:
|
||
# Determine file extension
|
||
ext = ""
|
||
original_filename = doc.file_name or ""
|
||
if original_filename:
|
||
_, ext = os.path.splitext(original_filename)
|
||
ext = ext.lower()
|
||
|
||
# If no extension from filename, reverse-lookup from MIME type
|
||
if not ext and doc.mime_type:
|
||
mime_to_ext = {v: k for k, v in SUPPORTED_DOCUMENT_TYPES.items()}
|
||
ext = mime_to_ext.get(doc.mime_type, "")
|
||
|
||
# Check if supported
|
||
if ext not in SUPPORTED_DOCUMENT_TYPES:
|
||
supported_list = ", ".join(sorted(SUPPORTED_DOCUMENT_TYPES.keys()))
|
||
event.text = (
|
||
f"Unsupported document type '{ext or 'unknown'}'. "
|
||
f"Supported types: {supported_list}"
|
||
)
|
||
logger.info("[Telegram] Unsupported document type: %s", ext or "unknown")
|
||
await self.handle_message(event)
|
||
return
|
||
|
||
# Check file size (Telegram Bot API limit: 20 MB)
|
||
MAX_DOC_BYTES = 20 * 1024 * 1024
|
||
if not doc.file_size or doc.file_size > MAX_DOC_BYTES:
|
||
event.text = (
|
||
"The document is too large or its size could not be verified. "
|
||
"Maximum: 20 MB."
|
||
)
|
||
logger.info("[Telegram] Document too large: %s bytes", doc.file_size)
|
||
await self.handle_message(event)
|
||
return
|
||
|
||
# Download and cache
|
||
file_obj = await doc.get_file()
|
||
doc_bytes = await file_obj.download_as_bytearray()
|
||
raw_bytes = bytes(doc_bytes)
|
||
cached_path = cache_document_from_bytes(raw_bytes, original_filename or f"document{ext}")
|
||
mime_type = SUPPORTED_DOCUMENT_TYPES[ext]
|
||
event.media_urls = [cached_path]
|
||
event.media_types = [mime_type]
|
||
logger.info("[Telegram] Cached user document at %s", cached_path)
|
||
|
||
# For text files, inject content into event.text (capped at 100 KB)
|
||
MAX_TEXT_INJECT_BYTES = 100 * 1024
|
||
if ext in (".md", ".txt") and len(raw_bytes) <= MAX_TEXT_INJECT_BYTES:
|
||
try:
|
||
text_content = raw_bytes.decode("utf-8")
|
||
display_name = original_filename or f"document{ext}"
|
||
display_name = re.sub(r'[^\w.\- ]', '_', display_name)
|
||
injection = f"[Content of {display_name}]:\n{text_content}"
|
||
if event.text:
|
||
event.text = f"{injection}\n\n{event.text}"
|
||
else:
|
||
event.text = injection
|
||
except UnicodeDecodeError:
|
||
logger.warning(
|
||
"[Telegram] Could not decode text file as UTF-8, skipping content injection",
|
||
exc_info=True,
|
||
)
|
||
|
||
except Exception as e:
|
||
logger.warning("[Telegram] Failed to cache document: %s", e, exc_info=True)
|
||
|
||
media_group_id = getattr(msg, "media_group_id", None)
|
||
if media_group_id:
|
||
await self._queue_media_group_event(str(media_group_id), event)
|
||
return
|
||
|
||
await self.handle_message(event)
|
||
|
||
async def _queue_media_group_event(self, media_group_id: str, event: MessageEvent) -> None:
|
||
"""Buffer Telegram media-group items so albums arrive as one logical event.
|
||
|
||
Telegram delivers albums as multiple updates with a shared media_group_id.
|
||
If we forward each item immediately, the gateway thinks the second image is a
|
||
new user message and interrupts the first. We debounce briefly and merge the
|
||
attachments into a single MessageEvent.
|
||
"""
|
||
existing = self._media_group_events.get(media_group_id)
|
||
if existing is None:
|
||
self._media_group_events[media_group_id] = event
|
||
else:
|
||
existing.media_urls.extend(event.media_urls)
|
||
existing.media_types.extend(event.media_types)
|
||
if event.text:
|
||
if existing.text:
|
||
if event.text not in existing.text.split("\n\n"):
|
||
existing.text = f"{existing.text}\n\n{event.text}"
|
||
else:
|
||
existing.text = event.text
|
||
|
||
prior_task = self._media_group_tasks.get(media_group_id)
|
||
if prior_task:
|
||
prior_task.cancel()
|
||
|
||
self._media_group_tasks[media_group_id] = asyncio.create_task(
|
||
self._flush_media_group_event(media_group_id)
|
||
)
|
||
|
||
async def _flush_media_group_event(self, media_group_id: str) -> None:
|
||
try:
|
||
await asyncio.sleep(self.MEDIA_GROUP_WAIT_SECONDS)
|
||
event = self._media_group_events.pop(media_group_id, None)
|
||
if event is not None:
|
||
await self.handle_message(event)
|
||
except asyncio.CancelledError:
|
||
return
|
||
finally:
|
||
self._media_group_tasks.pop(media_group_id, None)
|
||
|
||
async def _handle_sticker(self, msg: Message, event: "MessageEvent") -> None:
|
||
"""
|
||
Describe a Telegram sticker via vision analysis, with caching.
|
||
|
||
For static stickers (WEBP), we download, analyze with vision, and cache
|
||
the description by file_unique_id. For animated/video stickers, we inject
|
||
a placeholder noting the emoji.
|
||
"""
|
||
from gateway.sticker_cache import (
|
||
get_cached_description,
|
||
cache_sticker_description,
|
||
build_sticker_injection,
|
||
build_animated_sticker_injection,
|
||
STICKER_VISION_PROMPT,
|
||
)
|
||
|
||
sticker = msg.sticker
|
||
emoji = sticker.emoji or ""
|
||
set_name = sticker.set_name or ""
|
||
|
||
# Animated and video stickers can't be analyzed as static images
|
||
if sticker.is_animated or sticker.is_video:
|
||
event.text = build_animated_sticker_injection(emoji)
|
||
return
|
||
|
||
# Check the cache first
|
||
cached = get_cached_description(sticker.file_unique_id)
|
||
if cached:
|
||
event.text = build_sticker_injection(
|
||
cached["description"], cached.get("emoji", emoji), cached.get("set_name", set_name)
|
||
)
|
||
logger.info("[Telegram] Sticker cache hit: %s", sticker.file_unique_id)
|
||
return
|
||
|
||
# Cache miss -- download and analyze
|
||
try:
|
||
file_obj = await sticker.get_file()
|
||
image_bytes = await file_obj.download_as_bytearray()
|
||
cached_path = cache_image_from_bytes(bytes(image_bytes), ext=".webp")
|
||
logger.info("[Telegram] Analyzing sticker at %s", cached_path)
|
||
|
||
from tools.vision_tools import vision_analyze_tool
|
||
import json as _json
|
||
|
||
result_json = await vision_analyze_tool(
|
||
image_url=cached_path,
|
||
user_prompt=STICKER_VISION_PROMPT,
|
||
)
|
||
result = _json.loads(result_json)
|
||
|
||
if result.get("success"):
|
||
description = result.get("analysis", "a sticker")
|
||
cache_sticker_description(sticker.file_unique_id, description, emoji, set_name)
|
||
event.text = build_sticker_injection(description, emoji, set_name)
|
||
else:
|
||
# Vision failed -- use emoji as fallback
|
||
event.text = build_sticker_injection(
|
||
f"a sticker with emoji {emoji}" if emoji else "a sticker",
|
||
emoji, set_name,
|
||
)
|
||
except Exception as e:
|
||
logger.warning("[Telegram] Sticker analysis error: %s", e, exc_info=True)
|
||
event.text = build_sticker_injection(
|
||
f"a sticker with emoji {emoji}" if emoji else "a sticker",
|
||
emoji, set_name,
|
||
)
|
||
|
||
def _reload_dm_topics_from_config(self) -> None:
|
||
"""Re-read dm_topics from config.yaml and load any new thread_ids into cache.
|
||
|
||
This allows topics created externally (e.g. by the agent via API) to be
|
||
recognized without a gateway restart.
|
||
"""
|
||
try:
|
||
from hermes_constants import get_hermes_home
|
||
config_path = get_hermes_home() / "config.yaml"
|
||
if not config_path.exists():
|
||
return
|
||
|
||
import yaml as _yaml
|
||
with open(config_path, "r") as f:
|
||
config = _yaml.safe_load(f) or {}
|
||
|
||
dm_topics = (
|
||
config.get("platforms", {})
|
||
.get("telegram", {})
|
||
.get("extra", {})
|
||
.get("dm_topics", [])
|
||
)
|
||
if not dm_topics:
|
||
return
|
||
|
||
# Update in-memory config and cache any new thread_ids
|
||
self._dm_topics_config = dm_topics
|
||
for chat_entry in dm_topics:
|
||
cid = chat_entry.get("chat_id")
|
||
if not cid:
|
||
continue
|
||
for t in chat_entry.get("topics", []):
|
||
tid = t.get("thread_id")
|
||
name = t.get("name")
|
||
if tid and name:
|
||
cache_key = f"{cid}:{name}"
|
||
if cache_key not in self._dm_topics:
|
||
self._dm_topics[cache_key] = int(tid)
|
||
logger.info(
|
||
"[%s] Hot-loaded DM topic from config: %s -> thread_id=%s",
|
||
self.name, cache_key, tid,
|
||
)
|
||
except Exception as e:
|
||
logger.debug("[%s] Failed to reload dm_topics from config: %s", self.name, e)
|
||
|
||
def _get_dm_topic_info(self, chat_id: str, thread_id: Optional[str]) -> Optional[Dict[str, Any]]:
|
||
"""Look up DM topic config by chat_id and thread_id.
|
||
|
||
Returns the topic config dict (name, skill, etc.) if this thread_id
|
||
matches a known DM topic, or None.
|
||
"""
|
||
if not thread_id:
|
||
return None
|
||
|
||
thread_id_int = int(thread_id)
|
||
|
||
# Check cached topics first (created by us or loaded at startup)
|
||
for key, cached_tid in self._dm_topics.items():
|
||
if cached_tid == thread_id_int and key.startswith(f"{chat_id}:"):
|
||
topic_name = key.split(":", 1)[1]
|
||
# Find the full config for this topic
|
||
for chat_entry in self._dm_topics_config:
|
||
if str(chat_entry.get("chat_id")) == chat_id:
|
||
for t in chat_entry.get("topics", []):
|
||
if t.get("name") == topic_name:
|
||
return t
|
||
return {"name": topic_name}
|
||
|
||
# Not in cache — hot-reload config in case topics were added externally
|
||
self._reload_dm_topics_from_config()
|
||
|
||
# Check cache again after reload
|
||
for key, cached_tid in self._dm_topics.items():
|
||
if cached_tid == thread_id_int and key.startswith(f"{chat_id}:"):
|
||
topic_name = key.split(":", 1)[1]
|
||
for chat_entry in self._dm_topics_config:
|
||
if str(chat_entry.get("chat_id")) == chat_id:
|
||
for t in chat_entry.get("topics", []):
|
||
if t.get("name") == topic_name:
|
||
return t
|
||
return {"name": topic_name}
|
||
|
||
return None
|
||
|
||
def _cache_dm_topic_from_message(self, chat_id: str, thread_id: str, topic_name: str) -> None:
|
||
"""Cache a thread_id -> topic_name mapping discovered from an incoming message."""
|
||
cache_key = f"{chat_id}:{topic_name}"
|
||
if cache_key not in self._dm_topics:
|
||
self._dm_topics[cache_key] = int(thread_id)
|
||
logger.info(
|
||
"[%s] Cached DM topic from message: %s -> thread_id=%s",
|
||
self.name, cache_key, thread_id,
|
||
)
|
||
|
||
def _build_message_event(self, message: Message, msg_type: MessageType) -> MessageEvent:
|
||
"""Build a MessageEvent from a Telegram message."""
|
||
chat = message.chat
|
||
user = message.from_user
|
||
|
||
# Determine chat type
|
||
chat_type = "dm"
|
||
if chat.type in (ChatType.GROUP, ChatType.SUPERGROUP):
|
||
chat_type = "group"
|
||
elif chat.type == ChatType.CHANNEL:
|
||
chat_type = "channel"
|
||
|
||
# Resolve DM topic name and skill binding
|
||
thread_id_raw = message.message_thread_id
|
||
thread_id_str = str(thread_id_raw) if thread_id_raw else None
|
||
chat_topic = None
|
||
topic_skill = None
|
||
|
||
if chat_type == "dm" and thread_id_str:
|
||
topic_info = self._get_dm_topic_info(str(chat.id), thread_id_str)
|
||
if topic_info:
|
||
chat_topic = topic_info.get("name")
|
||
topic_skill = topic_info.get("skill")
|
||
|
||
# Also check forum_topic_created service message for topic discovery
|
||
if hasattr(message, "forum_topic_created") and message.forum_topic_created:
|
||
created_name = message.forum_topic_created.name
|
||
if created_name:
|
||
self._cache_dm_topic_from_message(str(chat.id), thread_id_str, created_name)
|
||
if not chat_topic:
|
||
chat_topic = created_name
|
||
|
||
# Build source
|
||
source = self.build_source(
|
||
chat_id=str(chat.id),
|
||
chat_name=chat.title or (chat.full_name if hasattr(chat, "full_name") else None),
|
||
chat_type=chat_type,
|
||
user_id=str(user.id) if user else None,
|
||
user_name=user.full_name if user else None,
|
||
thread_id=thread_id_str,
|
||
chat_topic=chat_topic,
|
||
)
|
||
|
||
# Extract reply context if this message is a reply
|
||
reply_to_id = None
|
||
reply_to_text = None
|
||
if message.reply_to_message:
|
||
reply_to_id = str(message.reply_to_message.message_id)
|
||
reply_to_text = message.reply_to_message.text or message.reply_to_message.caption or None
|
||
|
||
return MessageEvent(
|
||
text=message.text or "",
|
||
message_type=msg_type,
|
||
source=source,
|
||
raw_message=message,
|
||
message_id=str(message.message_id),
|
||
reply_to_message_id=reply_to_id,
|
||
reply_to_text=reply_to_text,
|
||
auto_skill=topic_skill,
|
||
timestamp=message.date,
|
||
)
|