- Modified the retrieval of tool definitions to use the agent result's "tools" key, ensuring accurate logging in the transcript. - Enhanced the response structure to include tools in the final output, improving the clarity of tool usage in session interactions.
1099 lines
46 KiB
Python
1099 lines
46 KiB
Python
"""
|
|
Gateway runner - entry point for messaging platform integrations.
|
|
|
|
This module provides:
|
|
- start_gateway(): Start all configured platform adapters
|
|
- GatewayRunner: Main class managing the gateway lifecycle
|
|
|
|
Usage:
|
|
# Start the gateway
|
|
python -m gateway.run
|
|
|
|
# Or from CLI
|
|
python cli.py --gateway
|
|
"""
|
|
|
|
import asyncio
|
|
import os
|
|
import re
|
|
import sys
|
|
import signal
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from typing import Dict, Optional, Any, List
|
|
|
|
# Add parent directory to path
|
|
sys.path.insert(0, str(Path(__file__).parent.parent))
|
|
|
|
# ---------------------------------------------------------------------------
# Environment bootstrap (runs at import time)
# ---------------------------------------------------------------------------
# The user-level file ~/.hermes/.env is loaded first when it exists; the
# default load_dotenv() lookup is then attempted as a fallback for a
# project-level .env file.
from dotenv import load_dotenv

_env_path = Path.home().joinpath('.hermes', '.env')
if _env_path.exists():
    load_dotenv(_env_path)
load_dotenv()

# HERMES_QUIET: gateway runs in quiet mode (suppress debug output, use cwd
# directly instead of temp dirs).
# HERMES_EXEC_ASK: enable interactive exec approval for dangerous commands
# on messaging platforms.
os.environ.update({"HERMES_QUIET": "1", "HERMES_EXEC_ASK": "1"})

# Terminal working directory for messaging platforms: MESSAGING_CWD when it
# is set to a non-empty value, otherwise the home directory. This is kept
# separate from the CLI, which uses the directory where `hermes` is run.
messaging_cwd = os.environ.get("MESSAGING_CWD") or str(Path.home())
os.environ["TERMINAL_CWD"] = messaging_cwd
|
|
|
|
from gateway.config import (
|
|
Platform,
|
|
GatewayConfig,
|
|
load_gateway_config,
|
|
)
|
|
from gateway.session import (
|
|
SessionStore,
|
|
SessionSource,
|
|
SessionContext,
|
|
build_session_context,
|
|
build_session_context_prompt,
|
|
)
|
|
from gateway.delivery import DeliveryRouter, DeliveryTarget
|
|
from gateway.platforms.base import BasePlatformAdapter, MessageEvent, MessageType
|
|
|
|
|
|
class GatewayRunner:
|
|
"""
|
|
Main gateway controller.
|
|
|
|
Manages the lifecycle of all platform adapters and routes
|
|
messages to/from the agent.
|
|
"""
|
|
|
|
def __init__(self, config: Optional[GatewayConfig] = None):
    """
    Set up configuration, stores and per-session bookkeeping tables.

    Args:
        config: Gateway configuration to use. When omitted (or falsy),
            the result of load_gateway_config() is used instead.
    """
    # Function-scope imports for the pairing store and the hook registry.
    from gateway.pairing import PairingStore
    from gateway.hooks import HookRegistry

    self.config = config or load_gateway_config()
    self.adapters: Dict[Platform, BasePlatformAdapter] = {}
    self.session_store = SessionStore(self.config.sessions_dir, self.config)
    self.delivery_router = DeliveryRouter(self.config)
    self._running = False
    self._shutdown_event = asyncio.Event()

    # Interrupt support: session_key -> agent instance currently running
    # for that session, plus the text queued while it is being interrupted.
    self._running_agents: Dict[str, Any] = {}
    self._pending_messages: Dict[str, str] = {}

    # Exec approvals awaiting a user answer:
    # session_key -> {"command": str, "pattern_key": str}
    self._pending_approvals: Dict[str, Dict[str, str]] = {}

    # Code-based DM pairing used for user authorization.
    self.pairing_store = PairingStore()

    # Event hook system.
    self.hooks = HookRegistry()
|
|
|
|
async def start(self) -> bool:
    """
    Bring the gateway up: load hooks, then connect every enabled platform.

    Adapters that cannot be created, fail to connect, or raise while
    connecting are reported and skipped.

    Returns:
        True if at least one adapter connected successfully, else False.
    """
    print("[gateway] Starting Hermes Gateway...")
    print(f"[gateway] Session storage: {self.config.sessions_dir}")

    # Hooks are discovered before any platform is touched.
    self.hooks.discover_and_load()

    connected_count = 0
    for platform, platform_config in self.config.platforms.items():
        if not platform_config.enabled:
            continue

        adapter = self._create_adapter(platform, platform_config)
        if not adapter:
            print(f"[gateway] No adapter available for {platform.value}")
            continue

        # Every adapter funnels its incoming messages into the same handler.
        adapter.set_message_handler(self._handle_message)

        print(f"[gateway] Connecting to {platform.value}...")
        try:
            connected = await adapter.connect()
            if not connected:
                print(f"[gateway] ✗ {platform.value} failed to connect")
            else:
                self.adapters[platform] = adapter
                connected_count += 1
                print(f"[gateway] ✓ {platform.value} connected")
        except Exception as e:
            print(f"[gateway] ✗ {platform.value} error: {e}")

    if connected_count == 0:
        print("[gateway] No platforms connected. Check your configuration.")
        return False

    # The delivery router shares the runner's adapter table.
    self.delivery_router.adapters = self.adapters
    self._running = True

    hook_count = len(self.hooks.loaded_hooks)
    if hook_count:
        print(f"[gateway] {hook_count} hook(s) loaded")

    # Emit the gateway:startup hook with the connected platform names.
    startup_payload = {
        "platforms": [p.value for p in self.adapters],
    }
    await self.hooks.emit("gateway:startup", startup_payload)

    print(f"[gateway] Gateway running with {connected_count} platform(s)")
    print("[gateway] Press Ctrl+C to stop")
    return True
|
|
|
|
async def stop(self) -> None:
    """
    Shut the gateway down.

    Marks the runner as not running, disconnects each adapter (a failing
    disconnect is reported and does not stop the others), empties the
    adapter table and finally sets the shutdown event.
    """
    print("[gateway] Stopping gateway...")
    self._running = False

    for platform, adapter in self.adapters.items():
        name = platform.value
        try:
            await adapter.disconnect()
            print(f"[gateway] ✓ {name} disconnected")
        except Exception as e:
            print(f"[gateway] ✗ {platform.value} disconnect error: {e}")

    self.adapters.clear()
    # Wakes up anything blocked in wait_for_shutdown().
    self._shutdown_event.set()
    print("[gateway] Gateway stopped")
|
|
|
|
async def wait_for_shutdown(self) -> None:
    """Block until the shutdown event has been set (stop() sets it)."""
    shutdown_event = self._shutdown_event
    await shutdown_event.wait()
|
|
|
|
def _create_adapter(
    self,
    platform: Platform,
    config: Any
) -> Optional[BasePlatformAdapter]:
    """
    Instantiate the adapter that matches ``platform``.

    Each platform module is imported only when that platform is requested,
    and its requirements check is consulted before the adapter is built.

    Args:
        platform: The platform to build an adapter for.
        config: Platform configuration handed to the adapter constructor.

    Returns:
        The new adapter, or None when the platform is not handled here or
        its requirements check fails (a hint is printed in that case).
    """
    if platform == Platform.TELEGRAM:
        from gateway.platforms.telegram import TelegramAdapter, check_telegram_requirements
        if check_telegram_requirements():
            return TelegramAdapter(config)
        print("[gateway] Telegram: python-telegram-bot not installed")
        return None

    if platform == Platform.DISCORD:
        from gateway.platforms.discord import DiscordAdapter, check_discord_requirements
        if check_discord_requirements():
            return DiscordAdapter(config)
        print("[gateway] Discord: discord.py not installed")
        return None

    if platform == Platform.WHATSAPP:
        from gateway.platforms.whatsapp import WhatsAppAdapter, check_whatsapp_requirements
        if check_whatsapp_requirements():
            return WhatsAppAdapter(config)
        print("[gateway] WhatsApp: Node.js not installed or bridge not configured")
        return None

    if platform == Platform.SLACK:
        from gateway.platforms.slack import SlackAdapter, check_slack_requirements
        if check_slack_requirements():
            return SlackAdapter(config)
        print("[gateway] Slack: slack-bolt not installed. Run: pip install 'hermes-agent[slack]'")
        return None

    # Any other platform has no adapter in this runner.
    return None
|
|
|
|
def _is_user_authorized(self, source: SessionSource) -> bool:
    """
    Decide whether the sender described by ``source`` may use the bot.

    Evaluation order, as implemented:
    1. A missing/empty user id is always rejected.
    2. A user approved in the DM pairing store is always accepted.
    3. If neither the platform allowlist (TELEGRAM_ALLOWED_USERS, etc.)
       nor GATEWAY_ALLOWED_USERS is set, everyone is accepted (open access).
    4. Otherwise the user id must appear in one of the comma-separated
       allowlists.

    Args:
        source: Origin of the message (platform, user id, ...).

    Returns:
        True when the sender is authorized, False otherwise.
    """
    user_id = source.user_id
    if not user_id:
        # An unknown user cannot be verified.
        return False

    # Environment variable holding each platform's allowlist.
    platform_env_map = {
        Platform.TELEGRAM: "TELEGRAM_ALLOWED_USERS",
        Platform.DISCORD: "DISCORD_ALLOWED_USERS",
        Platform.WHATSAPP: "WHATSAPP_ALLOWED_USERS",
        Platform.SLACK: "SLACK_ALLOWED_USERS",
    }
    env_name = platform_env_map.get(source.platform, "")
    platform_allowlist = os.getenv(env_name)
    global_allowlist = os.getenv("GATEWAY_ALLOWED_USERS", "")

    # Pairing approvals are honoured regardless of any allowlist.
    platform_name = source.platform.value if source.platform else ""
    if self.pairing_store.is_approved(platform_name, user_id):
        return True

    # No allowlist configured at all: open access (backward compatible).
    if not (platform_allowlist or global_allowlist):
        return True

    # Union of every id named in the configured allowlists.
    allowed_ids = {
        uid.strip()
        for raw in (platform_allowlist, global_allowlist)
        if raw
        for uid in raw.split(",")
    }
    return user_id in allowed_ids
|
|
|
|
async def _handle_message(self, event: MessageEvent) -> Optional[str]:
    """
    Handle an incoming message from any platform.

    This is the core message processing pipeline:
    1. Check user authorization (DMs are offered a pairing code, other
       chat types are silently ignored)
    2. Check for commands (new/reset, status, stop)
    3. Resolve a pending exec-approval answer for this chat, if any
    4. Get or create the session; if an agent is already running for it,
       interrupt that agent and queue the new text
    5. Build context for the agent and load the transcript
    6. Enrich the text with image descriptions / audio transcripts
    7. Run the agent, append the new entries to the transcript and
       return the response

    Args:
        event: The incoming message event.

    Returns:
        The text to send back, or None when nothing should be sent
        (unauthorized sender, or the message was forwarded as an interrupt
        to an already-running agent). If running the agent or saving the
        transcript raises, an apology string containing the error is
        returned instead.
    """
    source = event.source

    # Check if user is authorized
    if not self._is_user_authorized(source):
        # Guard against a missing platform, as the pairing branch always did.
        platform_name = source.platform.value if source.platform else "unknown"
        print(f"[gateway] Unauthorized user: {source.user_id} ({source.user_name}) on {platform_name}")
        # In DMs: offer pairing code. In groups: silently ignore.
        if source.chat_type == "dm":
            code = self.pairing_store.generate_code(
                platform_name, source.user_id, source.user_name or ""
            )
            if code:
                notice = (
                    f"Hi~ I don't recognize you yet!\n\n"
                    f"Here's your pairing code: `{code}`\n\n"
                    f"Ask the bot owner to run:\n"
                    f"`hermes pairing approve {platform_name} {code}`"
                )
            else:
                # generate_code() returned nothing usable.
                notice = (
                    "Too many pairing requests right now~ "
                    "Please try again later!"
                )
            adapter = self.adapters.get(source.platform)
            if adapter:
                await adapter.send(source.chat_id, notice)
        return None

    # Check for commands
    command = event.get_command()
    if command in ["new", "reset"]:
        return await self._handle_reset_command(event)

    if command == "status":
        return await self._handle_status_command(event)

    if command == "stop":
        return await self._handle_stop_command(event)

    # Check for pending exec approval responses.
    # NOTE(review): approvals are stored under session_entry.session_key
    # (see below) but looked up with this hand-built key; this assumes both
    # use the same format -- confirm against SessionStore.
    if source.chat_type != "dm":
        session_key_preview = f"agent:main:{source.platform.value}:{source.chat_type}:{source.chat_id}"
    else:
        session_key_preview = f"agent:main:{source.platform.value}:dm"

    if session_key_preview in self._pending_approvals:
        # event.text may be empty/None (e.g. media-only messages); such a
        # message is simply not an approval answer.
        user_text = (event.text or "").strip().lower()
        if user_text in ("yes", "y", "approve", "ok", "go", "do it"):
            approval = self._pending_approvals.pop(session_key_preview)
            cmd = approval["command"]
            pattern_key = approval.get("pattern_key", "")
            print(f"[gateway] ✅ User approved dangerous command: {cmd[:60]}...")
            # Approve for session and re-run via terminal_tool with force=True
            from tools.terminal_tool import terminal_tool, _session_approved_patterns
            _session_approved_patterns.add(pattern_key)
            # terminal_tool is a synchronous call; run it in a worker thread
            # so the command does not stall the event loop.
            result = await asyncio.to_thread(terminal_tool, command=cmd, force=True)
            return f"✅ Command approved and executed.\n\n```\n{result[:3500]}\n```"
        elif user_text in ("no", "n", "deny", "cancel", "nope"):
            self._pending_approvals.pop(session_key_preview)
            return "❌ Command denied."
        # If it's not clearly an approval/denial, fall through to normal processing

    # Get or create session
    session_entry = self.session_store.get_or_create_session(source)
    session_key = session_entry.session_key

    # Check if there's already a running agent for this session
    if session_key in self._running_agents:
        running_agent = self._running_agents[session_key]
        print(f"[gateway] ⚡ Interrupting running agent for session {session_key[:20]}...")
        running_agent.interrupt(event.text)
        # Store the new message to be processed after current agent finishes
        self._pending_messages[session_key] = event.text
        return None  # Don't respond yet - let the interrupt handle it

    # Build session context
    context = build_session_context(source, self.config, session_entry)

    # Set environment variables for tools
    self._set_session_env(context)

    # Build the context prompt to inject
    context_prompt = build_session_context_prompt(context)

    # Load conversation history from transcript
    history = self.session_store.load_transcript(session_entry.session_id)

    # -----------------------------------------------------------------
    # Auto-analyze images sent by the user
    #
    # If the user attached image(s), we run the vision tool eagerly so
    # the conversation model always receives a text description. The
    # local file path is also included so the model can re-examine the
    # image later with a more targeted question via vision_analyze.
    #
    # We filter to image paths only (by media_type) so that non-image
    # attachments (documents, audio, etc.) are not sent to the vision
    # tool even when they appear in the same message.
    # -----------------------------------------------------------------
    message_text = event.text or ""
    if event.media_urls:
        image_paths = []
        for i, path in enumerate(event.media_urls):
            # Check media_types if available; otherwise infer from message type
            mtype = event.media_types[i] if i < len(event.media_types) else ""
            is_image = (
                mtype.startswith("image/")
                or event.message_type == MessageType.PHOTO
            )
            if is_image:
                image_paths.append(path)
        if image_paths:
            message_text = await self._enrich_message_with_vision(
                message_text, image_paths
            )

    # -----------------------------------------------------------------
    # Auto-transcribe voice/audio messages sent by the user
    # -----------------------------------------------------------------
    if event.media_urls:
        audio_paths = []
        for i, path in enumerate(event.media_urls):
            mtype = event.media_types[i] if i < len(event.media_types) else ""
            is_audio = (
                mtype.startswith("audio/")
                or event.message_type in (MessageType.VOICE, MessageType.AUDIO)
            )
            if is_audio:
                audio_paths.append(path)
        if audio_paths:
            message_text = await self._enrich_message_with_transcription(
                message_text, audio_paths
            )

    try:
        # Emit agent:start hook
        hook_ctx = {
            "platform": source.platform.value if source.platform else "",
            "user_id": source.user_id,
            "session_id": session_entry.session_id,
            "message": message_text[:500],
        }
        await self.hooks.emit("agent:start", hook_ctx)

        # Run the agent
        agent_result = await self._run_agent(
            message=message_text,
            context_prompt=context_prompt,
            history=history,
            source=source,
            session_id=session_entry.session_id,
            session_key=session_key
        )

        response = agent_result.get("final_response", "")
        agent_messages = agent_result.get("messages", [])

        # Emit agent:end hook
        await self.hooks.emit("agent:end", {
            **hook_ctx,
            "response": (response or "")[:500],
        })

        # Check if the agent encountered a dangerous command needing approval
        # The terminal tool stores the last pending approval globally
        try:
            from tools.terminal_tool import _last_pending_approval
            if _last_pending_approval:
                self._pending_approvals[session_key] = _last_pending_approval.copy()
                # Clear the global so it doesn't leak to other sessions
                _last_pending_approval.clear()
        except Exception as approval_err:
            # Best-effort: a failure here must not break the reply, but it
            # should not vanish silently either.
            print(f"[gateway] Could not check pending exec approval: {approval_err}")

        # Save the full conversation to the transcript, including tool calls.
        # This preserves the complete agent loop (tool_calls, tool results,
        # intermediate reasoning) so sessions can be resumed with full context
        # and transcripts are useful for debugging and training data.
        ts = datetime.now().isoformat()

        # If this is a fresh session (no history), write the full tool
        # definitions as the first entry so the transcript is self-describing
        # -- the same list of dicts sent as tools=[...] in the API request.
        if not history:
            tool_defs = agent_result.get("tools", [])
            self.session_store.append_to_transcript(
                session_entry.session_id,
                {
                    "role": "session_meta",
                    "tools": tool_defs or [],
                    "model": os.getenv("HERMES_MODEL", ""),
                    "platform": source.platform.value if source.platform else "",
                    "timestamp": ts,
                }
            )

        # Find only the NEW messages from this turn (skip history we loaded).
        # NOTE(review): history is the raw transcript, which also contains
        # the session_meta entry written above; if the agent's message list
        # omits such entries, this offset may skip the first new message --
        # confirm against run_conversation's returned "messages".
        history_len = len(history)
        new_messages = agent_messages[history_len:] if len(agent_messages) > history_len else agent_messages

        # If no new messages found (edge case), fall back to simple user/assistant
        if not new_messages:
            self.session_store.append_to_transcript(
                session_entry.session_id,
                {"role": "user", "content": message_text, "timestamp": ts}
            )
            if response:
                self.session_store.append_to_transcript(
                    session_entry.session_id,
                    {"role": "assistant", "content": response, "timestamp": ts}
                )
        else:
            for msg in new_messages:
                # Skip system messages (they're rebuilt each run)
                if msg.get("role") == "system":
                    continue
                # Add timestamp to each message for debugging
                entry = {**msg, "timestamp": ts}
                self.session_store.append_to_transcript(
                    session_entry.session_id, entry
                )

        # Update session
        self.session_store.update_session(session_entry.session_key)

        return response

    except Exception as e:
        print(f"[gateway] Agent error: {e}")
        return f"Sorry, I encountered an error: {str(e)}"
    finally:
        # Clear session env
        self._clear_session_env()
|
|
|
|
async def _handle_reset_command(self, event: MessageEvent) -> str:
    """
    Handle the new/reset command by starting the chat's session over.

    Args:
        event: The message event carrying the command.

    Returns:
        A confirmation message for the user.
    """
    source = event.source

    # Rebuild the session key: DMs share one key per platform, every other
    # chat type is keyed by chat type and chat id.
    platform_value = source.platform.value
    if source.chat_type == "dm":
        scope = "dm"
    else:
        scope = f"{source.chat_type}:{source.chat_id}"
    session_key = f"agent:main:{platform_value}:" + scope

    new_entry = self.session_store.reset_session(session_key)

    # Emit session:reset hook
    reset_payload = {
        "platform": source.platform.value if source.platform else "",
        "user_id": source.user_id,
        "session_key": session_key,
    }
    await self.hooks.emit("session:reset", reset_payload)

    if not new_entry:
        # Nothing existed to reset, so a brand-new session is created.
        self.session_store.get_or_create_session(source, force_new=True)
        return "✨ New session started!"

    return "✨ Session reset! I've started fresh with no memory of our previous conversation."
|
|
|
|
async def _handle_status_command(self, event: MessageEvent) -> str:
    """
    Handle the status command.

    Args:
        event: The message event carrying the command.

    Returns:
        A multi-line summary of the chat's session (id, creation and last
        activity times, token count, whether an agent is running) and the
        connected platforms.
    """
    session_entry = self.session_store.get_or_create_session(event.source)

    connected_platforms = [p.value for p in self.adapters]

    # An agent is active when the session key is in the running table.
    is_running = session_entry.session_key in self._running_agents
    running_label = 'Yes ⚡' if is_running else 'No'

    created = session_entry.created_at.strftime('%Y-%m-%d %H:%M')
    last_activity = session_entry.updated_at.strftime('%Y-%m-%d %H:%M')

    return "\n".join((
        "📊 **Hermes Gateway Status**",
        "",
        f"**Session ID:** `{session_entry.session_id[:12]}...`",
        f"**Created:** {created}",
        f"**Last Activity:** {last_activity}",
        f"**Tokens:** {session_entry.total_tokens:,}",
        f"**Agent Running:** {running_label}",
        "",
        f"**Connected Platforms:** {', '.join(connected_platforms)}",
    ))
|
|
|
|
async def _handle_stop_command(self, event: MessageEvent) -> str:
    """
    Handle the stop command by interrupting the chat's running agent.

    Args:
        event: The message event carrying the command.

    Returns:
        A notice that the task is being stopped, or that there was no
        active task for this session.
    """
    session_entry = self.session_store.get_or_create_session(event.source)
    session_key = session_entry.session_key

    if session_key not in self._running_agents:
        return "No active task to stop."

    # Interrupt without a replacement message.
    self._running_agents[session_key].interrupt()
    return "⚡ Stopping the current task... The agent will finish its current step and respond."
|
|
|
|
def _set_session_env(self, context: SessionContext) -> None:
    """
    Publish the current session's origin through environment variables.

    Sets HERMES_SESSION_PLATFORM and HERMES_SESSION_CHAT_ID, and
    HERMES_SESSION_CHAT_NAME when the chat has a name. When it has none,
    any HERMES_SESSION_CHAT_NAME still present is removed, so a name left
    behind by another session is never attributed to this one (the
    environment is process-wide).

    Args:
        context: Session context whose ``source`` describes the chat.
    """
    source = context.source
    os.environ["HERMES_SESSION_PLATFORM"] = source.platform.value
    os.environ["HERMES_SESSION_CHAT_ID"] = source.chat_id
    if source.chat_name:
        os.environ["HERMES_SESSION_CHAT_NAME"] = source.chat_name
    else:
        # Do not leave a stale name from an earlier/overlapping session.
        os.environ.pop("HERMES_SESSION_CHAT_NAME", None)
|
|
|
|
def _clear_session_env(self) -> None:
|
|
"""Clear session environment variables."""
|
|
for var in ["HERMES_SESSION_PLATFORM", "HERMES_SESSION_CHAT_ID", "HERMES_SESSION_CHAT_NAME"]:
|
|
if var in os.environ:
|
|
del os.environ[var]
|
|
|
|
async def _enrich_message_with_vision(
    self,
    user_text: str,
    image_paths: List[str],
) -> str:
    """
    Run the vision tool over each attached image and put the resulting
    notes in front of the user's text.

    Every image is analyzed with one general-purpose prompt. The note for
    an image contains the description and the image path, so the model
    knows what was sent without an extra tool call and can still call
    vision_analyze on the same path for more detail. When the analysis
    reports failure or raises, a fallback note naming the path is used.

    Args:
        user_text: The user's original caption / message text.
        image_paths: Paths of the images to analyze.

    Returns:
        The notes joined by blank lines, followed by ``user_text`` when
        it is non-empty; ``user_text`` unchanged when there are no notes.
    """
    from tools.vision_tools import vision_analyze_tool
    import json as _json

    analysis_prompt = (
        "Describe everything visible in this image in thorough detail. "
        "Include any text, code, data, objects, people, layout, colors, "
        "and any other notable visual information."
    )

    enriched_parts = []
    for path in image_paths:
        try:
            print(f"[gateway] Auto-analyzing user image: {path}", flush=True)
            raw = await vision_analyze_tool(
                image_url=path,
                user_prompt=analysis_prompt,
            )
            outcome = _json.loads(raw)
            if not outcome.get("success"):
                note = (
                    "[The user sent an image but I couldn't quite see it "
                    "this time (>_<) You can try looking at it yourself "
                    f"with vision_analyze using image_url: {path}]"
                )
            else:
                description = outcome.get("analysis", "")
                note = (
                    f"[The user sent an image~ Here's what I can see:\n{description}]\n"
                    f"[If you need a closer look, use vision_analyze with "
                    f"image_url: {path} ~]"
                )
        except Exception as e:
            print(f"[gateway] Vision auto-analysis error: {e}", flush=True)
            note = (
                f"[The user sent an image but something went wrong when I "
                f"tried to look at it~ You can try examining it yourself "
                f"with vision_analyze using image_url: {path}]"
            )
        enriched_parts.append(note)

    # Vision notes come first, then the user's original text.
    if not enriched_parts:
        return user_text
    prefix = "\n\n".join(enriched_parts)
    return f"{prefix}\n\n{user_text}" if user_text else prefix
|
|
|
|
async def _enrich_message_with_transcription(
    self,
    user_text: str,
    audio_paths: List[str],
) -> str:
    """
    Transcribe each attached voice/audio file and put the resulting notes
    in front of the user's text.

    transcribe_audio is run in a worker thread for every path. A
    successful result yields a note quoting the transcript; a failed one
    yields a note explaining the problem (with a dedicated message when
    the error mentions OPENAI_API_KEY); an exception yields a generic
    failure note.

    Args:
        user_text: The user's original caption / message text.
        audio_paths: Paths of the audio files to transcribe.

    Returns:
        The notes joined by blank lines, followed by ``user_text`` when
        it is non-empty; ``user_text`` unchanged when there are no notes.
    """
    from tools.transcription_tools import transcribe_audio
    import asyncio

    enriched_parts = []
    for path in audio_paths:
        try:
            print(f"[gateway] Transcribing user voice: {path}", flush=True)
            outcome = await asyncio.to_thread(transcribe_audio, path)
            if outcome["success"]:
                transcript = outcome["transcript"]
                note = (
                    f'[The user sent a voice message~ '
                    f'Here\'s what they said: "{transcript}"]'
                )
            else:
                error = outcome.get("error", "unknown error")
                if "OPENAI_API_KEY" not in error:
                    note = (
                        "[The user sent a voice message but I had trouble "
                        f"transcribing it~ ({error})]"
                    )
                else:
                    note = (
                        "[The user sent a voice message but I can't listen "
                        "to it right now~ OPENAI_API_KEY isn't set up yet "
                        "(';w;') Let them know!]"
                    )
        except Exception as e:
            print(f"[gateway] Transcription error: {e}", flush=True)
            note = (
                "[The user sent a voice message but something went wrong "
                "when I tried to listen to it~ Let them know!]"
            )
        enriched_parts.append(note)

    # Transcription notes come first, then the user's original text.
    if not enriched_parts:
        return user_text
    prefix = "\n\n".join(enriched_parts)
    return f"{prefix}\n\n{user_text}" if user_text else prefix
|
|
|
|
async def _run_agent(
|
|
self,
|
|
message: str,
|
|
context_prompt: str,
|
|
history: List[Dict[str, Any]],
|
|
source: SessionSource,
|
|
session_id: str,
|
|
session_key: str = None
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Run the agent with the given message and context.
|
|
|
|
Returns the full result dict from run_conversation, including:
|
|
- "final_response": str (the text to send back)
|
|
- "messages": list (full conversation including tool calls)
|
|
- "api_calls": int
|
|
- "completed": bool
|
|
|
|
This is run in a thread pool to not block the event loop.
|
|
Supports interruption via new messages.
|
|
"""
|
|
from run_agent import AIAgent
|
|
import queue
|
|
|
|
# Determine toolset based on platform
|
|
toolset_map = {
|
|
Platform.LOCAL: "hermes-cli",
|
|
Platform.TELEGRAM: "hermes-telegram",
|
|
Platform.DISCORD: "hermes-discord",
|
|
Platform.WHATSAPP: "hermes-whatsapp",
|
|
}
|
|
toolset = toolset_map.get(source.platform, "hermes-telegram")
|
|
|
|
# Check if tool progress notifications are enabled
|
|
tool_progress_enabled = os.getenv("HERMES_TOOL_PROGRESS", "").lower() in ("1", "true", "yes")
|
|
progress_mode = os.getenv("HERMES_TOOL_PROGRESS_MODE", "new") # "all" or "new" (only new tools)
|
|
|
|
# Queue for progress messages (thread-safe)
|
|
progress_queue = queue.Queue() if tool_progress_enabled else None
|
|
last_tool = [None] # Mutable container for tracking in closure
|
|
|
|
def progress_callback(tool_name: str, preview: str = None):
|
|
"""Callback invoked by agent when a tool is called."""
|
|
if not progress_queue:
|
|
return
|
|
|
|
# "new" mode: only report when tool changes
|
|
if progress_mode == "new" and tool_name == last_tool[0]:
|
|
return
|
|
last_tool[0] = tool_name
|
|
|
|
# Build progress message with primary argument preview
|
|
tool_emojis = {
|
|
"terminal": "💻",
|
|
"web_search": "🔍",
|
|
"web_extract": "📄",
|
|
"read_file": "📖",
|
|
"write_file": "✍️",
|
|
"patch": "🔧",
|
|
"search": "🔎",
|
|
"list_directory": "📂",
|
|
"image_generate": "🎨",
|
|
"text_to_speech": "🔊",
|
|
"browser_navigate": "🌐",
|
|
"browser_click": "👆",
|
|
"browser_type": "⌨️",
|
|
"browser_snapshot": "📸",
|
|
"moa_query": "🧠",
|
|
"mixture_of_agents": "🧠",
|
|
"vision_analyze": "👁️",
|
|
"skill_view": "📚",
|
|
"skills_list": "📋",
|
|
}
|
|
emoji = tool_emojis.get(tool_name, "⚙️")
|
|
|
|
if preview:
|
|
# Truncate preview to keep messages clean
|
|
if len(preview) > 40:
|
|
preview = preview[:37] + "..."
|
|
msg = f"{emoji} {tool_name}... \"{preview}\""
|
|
else:
|
|
msg = f"{emoji} {tool_name}..."
|
|
|
|
progress_queue.put(msg)
|
|
|
|
# Background task to send progress messages
|
|
async def send_progress_messages():
|
|
if not progress_queue:
|
|
return
|
|
|
|
adapter = self.adapters.get(source.platform)
|
|
if not adapter:
|
|
return
|
|
|
|
while True:
|
|
try:
|
|
# Non-blocking check with small timeout
|
|
msg = progress_queue.get_nowait()
|
|
await adapter.send(chat_id=source.chat_id, content=msg)
|
|
# Restore typing indicator after sending progress message
|
|
await asyncio.sleep(0.3)
|
|
await adapter.send_typing(source.chat_id)
|
|
except queue.Empty:
|
|
await asyncio.sleep(0.3) # Check again soon
|
|
except asyncio.CancelledError:
|
|
# Drain remaining messages
|
|
while not progress_queue.empty():
|
|
try:
|
|
msg = progress_queue.get_nowait()
|
|
await adapter.send(chat_id=source.chat_id, content=msg)
|
|
except:
|
|
break
|
|
return
|
|
except Exception as e:
|
|
print(f"[Gateway] Progress message error: {e}")
|
|
await asyncio.sleep(1)
|
|
|
|
# We need to share the agent instance for interrupt support
|
|
agent_holder = [None] # Mutable container for the agent instance
|
|
result_holder = [None] # Mutable container for the result
|
|
tools_holder = [None] # Mutable container for the tool definitions
|
|
|
|
def run_sync():
|
|
# Read from env var or use default (same as CLI)
|
|
max_iterations = int(os.getenv("HERMES_MAX_ITERATIONS", "60"))
|
|
|
|
# Map platform enum to the platform hint key the agent understands.
|
|
# Platform.LOCAL ("local") maps to "cli"; others pass through as-is.
|
|
platform_key = "cli" if source.platform == Platform.LOCAL else source.platform.value
|
|
|
|
agent = AIAgent(
|
|
model=os.getenv("HERMES_MODEL", "anthropic/claude-opus-4.6"),
|
|
max_iterations=max_iterations,
|
|
quiet_mode=True,
|
|
enabled_toolsets=[toolset],
|
|
ephemeral_system_prompt=context_prompt,
|
|
session_id=session_id,
|
|
tool_progress_callback=progress_callback if tool_progress_enabled else None,
|
|
platform=platform_key, # Tells the agent which interface to format for
|
|
)
|
|
|
|
# Store agent reference for interrupt support
|
|
agent_holder[0] = agent
|
|
# Capture the full tool definitions for transcript logging
|
|
tools_holder[0] = agent.tools if hasattr(agent, 'tools') else None
|
|
|
|
# Convert history to agent format.
|
|
# Two cases:
|
|
# 1. Normal path (from transcript): simple {role, content, timestamp} dicts
|
|
# - Strip timestamps, keep role+content
|
|
# 2. Interrupt path (from agent result["messages"]): full agent messages
|
|
# that may include tool_calls, tool_call_id, reasoning, etc.
|
|
# - These must be passed through intact so the API sees valid
|
|
# assistant→tool sequences (dropping tool_calls causes 500 errors)
|
|
agent_history = []
|
|
for msg in history:
|
|
role = msg.get("role")
|
|
if not role:
|
|
continue
|
|
|
|
# Skip metadata entries (tool definitions, session info)
|
|
# -- these are for transcript logging, not for the LLM
|
|
if role in ("session_meta",):
|
|
continue
|
|
|
|
# Skip system messages -- the agent rebuilds its own system prompt
|
|
if role == "system":
|
|
continue
|
|
|
|
# Rich agent messages (tool_calls, tool results) must be passed
|
|
# through intact so the API sees valid assistant→tool sequences
|
|
has_tool_calls = "tool_calls" in msg
|
|
has_tool_call_id = "tool_call_id" in msg
|
|
is_tool_message = role == "tool"
|
|
|
|
if has_tool_calls or has_tool_call_id or is_tool_message:
|
|
clean_msg = {k: v for k, v in msg.items() if k != "timestamp"}
|
|
agent_history.append(clean_msg)
|
|
else:
|
|
# Simple text message - just need role and content
|
|
content = msg.get("content")
|
|
if content:
|
|
agent_history.append({"role": role, "content": content})
|
|
|
|
result = agent.run_conversation(message, conversation_history=agent_history)
|
|
result_holder[0] = result
|
|
|
|
# Return final response, or a message if something went wrong
|
|
final_response = result.get("final_response")
|
|
if not final_response:
|
|
error_msg = f"⚠️ {result['error']}" if result.get("error") else "(No response generated)"
|
|
return {
|
|
"final_response": error_msg,
|
|
"messages": result.get("messages", []),
|
|
"api_calls": result.get("api_calls", 0),
|
|
"tools": tools_holder[0] or [],
|
|
}
|
|
|
|
# Scan tool results for MEDIA:<path> tags that need to be delivered
|
|
# as native audio/file attachments. The TTS tool embeds MEDIA: tags
|
|
# in its JSON response, but the model's final text reply usually
|
|
# doesn't include them. We collect unique tags from tool results and
|
|
# append any that aren't already present in the final response, so the
|
|
# adapter's extract_media() can find and deliver the files exactly once.
|
|
if "MEDIA:" not in final_response:
|
|
media_tags = []
|
|
has_voice_directive = False
|
|
for msg in result.get("messages", []):
|
|
if msg.get("role") == "tool" or msg.get("role") == "function":
|
|
content = msg.get("content", "")
|
|
if "MEDIA:" in content:
|
|
for match in re.finditer(r'MEDIA:(\S+)', content):
|
|
path = match.group(1).strip().rstrip('",}')
|
|
if path:
|
|
media_tags.append(f"MEDIA:{path}")
|
|
if "[[audio_as_voice]]" in content:
|
|
has_voice_directive = True
|
|
|
|
if media_tags:
|
|
# Deduplicate while preserving order
|
|
seen = set()
|
|
unique_tags = []
|
|
for tag in media_tags:
|
|
if tag not in seen:
|
|
seen.add(tag)
|
|
unique_tags.append(tag)
|
|
if has_voice_directive:
|
|
unique_tags.insert(0, "[[audio_as_voice]]")
|
|
final_response = final_response + "\n" + "\n".join(unique_tags)
|
|
|
|
return {
|
|
"final_response": final_response,
|
|
"messages": result_holder[0].get("messages", []) if result_holder[0] else [],
|
|
"api_calls": result_holder[0].get("api_calls", 0) if result_holder[0] else 0,
|
|
"tools": tools_holder[0] or [],
|
|
}
|
|
|
|
# Start progress message sender if enabled
|
|
progress_task = None
|
|
if tool_progress_enabled:
|
|
progress_task = asyncio.create_task(send_progress_messages())
|
|
|
|
# Track this agent as running for this session (for interrupt support)
|
|
# We do this in a callback after the agent is created
|
|
async def track_agent():
    """Register the agent as running for this session once it exists.

    Polls ``agent_holder[0]`` every 50 ms until it is no longer ``None``,
    then stores that agent in ``self._running_agents`` under ``session_key``
    (only when ``session_key`` is truthy).
    """
    while True:
        created_agent = agent_holder[0]
        if created_agent is not None:
            break
        # Agent not created yet; yield to the event loop and retry.
        await asyncio.sleep(0.05)

    if not session_key:
        return
    self._running_agents[session_key] = created_agent
|
|
|
|
tracking_task = asyncio.create_task(track_agent())
|
|
|
|
# Monitor for interrupts from the adapter (new messages arriving)
|
|
async def monitor_for_interrupt():
    """Watch the platform adapter for a pending interrupt and signal the agent.

    Returns immediately when no adapter is registered for ``source.platform``.
    Otherwise polls every 200 ms; once the adapter reports a pending interrupt
    for this chat and an agent is available, the agent is interrupted with the
    pending message text (or ``None`` if no pending event is returned) and the
    monitor exits.
    """
    platform_adapter = self.adapters.get(source.platform)
    if not platform_adapter:
        return

    target_chat = source.chat_id
    while True:
        await asyncio.sleep(0.2)  # Check every 200ms

        # Adapters without interrupt support are simply polled again.
        if not hasattr(platform_adapter, 'has_pending_interrupt'):
            continue
        if not platform_adapter.has_pending_interrupt(target_chat):
            continue

        running_agent = agent_holder[0]
        if not running_agent:
            # Interrupt is pending but there is no agent to signal yet.
            continue

        queued_event = platform_adapter.get_pending_message(target_chat)
        interrupt_text = queued_event.text if queued_event else None
        print(f"[gateway] ⚡ Interrupt detected from adapter, signaling agent...")
        running_agent.interrupt(interrupt_text)
        return
|
|
|
|
interrupt_monitor = asyncio.create_task(monitor_for_interrupt())
|
|
|
|
try:
|
|
# Run in thread pool to not block
|
|
loop = asyncio.get_event_loop()
|
|
response = await loop.run_in_executor(None, run_sync)
|
|
|
|
# Check if we were interrupted and have a pending message
|
|
result = result_holder[0]
|
|
adapter = self.adapters.get(source.platform)
|
|
|
|
# Get pending message from adapter if interrupted
|
|
pending = None
|
|
if result and result.get("interrupted") and adapter:
|
|
pending_event = adapter.get_pending_message(source.chat_id)
|
|
if pending_event:
|
|
pending = pending_event.text
|
|
elif result.get("interrupt_message"):
|
|
pending = result.get("interrupt_message")
|
|
|
|
if pending:
|
|
print(f"[gateway] 📨 Processing interrupted message: '{pending[:40]}...'")
|
|
|
|
# Clear the adapter's interrupt event so the next _run_agent call
|
|
# doesn't immediately re-trigger the interrupt before the new agent
|
|
# even makes its first API call (this was causing an infinite loop).
|
|
if adapter and hasattr(adapter, '_active_sessions') and source.chat_id in adapter._active_sessions:
|
|
adapter._active_sessions[source.chat_id].clear()
|
|
|
|
# Don't send the interrupted response to the user — it's just noise
|
|
# like "Operation interrupted." They already know they sent a new
|
|
# message, so go straight to processing it.
|
|
|
|
# Now process the pending message with updated history
|
|
updated_history = result.get("messages", history)
|
|
return await self._run_agent(
|
|
message=pending,
|
|
context_prompt=context_prompt,
|
|
history=updated_history,
|
|
source=source,
|
|
session_id=session_id,
|
|
session_key=session_key
|
|
)
|
|
finally:
|
|
# Stop progress sender and interrupt monitor
|
|
if progress_task:
|
|
progress_task.cancel()
|
|
interrupt_monitor.cancel()
|
|
|
|
# Clean up tracking
|
|
tracking_task.cancel()
|
|
if session_key and session_key in self._running_agents:
|
|
del self._running_agents[session_key]
|
|
|
|
# Wait for cancelled tasks
|
|
for task in [progress_task, interrupt_monitor, tracking_task]:
|
|
if task:
|
|
try:
|
|
await task
|
|
except asyncio.CancelledError:
|
|
pass
|
|
|
|
return response
|
|
|
|
|
|
async def start_gateway(config: Optional[GatewayConfig] = None) -> bool:
    """
    Start the gateway and run until interrupted.

    This is the main entry point for running the gateway.
    Returns True if the gateway ran successfully, False if it failed to start.
    A False return causes a non-zero exit code so systemd can auto-restart.

    Args:
        config: Gateway configuration handed to ``GatewayRunner`` as-is
            (``None`` is forwarded unchanged).
    """
    runner = GatewayRunner(config)

    # We are inside a coroutine, so a loop is guaranteed to be running.
    loop = asyncio.get_running_loop()

    # The event loop keeps only weak references to tasks. Hold a strong
    # reference until the shutdown task finishes so it cannot be
    # garbage-collected mid-flight.
    shutdown_tasks = set()

    # Set up signal handlers
    def signal_handler():
        stop_task = loop.create_task(runner.stop())
        shutdown_tasks.add(stop_task)
        stop_task.add_done_callback(shutdown_tasks.discard)

    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(sig, signal_handler)
        except NotImplementedError:
            # Windows doesn't support add_signal_handler
            pass

    # Start the gateway
    success = await runner.start()
    if not success:
        return False

    # Wait for shutdown
    await runner.wait_for_shutdown()
    return True
|
|
|
|
|
|
def main():
    """CLI entry point for the gateway.

    Parses ``--config`` / ``--verbose``, optionally loads a JSON gateway
    config, runs the gateway, and exits with status 1 when
    ``start_gateway`` reports failure.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Hermes Gateway - Multi-platform messaging")
    parser.add_argument("--config", "-c", help="Path to gateway config file")
    # NOTE(review): --verbose is accepted but not read anywhere in this function.
    parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")

    args = parser.parse_args()

    config = None
    if args.config:
        import json

        # JSON is UTF-8 by specification; do not rely on the platform's
        # locale encoding (which is not UTF-8 on every system).
        with open(args.config, encoding="utf-8") as f:
            data = json.load(f)
        config = GatewayConfig.from_dict(data)

    # Run the gateway - exit with code 1 if no platforms connected,
    # so systemd Restart=on-failure will retry on transient errors (e.g. DNS)
    success = asyncio.run(start_gateway(config))
    if not success:
        sys.exit(1)
|
|
|
|
|
|
# Script entry point: launch the gateway CLI only when this module is
# executed directly, not when it is imported.
if __name__ == "__main__":
    main()
|