Adds Feishu (ByteDance's enterprise messaging platform) as a gateway platform adapter with full feature parity: WebSocket + webhook transports, message batching, dedup, rate limiting, rich post/card content parsing, media handling (images/audio/files/video), group @mention gating, reaction routing, and interactive card button support. Cherry-picked from PR #1793 by penwyp with: - Moved to current main (PR was 458 commits behind) - Fixed _send_with_retry shadowing BasePlatformAdapter method (renamed to _feishu_send_with_retry to avoid signature mismatch crash) - Fixed import structure: aiohttp/websockets imported independently of lark_oapi so they remain available when SDK is missing - Fixed get_hermes_home import (hermes_constants, not hermes_cli.config) - Added skip decorators for tests requiring lark_oapi SDK - All 16 integration points added surgically to current main New dependency: lark-oapi>=1.5.3,<2 (optional, pip install hermes-agent[feishu]) Fixes #1788 Co-authored-by: penwyp <penwyp@users.noreply.github.com>
1602 lines
64 KiB
Python
1602 lines
64 KiB
Python
"""
|
|
Unified tool configuration for Hermes Agent.
|
|
|
|
`hermes tools` and `hermes setup tools` both enter this module.
|
|
Select a platform → toggle toolsets on/off → for newly enabled tools
|
|
that need API keys, run through provider-aware configuration.
|
|
|
|
Saves per-platform tool configuration to ~/.hermes/config.yaml under
|
|
the `platform_toolsets` key.
|
|
"""
|
|
|
|
import json as _json
|
|
import logging
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Set
|
|
|
|
|
|
from hermes_cli.config import (
|
|
load_config, save_config, get_env_value, save_env_value,
|
|
)
|
|
from hermes_cli.colors import Colors, color
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
PROJECT_ROOT = Path(__file__).parent.parent.resolve()
|
|
|
|
|
|
# ─── UI Helpers (shared with setup.py) ────────────────────────────────────────
|
|
|
|
def _print_info(text: str):
|
|
print(color(f" {text}", Colors.DIM))
|
|
|
|
def _print_success(text: str):
|
|
print(color(f"✓ {text}", Colors.GREEN))
|
|
|
|
def _print_warning(text: str):
|
|
print(color(f"⚠ {text}", Colors.YELLOW))
|
|
|
|
def _print_error(text: str):
|
|
print(color(f"✗ {text}", Colors.RED))
|
|
|
|
def _prompt(question: str, default: str = None, password: bool = False) -> str:
|
|
if default:
|
|
display = f"{question} [{default}]: "
|
|
else:
|
|
display = f"{question}: "
|
|
try:
|
|
if password:
|
|
import getpass
|
|
value = getpass.getpass(color(display, Colors.YELLOW))
|
|
else:
|
|
value = input(color(display, Colors.YELLOW))
|
|
return value.strip() or default or ""
|
|
except (KeyboardInterrupt, EOFError):
|
|
print()
|
|
return default or ""
|
|
|
|
def _prompt_yes_no(question: str, default: bool = True) -> bool:
|
|
default_str = "Y/n" if default else "y/N"
|
|
while True:
|
|
try:
|
|
value = input(color(f"{question} [{default_str}]: ", Colors.YELLOW)).strip().lower()
|
|
except (KeyboardInterrupt, EOFError):
|
|
print()
|
|
return default
|
|
if not value:
|
|
return default
|
|
if value in ('y', 'yes'):
|
|
return True
|
|
if value in ('n', 'no'):
|
|
return False
|
|
|
|
|
|
# ─── Toolset Registry ─────────────────────────────────────────────────────────
|
|
|
|
# Toolsets shown in the configurator, grouped for display.
|
|
# Each entry: (toolset_name, label, description)
|
|
# These map to keys in toolsets.py TOOLSETS dict.
|
|
CONFIGURABLE_TOOLSETS = [
|
|
("web", "🔍 Web Search & Scraping", "web_search, web_extract"),
|
|
("browser", "🌐 Browser Automation", "navigate, click, type, scroll"),
|
|
("terminal", "💻 Terminal & Processes", "terminal, process"),
|
|
("file", "📁 File Operations", "read, write, patch, search"),
|
|
("code_execution", "⚡ Code Execution", "execute_code"),
|
|
("vision", "👁️ Vision / Image Analysis", "vision_analyze"),
|
|
("image_gen", "🎨 Image Generation", "image_generate"),
|
|
("moa", "🧠 Mixture of Agents", "mixture_of_agents"),
|
|
("tts", "🔊 Text-to-Speech", "text_to_speech"),
|
|
("skills", "📚 Skills", "list, view, manage"),
|
|
("todo", "📋 Task Planning", "todo"),
|
|
("memory", "💾 Memory", "persistent memory across sessions"),
|
|
("session_search", "🔎 Session Search", "search past conversations"),
|
|
("clarify", "❓ Clarifying Questions", "clarify"),
|
|
("delegation", "👥 Task Delegation", "delegate_task"),
|
|
("cronjob", "⏰ Cron Jobs", "create/list/update/pause/resume/run, with optional attached skills"),
|
|
("rl", "🧪 RL Training", "Tinker-Atropos training tools"),
|
|
("homeassistant", "🏠 Home Assistant", "smart home device control"),
|
|
]
|
|
|
|
# Toolsets that are OFF by default for new installs.
|
|
# They're still in _HERMES_CORE_TOOLS (available at runtime if enabled),
|
|
# but the setup checklist won't pre-select them for first-time users.
|
|
_DEFAULT_OFF_TOOLSETS = {"moa", "homeassistant", "rl"}
|
|
|
|
|
|
def _get_effective_configurable_toolsets():
|
|
"""Return CONFIGURABLE_TOOLSETS + any plugin-provided toolsets.
|
|
|
|
Plugin toolsets are appended at the end so they appear after the
|
|
built-in toolsets in the TUI checklist.
|
|
"""
|
|
result = list(CONFIGURABLE_TOOLSETS)
|
|
try:
|
|
from hermes_cli.plugins import discover_plugins, get_plugin_toolsets
|
|
discover_plugins() # idempotent — ensures plugins are loaded
|
|
result.extend(get_plugin_toolsets())
|
|
except Exception:
|
|
pass
|
|
return result
|
|
|
|
|
|
def _get_plugin_toolset_keys() -> set:
|
|
"""Return the set of toolset keys provided by plugins."""
|
|
try:
|
|
from hermes_cli.plugins import discover_plugins, get_plugin_toolsets
|
|
discover_plugins() # idempotent — ensures plugins are loaded
|
|
return {ts_key for ts_key, _, _ in get_plugin_toolsets()}
|
|
except Exception:
|
|
return set()
|
|
|
|
# Platform display config
|
|
PLATFORMS = {
|
|
"cli": {"label": "🖥️ CLI", "default_toolset": "hermes-cli"},
|
|
"telegram": {"label": "📱 Telegram", "default_toolset": "hermes-telegram"},
|
|
"discord": {"label": "💬 Discord", "default_toolset": "hermes-discord"},
|
|
"slack": {"label": "💼 Slack", "default_toolset": "hermes-slack"},
|
|
"whatsapp": {"label": "📱 WhatsApp", "default_toolset": "hermes-whatsapp"},
|
|
"signal": {"label": "📡 Signal", "default_toolset": "hermes-signal"},
|
|
"homeassistant": {"label": "🏠 Home Assistant", "default_toolset": "hermes-homeassistant"},
|
|
"email": {"label": "📧 Email", "default_toolset": "hermes-email"},
|
|
"matrix": {"label": "💬 Matrix", "default_toolset": "hermes-matrix"},
|
|
"dingtalk": {"label": "💬 DingTalk", "default_toolset": "hermes-dingtalk"},
|
|
"feishu": {"label": "🪽 Feishu", "default_toolset": "hermes-feishu"},
|
|
"api_server": {"label": "🌐 API Server", "default_toolset": "hermes-api-server"},
|
|
"mattermost": {"label": "💬 Mattermost", "default_toolset": "hermes-mattermost"},
|
|
}
|
|
|
|
|
|
# ─── Tool Categories (provider-aware configuration) ──────────────────────────
|
|
# Maps toolset keys to their provider options. When a toolset is newly enabled,
|
|
# we use this to show provider selection and prompt for the right API keys.
|
|
# Toolsets not in this map either need no config or use the simple fallback.
|
|
|
|
TOOL_CATEGORIES = {
|
|
"tts": {
|
|
"name": "Text-to-Speech",
|
|
"icon": "🔊",
|
|
"providers": [
|
|
{
|
|
"name": "Microsoft Edge TTS",
|
|
"tag": "Free - no API key needed",
|
|
"env_vars": [],
|
|
"tts_provider": "edge",
|
|
},
|
|
{
|
|
"name": "OpenAI TTS",
|
|
"tag": "Premium - high quality voices",
|
|
"env_vars": [
|
|
{"key": "VOICE_TOOLS_OPENAI_KEY", "prompt": "OpenAI API key", "url": "https://platform.openai.com/api-keys"},
|
|
],
|
|
"tts_provider": "openai",
|
|
},
|
|
{
|
|
"name": "ElevenLabs",
|
|
"tag": "Premium - most natural voices",
|
|
"env_vars": [
|
|
{"key": "ELEVENLABS_API_KEY", "prompt": "ElevenLabs API key", "url": "https://elevenlabs.io/app/settings/api-keys"},
|
|
],
|
|
"tts_provider": "elevenlabs",
|
|
},
|
|
],
|
|
},
|
|
"web": {
|
|
"name": "Web Search & Extract",
|
|
"setup_title": "Select Search Provider",
|
|
"setup_note": "A free DuckDuckGo search skill is also included — skip this if you don't need a premium provider.",
|
|
"icon": "🔍",
|
|
"providers": [
|
|
{
|
|
"name": "Firecrawl Cloud",
|
|
"tag": "Hosted service - search, extract, and crawl",
|
|
"web_backend": "firecrawl",
|
|
"env_vars": [
|
|
{"key": "FIRECRAWL_API_KEY", "prompt": "Firecrawl API key", "url": "https://firecrawl.dev"},
|
|
],
|
|
},
|
|
{
|
|
"name": "Exa",
|
|
"tag": "AI-native search and contents",
|
|
"web_backend": "exa",
|
|
"env_vars": [
|
|
{"key": "EXA_API_KEY", "prompt": "Exa API key", "url": "https://exa.ai"},
|
|
],
|
|
},
|
|
{
|
|
"name": "Parallel",
|
|
"tag": "AI-native search and extract",
|
|
"web_backend": "parallel",
|
|
"env_vars": [
|
|
{"key": "PARALLEL_API_KEY", "prompt": "Parallel API key", "url": "https://parallel.ai"},
|
|
],
|
|
},
|
|
{
|
|
"name": "Tavily",
|
|
"tag": "AI-native search, extract, and crawl",
|
|
"web_backend": "tavily",
|
|
"env_vars": [
|
|
{"key": "TAVILY_API_KEY", "prompt": "Tavily API key", "url": "https://app.tavily.com/home"},
|
|
],
|
|
},
|
|
{
|
|
"name": "Firecrawl Self-Hosted",
|
|
"tag": "Free - run your own instance",
|
|
"web_backend": "firecrawl",
|
|
"env_vars": [
|
|
{"key": "FIRECRAWL_API_URL", "prompt": "Your Firecrawl instance URL (e.g., http://localhost:3002)"},
|
|
],
|
|
},
|
|
],
|
|
},
|
|
"image_gen": {
|
|
"name": "Image Generation",
|
|
"icon": "🎨",
|
|
"providers": [
|
|
{
|
|
"name": "FAL.ai",
|
|
"tag": "FLUX 2 Pro with auto-upscaling",
|
|
"env_vars": [
|
|
{"key": "FAL_KEY", "prompt": "FAL API key", "url": "https://fal.ai/dashboard/keys"},
|
|
],
|
|
},
|
|
],
|
|
},
|
|
"browser": {
|
|
"name": "Browser Automation",
|
|
"icon": "🌐",
|
|
"providers": [
|
|
{
|
|
"name": "Local Browser",
|
|
"tag": "Free headless Chromium (no API key needed)",
|
|
"env_vars": [],
|
|
"browser_provider": None,
|
|
"post_setup": "browserbase", # Same npm install for agent-browser
|
|
},
|
|
{
|
|
"name": "Browserbase",
|
|
"tag": "Cloud browser with stealth & proxies",
|
|
"env_vars": [
|
|
{"key": "BROWSERBASE_API_KEY", "prompt": "Browserbase API key", "url": "https://browserbase.com"},
|
|
{"key": "BROWSERBASE_PROJECT_ID", "prompt": "Browserbase project ID"},
|
|
],
|
|
"browser_provider": "browserbase",
|
|
"post_setup": "browserbase",
|
|
},
|
|
{
|
|
"name": "Browser Use",
|
|
"tag": "Cloud browser with remote execution",
|
|
"env_vars": [
|
|
{"key": "BROWSER_USE_API_KEY", "prompt": "Browser Use API key", "url": "https://browser-use.com"},
|
|
],
|
|
"browser_provider": "browser-use",
|
|
"post_setup": "browserbase",
|
|
},
|
|
],
|
|
},
|
|
"homeassistant": {
|
|
"name": "Smart Home",
|
|
"icon": "🏠",
|
|
"providers": [
|
|
{
|
|
"name": "Home Assistant",
|
|
"tag": "REST API integration",
|
|
"env_vars": [
|
|
{"key": "HASS_TOKEN", "prompt": "Home Assistant Long-Lived Access Token"},
|
|
{"key": "HASS_URL", "prompt": "Home Assistant URL", "default": "http://homeassistant.local:8123"},
|
|
],
|
|
},
|
|
],
|
|
},
|
|
"rl": {
|
|
"name": "RL Training",
|
|
"icon": "🧪",
|
|
"requires_python": (3, 11),
|
|
"providers": [
|
|
{
|
|
"name": "Tinker / Atropos",
|
|
"tag": "RL training platform",
|
|
"env_vars": [
|
|
{"key": "TINKER_API_KEY", "prompt": "Tinker API key", "url": "https://tinker-console.thinkingmachines.ai/keys"},
|
|
{"key": "WANDB_API_KEY", "prompt": "WandB API key", "url": "https://wandb.ai/authorize"},
|
|
],
|
|
"post_setup": "rl_training",
|
|
},
|
|
],
|
|
},
|
|
}
|
|
|
|
# Simple env-var requirements for toolsets NOT in TOOL_CATEGORIES.
|
|
# Used as a fallback for tools like vision/moa that just need an API key.
|
|
TOOLSET_ENV_REQUIREMENTS = {
|
|
"vision": [("OPENROUTER_API_KEY", "https://openrouter.ai/keys")],
|
|
"moa": [("OPENROUTER_API_KEY", "https://openrouter.ai/keys")],
|
|
}
|
|
|
|
|
|
# ─── Post-Setup Hooks ─────────────────────────────────────────────────────────
|
|
|
|
def _run_post_setup(post_setup_key: str):
|
|
"""Run post-setup hooks for tools that need extra installation steps."""
|
|
import shutil
|
|
if post_setup_key == "browserbase":
|
|
node_modules = PROJECT_ROOT / "node_modules" / "agent-browser"
|
|
if not node_modules.exists() and shutil.which("npm"):
|
|
_print_info(" Installing Node.js dependencies for browser tools...")
|
|
import subprocess
|
|
result = subprocess.run(
|
|
["npm", "install", "--silent"],
|
|
capture_output=True, text=True, cwd=str(PROJECT_ROOT)
|
|
)
|
|
if result.returncode == 0:
|
|
_print_success(" Node.js dependencies installed")
|
|
else:
|
|
from hermes_constants import display_hermes_home
|
|
_print_warning(f" npm install failed - run manually: cd {display_hermes_home()}/hermes-agent && npm install")
|
|
elif not node_modules.exists():
|
|
_print_warning(" Node.js not found - browser tools require: npm install (in hermes-agent directory)")
|
|
|
|
elif post_setup_key == "rl_training":
|
|
try:
|
|
__import__("tinker_atropos")
|
|
except ImportError:
|
|
tinker_dir = PROJECT_ROOT / "tinker-atropos"
|
|
if tinker_dir.exists() and (tinker_dir / "pyproject.toml").exists():
|
|
_print_info(" Installing tinker-atropos submodule...")
|
|
import subprocess
|
|
uv_bin = shutil.which("uv")
|
|
if uv_bin:
|
|
result = subprocess.run(
|
|
[uv_bin, "pip", "install", "--python", sys.executable, "-e", str(tinker_dir)],
|
|
capture_output=True, text=True
|
|
)
|
|
else:
|
|
result = subprocess.run(
|
|
[sys.executable, "-m", "pip", "install", "-e", str(tinker_dir)],
|
|
capture_output=True, text=True
|
|
)
|
|
if result.returncode == 0:
|
|
_print_success(" tinker-atropos installed")
|
|
else:
|
|
_print_warning(" tinker-atropos install failed - run manually:")
|
|
_print_info(' uv pip install -e "./tinker-atropos"')
|
|
else:
|
|
_print_warning(" tinker-atropos submodule not found - run:")
|
|
_print_info(" git submodule update --init --recursive")
|
|
_print_info(' uv pip install -e "./tinker-atropos"')
|
|
|
|
|
|
# ─── Platform / Toolset Helpers ───────────────────────────────────────────────
|
|
|
|
def _get_enabled_platforms() -> List[str]:
|
|
"""Return platform keys that are configured (have tokens or are CLI)."""
|
|
enabled = ["cli"]
|
|
if get_env_value("TELEGRAM_BOT_TOKEN"):
|
|
enabled.append("telegram")
|
|
if get_env_value("DISCORD_BOT_TOKEN"):
|
|
enabled.append("discord")
|
|
if get_env_value("SLACK_BOT_TOKEN"):
|
|
enabled.append("slack")
|
|
if get_env_value("WHATSAPP_ENABLED"):
|
|
enabled.append("whatsapp")
|
|
return enabled
|
|
|
|
|
|
def _platform_toolset_summary(config: dict, platforms: Optional[List[str]] = None) -> Dict[str, Set[str]]:
|
|
"""Return a summary of enabled toolsets per platform.
|
|
|
|
When ``platforms`` is None, this uses ``_get_enabled_platforms`` to
|
|
auto-detect platforms. Tests can pass an explicit list to avoid relying
|
|
on environment variables.
|
|
"""
|
|
if platforms is None:
|
|
platforms = _get_enabled_platforms()
|
|
|
|
summary: Dict[str, Set[str]] = {}
|
|
for pkey in platforms:
|
|
summary[pkey] = _get_platform_tools(config, pkey)
|
|
return summary
|
|
|
|
|
|
def _parse_enabled_flag(value, default: bool = True) -> bool:
|
|
"""Parse bool-like config values used by tool/platform settings."""
|
|
if value is None:
|
|
return default
|
|
if isinstance(value, bool):
|
|
return value
|
|
if isinstance(value, int):
|
|
return value != 0
|
|
if isinstance(value, str):
|
|
lowered = value.strip().lower()
|
|
if lowered in {"true", "1", "yes", "on"}:
|
|
return True
|
|
if lowered in {"false", "0", "no", "off"}:
|
|
return False
|
|
return default
|
|
|
|
|
|
def _get_platform_tools(
|
|
config: dict,
|
|
platform: str,
|
|
*,
|
|
include_default_mcp_servers: bool = True,
|
|
) -> Set[str]:
|
|
"""Resolve which individual toolset names are enabled for a platform."""
|
|
from toolsets import resolve_toolset
|
|
|
|
platform_toolsets = config.get("platform_toolsets", {})
|
|
toolset_names = platform_toolsets.get(platform)
|
|
|
|
if toolset_names is None or not isinstance(toolset_names, list):
|
|
default_ts = PLATFORMS[platform]["default_toolset"]
|
|
toolset_names = [default_ts]
|
|
|
|
configurable_keys = {ts_key for ts_key, _, _ in CONFIGURABLE_TOOLSETS}
|
|
|
|
# If the saved list contains any configurable keys directly, the user
|
|
# has explicitly configured this platform — use direct membership.
|
|
# This avoids the subset-inference bug where composite toolsets like
|
|
# "hermes-cli" (which include all _HERMES_CORE_TOOLS) cause disabled
|
|
# toolsets to re-appear as enabled.
|
|
has_explicit_config = any(ts in configurable_keys for ts in toolset_names)
|
|
|
|
if has_explicit_config:
|
|
enabled_toolsets = {ts for ts in toolset_names if ts in configurable_keys}
|
|
else:
|
|
# No explicit config — fall back to resolving composite toolset names
|
|
# (e.g. "hermes-cli") to individual tool names and reverse-mapping.
|
|
all_tool_names = set()
|
|
for ts_name in toolset_names:
|
|
all_tool_names.update(resolve_toolset(ts_name))
|
|
|
|
enabled_toolsets = set()
|
|
for ts_key, _, _ in CONFIGURABLE_TOOLSETS:
|
|
ts_tools = set(resolve_toolset(ts_key))
|
|
if ts_tools and ts_tools.issubset(all_tool_names):
|
|
enabled_toolsets.add(ts_key)
|
|
|
|
# Plugin toolsets: enabled by default unless explicitly disabled.
|
|
# A plugin toolset is "known" for a platform once `hermes tools`
|
|
# has been saved for that platform (tracked via known_plugin_toolsets).
|
|
# Unknown plugins default to enabled; known-but-absent = disabled.
|
|
plugin_ts_keys = _get_plugin_toolset_keys()
|
|
if plugin_ts_keys:
|
|
known_map = config.get("known_plugin_toolsets", {})
|
|
known_for_platform = set(known_map.get(platform, []))
|
|
for pts in plugin_ts_keys:
|
|
if pts in toolset_names:
|
|
# Explicitly listed in config — enabled
|
|
enabled_toolsets.add(pts)
|
|
elif pts not in known_for_platform:
|
|
# New plugin not yet seen by hermes tools — default enabled
|
|
enabled_toolsets.add(pts)
|
|
# else: known but not in config = user disabled it
|
|
|
|
# Preserve any explicit non-configurable toolset entries (for example,
|
|
# custom toolsets or MCP server names saved in platform_toolsets).
|
|
platform_default_keys = {p["default_toolset"] for p in PLATFORMS.values()}
|
|
explicit_passthrough = {
|
|
ts
|
|
for ts in toolset_names
|
|
if ts not in configurable_keys
|
|
and ts not in plugin_ts_keys
|
|
and ts not in platform_default_keys
|
|
}
|
|
|
|
# MCP servers are expected to be available on all platforms by default.
|
|
# If the platform explicitly lists one or more MCP server names, treat that
|
|
# as an allowlist. Otherwise include every globally enabled MCP server.
|
|
mcp_servers = config.get("mcp_servers", {})
|
|
enabled_mcp_servers = {
|
|
name
|
|
for name, server_cfg in mcp_servers.items()
|
|
if isinstance(server_cfg, dict)
|
|
and _parse_enabled_flag(server_cfg.get("enabled", True), default=True)
|
|
}
|
|
explicit_mcp_servers = explicit_passthrough & enabled_mcp_servers
|
|
enabled_toolsets.update(explicit_passthrough - enabled_mcp_servers)
|
|
if include_default_mcp_servers:
|
|
if explicit_mcp_servers:
|
|
enabled_toolsets.update(explicit_mcp_servers)
|
|
else:
|
|
enabled_toolsets.update(enabled_mcp_servers)
|
|
else:
|
|
enabled_toolsets.update(explicit_mcp_servers)
|
|
|
|
return enabled_toolsets
|
|
|
|
|
|
def _save_platform_tools(config: dict, platform: str, enabled_toolset_keys: Set[str]):
|
|
"""Save the selected toolset keys for a platform to config.
|
|
|
|
Preserves any non-configurable toolset entries (like MCP server names)
|
|
that were already in the config for this platform.
|
|
"""
|
|
config.setdefault("platform_toolsets", {})
|
|
|
|
# Get the set of all configurable toolset keys (built-in + plugin)
|
|
configurable_keys = {ts_key for ts_key, _, _ in CONFIGURABLE_TOOLSETS}
|
|
plugin_keys = _get_plugin_toolset_keys()
|
|
configurable_keys |= plugin_keys
|
|
|
|
# Also exclude platform default toolsets (hermes-cli, hermes-telegram, etc.)
|
|
# These are "super" toolsets that resolve to ALL tools, so preserving them
|
|
# would silently override the user's unchecked selections on the next read.
|
|
platform_default_keys = {p["default_toolset"] for p in PLATFORMS.values()}
|
|
|
|
# Get existing toolsets for this platform
|
|
existing_toolsets = config.get("platform_toolsets", {}).get(platform, [])
|
|
if not isinstance(existing_toolsets, list):
|
|
existing_toolsets = []
|
|
|
|
# Preserve any entries that are NOT configurable toolsets and NOT platform
|
|
# defaults (i.e. only MCP server names should be preserved)
|
|
preserved_entries = {
|
|
entry for entry in existing_toolsets
|
|
if entry not in configurable_keys and entry not in platform_default_keys
|
|
}
|
|
|
|
# Merge preserved entries with new enabled toolsets
|
|
config["platform_toolsets"][platform] = sorted(enabled_toolset_keys | preserved_entries)
|
|
|
|
# Track which plugin toolsets are "known" for this platform so we can
|
|
# distinguish "new plugin, default enabled" from "user disabled it".
|
|
if plugin_keys:
|
|
config.setdefault("known_plugin_toolsets", {})
|
|
config["known_plugin_toolsets"][platform] = sorted(plugin_keys)
|
|
|
|
save_config(config)
|
|
|
|
|
|
def _toolset_has_keys(ts_key: str) -> bool:
|
|
"""Check if a toolset's required API keys are configured."""
|
|
if ts_key == "vision":
|
|
try:
|
|
from agent.auxiliary_client import resolve_vision_provider_client
|
|
|
|
_provider, client, _model = resolve_vision_provider_client()
|
|
return client is not None
|
|
except Exception:
|
|
return False
|
|
|
|
# Check TOOL_CATEGORIES first (provider-aware)
|
|
cat = TOOL_CATEGORIES.get(ts_key)
|
|
if cat:
|
|
for provider in cat.get("providers", []):
|
|
env_vars = provider.get("env_vars", [])
|
|
if env_vars and all(get_env_value(e["key"]) for e in env_vars):
|
|
return True
|
|
return False
|
|
|
|
# Fallback to simple requirements
|
|
requirements = TOOLSET_ENV_REQUIREMENTS.get(ts_key, [])
|
|
if not requirements:
|
|
return True
|
|
return all(get_env_value(var) for var, _ in requirements)
|
|
|
|
|
|
# ─── Menu Helpers ─────────────────────────────────────────────────────────────
|
|
|
|
def _prompt_choice(question: str, choices: list, default: int = 0) -> int:
|
|
"""Single-select menu (arrow keys). Uses curses to avoid simple_term_menu
|
|
rendering bugs in tmux, iTerm, and other non-standard terminals."""
|
|
|
|
# Curses-based single-select — works in tmux, iTerm, and standard terminals
|
|
try:
|
|
import curses
|
|
result_holder = [default]
|
|
|
|
def _curses_menu(stdscr):
|
|
curses.curs_set(0)
|
|
if curses.has_colors():
|
|
curses.start_color()
|
|
curses.use_default_colors()
|
|
curses.init_pair(1, curses.COLOR_GREEN, -1)
|
|
curses.init_pair(2, curses.COLOR_YELLOW, -1)
|
|
cursor = default
|
|
|
|
while True:
|
|
stdscr.clear()
|
|
max_y, max_x = stdscr.getmaxyx()
|
|
try:
|
|
stdscr.addnstr(0, 0, question, max_x - 1,
|
|
curses.A_BOLD | (curses.color_pair(2) if curses.has_colors() else 0))
|
|
except curses.error:
|
|
pass
|
|
|
|
for i, c in enumerate(choices):
|
|
y = i + 2
|
|
if y >= max_y - 1:
|
|
break
|
|
arrow = "→" if i == cursor else " "
|
|
line = f" {arrow} {c}"
|
|
attr = curses.A_NORMAL
|
|
if i == cursor:
|
|
attr = curses.A_BOLD
|
|
if curses.has_colors():
|
|
attr |= curses.color_pair(1)
|
|
try:
|
|
stdscr.addnstr(y, 0, line, max_x - 1, attr)
|
|
except curses.error:
|
|
pass
|
|
|
|
stdscr.refresh()
|
|
key = stdscr.getch()
|
|
|
|
if key in (curses.KEY_UP, ord('k')):
|
|
cursor = (cursor - 1) % len(choices)
|
|
elif key in (curses.KEY_DOWN, ord('j')):
|
|
cursor = (cursor + 1) % len(choices)
|
|
elif key in (curses.KEY_ENTER, 10, 13):
|
|
result_holder[0] = cursor
|
|
return
|
|
elif key in (27, ord('q')):
|
|
return
|
|
|
|
curses.wrapper(_curses_menu)
|
|
return result_holder[0]
|
|
|
|
except Exception:
|
|
pass
|
|
|
|
# Fallback: numbered input (Windows without curses, etc.)
|
|
print(color(question, Colors.YELLOW))
|
|
for i, c in enumerate(choices):
|
|
marker = "●" if i == default else "○"
|
|
style = Colors.GREEN if i == default else ""
|
|
print(color(f" {marker} {i+1}. {c}", style) if style else f" {marker} {i+1}. {c}")
|
|
while True:
|
|
try:
|
|
val = input(color(f" Select [1-{len(choices)}] ({default + 1}): ", Colors.DIM))
|
|
if not val:
|
|
return default
|
|
idx = int(val) - 1
|
|
if 0 <= idx < len(choices):
|
|
return idx
|
|
except (ValueError, KeyboardInterrupt, EOFError):
|
|
print()
|
|
return default
|
|
|
|
|
|
# ─── Token Estimation ────────────────────────────────────────────────────────
|
|
|
|
# Module-level cache so discovery + tokenization runs at most once per process.
|
|
_tool_token_cache: Optional[Dict[str, int]] = None
|
|
|
|
|
|
def _estimate_tool_tokens() -> Dict[str, int]:
|
|
"""Return estimated token counts per individual tool name.
|
|
|
|
Uses tiktoken (cl100k_base) to count tokens in the JSON-serialised
|
|
OpenAI-format tool schema. Triggers tool discovery on first call,
|
|
then caches the result for the rest of the process.
|
|
|
|
Returns an empty dict when tiktoken or the registry is unavailable.
|
|
"""
|
|
global _tool_token_cache
|
|
if _tool_token_cache is not None:
|
|
return _tool_token_cache
|
|
|
|
try:
|
|
import tiktoken
|
|
enc = tiktoken.get_encoding("cl100k_base")
|
|
except Exception:
|
|
logger.debug("tiktoken unavailable; skipping tool token estimation")
|
|
_tool_token_cache = {}
|
|
return _tool_token_cache
|
|
|
|
try:
|
|
# Trigger full tool discovery (imports all tool modules).
|
|
import model_tools # noqa: F401
|
|
from tools.registry import registry
|
|
except Exception:
|
|
logger.debug("Tool registry unavailable; skipping token estimation")
|
|
_tool_token_cache = {}
|
|
return _tool_token_cache
|
|
|
|
counts: Dict[str, int] = {}
|
|
for name in registry.get_all_tool_names():
|
|
schema = registry.get_schema(name)
|
|
if schema:
|
|
# Mirror what gets sent to the API:
|
|
# {"type": "function", "function": <schema>}
|
|
text = _json.dumps({"type": "function", "function": schema})
|
|
counts[name] = len(enc.encode(text))
|
|
_tool_token_cache = counts
|
|
return _tool_token_cache
|
|
|
|
|
|
def _prompt_toolset_checklist(platform_label: str, enabled: Set[str]) -> Set[str]:
|
|
"""Multi-select checklist of toolsets. Returns set of selected toolset keys."""
|
|
from hermes_cli.curses_ui import curses_checklist
|
|
from toolsets import resolve_toolset
|
|
|
|
# Pre-compute per-tool token counts (cached after first call).
|
|
tool_tokens = _estimate_tool_tokens()
|
|
|
|
effective = _get_effective_configurable_toolsets()
|
|
|
|
labels = []
|
|
for ts_key, ts_label, ts_desc in effective:
|
|
suffix = ""
|
|
if not _toolset_has_keys(ts_key) and (TOOL_CATEGORIES.get(ts_key) or TOOLSET_ENV_REQUIREMENTS.get(ts_key)):
|
|
suffix = " [no API key]"
|
|
labels.append(f"{ts_label} ({ts_desc}){suffix}")
|
|
|
|
pre_selected = {
|
|
i for i, (ts_key, _, _) in enumerate(effective)
|
|
if ts_key in enabled
|
|
}
|
|
|
|
# Build a live status function that shows deduplicated total token cost.
|
|
status_fn = None
|
|
if tool_tokens:
|
|
ts_keys = [ts_key for ts_key, _, _ in effective]
|
|
|
|
def status_fn(chosen: set) -> str:
|
|
# Collect unique tool names across all selected toolsets
|
|
all_tools: set = set()
|
|
for idx in chosen:
|
|
all_tools.update(resolve_toolset(ts_keys[idx]))
|
|
total = sum(tool_tokens.get(name, 0) for name in all_tools)
|
|
if total >= 1000:
|
|
return f"Est. tool context: ~{total / 1000:.1f}k tokens"
|
|
return f"Est. tool context: ~{total} tokens"
|
|
|
|
chosen = curses_checklist(
|
|
f"Tools for {platform_label}",
|
|
labels,
|
|
pre_selected,
|
|
cancel_returns=pre_selected,
|
|
status_fn=status_fn,
|
|
)
|
|
return {effective[i][0] for i in chosen}
|
|
|
|
|
|
# ─── Provider-Aware Configuration ────────────────────────────────────────────
|
|
|
|
def _configure_toolset(ts_key: str, config: dict):
|
|
"""Configure a toolset - provider selection + API keys.
|
|
|
|
Uses TOOL_CATEGORIES for provider-aware config, falls back to simple
|
|
env var prompts for toolsets not in TOOL_CATEGORIES.
|
|
"""
|
|
cat = TOOL_CATEGORIES.get(ts_key)
|
|
|
|
if cat:
|
|
_configure_tool_category(ts_key, cat, config)
|
|
else:
|
|
# Simple fallback for vision, moa, etc.
|
|
_configure_simple_requirements(ts_key)
|
|
|
|
|
|
def _configure_tool_category(ts_key: str, cat: dict, config: dict):
|
|
"""Configure a tool category with provider selection."""
|
|
icon = cat.get("icon", "")
|
|
name = cat["name"]
|
|
providers = cat["providers"]
|
|
|
|
# Check Python version requirement
|
|
if cat.get("requires_python"):
|
|
req = cat["requires_python"]
|
|
if sys.version_info < req:
|
|
print()
|
|
_print_error(f" {name} requires Python {req[0]}.{req[1]}+ (current: {sys.version_info.major}.{sys.version_info.minor})")
|
|
_print_info(" Upgrade Python and reinstall to enable this tool.")
|
|
return
|
|
|
|
if len(providers) == 1:
|
|
# Single provider - configure directly
|
|
provider = providers[0]
|
|
print()
|
|
print(color(f" --- {icon} {name} ({provider['name']}) ---", Colors.CYAN))
|
|
if provider.get("tag"):
|
|
_print_info(f" {provider['tag']}")
|
|
# For single-provider tools, show a note if available
|
|
if cat.get("setup_note"):
|
|
_print_info(f" {cat['setup_note']}")
|
|
_configure_provider(provider, config)
|
|
else:
|
|
# Multiple providers - let user choose
|
|
print()
|
|
# Use custom title if provided (e.g. "Select Search Provider")
|
|
title = cat.get("setup_title", "Choose a provider")
|
|
print(color(f" --- {icon} {name} - {title} ---", Colors.CYAN))
|
|
if cat.get("setup_note"):
|
|
_print_info(f" {cat['setup_note']}")
|
|
print()
|
|
|
|
# Plain text labels only (no ANSI codes in menu items)
|
|
provider_choices = []
|
|
for p in providers:
|
|
tag = f" ({p['tag']})" if p.get("tag") else ""
|
|
configured = ""
|
|
env_vars = p.get("env_vars", [])
|
|
if not env_vars or all(get_env_value(v["key"]) for v in env_vars):
|
|
if _is_provider_active(p, config):
|
|
configured = " [active]"
|
|
elif not env_vars:
|
|
configured = ""
|
|
else:
|
|
configured = " [configured]"
|
|
provider_choices.append(f"{p['name']}{tag}{configured}")
|
|
|
|
# Add skip option
|
|
provider_choices.append("Skip — keep defaults / configure later")
|
|
|
|
# Detect current provider as default
|
|
default_idx = _detect_active_provider_index(providers, config)
|
|
|
|
provider_idx = _prompt_choice(f" {title}:", provider_choices, default_idx)
|
|
|
|
# Skip selected
|
|
if provider_idx >= len(providers):
|
|
_print_info(f" Skipped {name}")
|
|
return
|
|
|
|
_configure_provider(providers[provider_idx], config)
|
|
|
|
|
|
def _is_provider_active(provider: dict, config: dict) -> bool:
|
|
"""Check if a provider entry matches the currently active config."""
|
|
if provider.get("tts_provider"):
|
|
return config.get("tts", {}).get("provider") == provider["tts_provider"]
|
|
if "browser_provider" in provider:
|
|
current = config.get("browser", {}).get("cloud_provider")
|
|
return provider["browser_provider"] == current
|
|
if provider.get("web_backend"):
|
|
current = config.get("web", {}).get("backend")
|
|
return current == provider["web_backend"]
|
|
return False
|
|
|
|
|
|
def _detect_active_provider_index(providers: list, config: dict) -> int:
|
|
"""Return the index of the currently active provider, or 0."""
|
|
for i, p in enumerate(providers):
|
|
if _is_provider_active(p, config):
|
|
return i
|
|
# Fallback: env vars present → likely configured
|
|
env_vars = p.get("env_vars", [])
|
|
if env_vars and all(get_env_value(v["key"]) for v in env_vars):
|
|
return i
|
|
return 0
|
|
|
|
|
|
def _configure_provider(provider: dict, config: dict):
|
|
"""Configure a single provider - prompt for API keys and set config."""
|
|
env_vars = provider.get("env_vars", [])
|
|
|
|
# Set TTS provider in config if applicable
|
|
if provider.get("tts_provider"):
|
|
config.setdefault("tts", {})["provider"] = provider["tts_provider"]
|
|
|
|
# Set browser cloud provider in config if applicable
|
|
if "browser_provider" in provider:
|
|
bp = provider["browser_provider"]
|
|
if bp:
|
|
config.setdefault("browser", {})["cloud_provider"] = bp
|
|
_print_success(f" Browser cloud provider set to: {bp}")
|
|
else:
|
|
config.get("browser", {}).pop("cloud_provider", None)
|
|
|
|
# Set web search backend in config if applicable
|
|
if provider.get("web_backend"):
|
|
config.setdefault("web", {})["backend"] = provider["web_backend"]
|
|
_print_success(f" Web backend set to: {provider['web_backend']}")
|
|
|
|
if not env_vars:
|
|
_print_success(f" {provider['name']} - no configuration needed!")
|
|
return
|
|
|
|
# Prompt for each required env var
|
|
all_configured = True
|
|
for var in env_vars:
|
|
existing = get_env_value(var["key"])
|
|
if existing:
|
|
_print_success(f" {var['key']}: already configured")
|
|
# Don't ask to update - this is a new enable flow.
|
|
# Reconfigure is handled separately.
|
|
else:
|
|
url = var.get("url", "")
|
|
if url:
|
|
_print_info(f" Get yours at: {url}")
|
|
|
|
default_val = var.get("default", "")
|
|
if default_val:
|
|
value = _prompt(f" {var.get('prompt', var['key'])}", default_val)
|
|
else:
|
|
value = _prompt(f" {var.get('prompt', var['key'])}", password=True)
|
|
|
|
if value:
|
|
save_env_value(var["key"], value)
|
|
_print_success(" Saved")
|
|
else:
|
|
_print_warning(" Skipped")
|
|
all_configured = False
|
|
|
|
# Run post-setup hooks if needed
|
|
if provider.get("post_setup") and all_configured:
|
|
_run_post_setup(provider["post_setup"])
|
|
|
|
if all_configured:
|
|
_print_success(f" {provider['name']} configured!")
|
|
|
|
|
|
def _configure_simple_requirements(ts_key: str):
|
|
"""Simple fallback for toolsets that just need env vars (no provider selection)."""
|
|
if ts_key == "vision":
|
|
if _toolset_has_keys("vision"):
|
|
return
|
|
print()
|
|
print(color(" Vision / Image Analysis requires a multimodal backend:", Colors.YELLOW))
|
|
choices = [
|
|
"OpenRouter — uses Gemini",
|
|
"OpenAI-compatible endpoint — base URL, API key, and vision model",
|
|
"Skip",
|
|
]
|
|
idx = _prompt_choice(" Configure vision backend", choices, 2)
|
|
if idx == 0:
|
|
_print_info(" Get key at: https://openrouter.ai/keys")
|
|
value = _prompt(" OPENROUTER_API_KEY", password=True)
|
|
if value and value.strip():
|
|
save_env_value("OPENROUTER_API_KEY", value.strip())
|
|
_print_success(" Saved")
|
|
else:
|
|
_print_warning(" Skipped")
|
|
elif idx == 1:
|
|
base_url = _prompt(" OPENAI_BASE_URL (blank for OpenAI)").strip() or "https://api.openai.com/v1"
|
|
key_label = " OPENAI_API_KEY" if "api.openai.com" in base_url.lower() else " API key"
|
|
api_key = _prompt(key_label, password=True)
|
|
if api_key and api_key.strip():
|
|
save_env_value("OPENAI_BASE_URL", base_url)
|
|
save_env_value("OPENAI_API_KEY", api_key.strip())
|
|
if "api.openai.com" in base_url.lower():
|
|
save_env_value("AUXILIARY_VISION_MODEL", "gpt-4o-mini")
|
|
_print_success(" Saved")
|
|
else:
|
|
_print_warning(" Skipped")
|
|
return
|
|
|
|
requirements = TOOLSET_ENV_REQUIREMENTS.get(ts_key, [])
|
|
if not requirements:
|
|
return
|
|
|
|
missing = [(var, url) for var, url in requirements if not get_env_value(var)]
|
|
if not missing:
|
|
return
|
|
|
|
ts_label = next((l for k, l, _ in _get_effective_configurable_toolsets() if k == ts_key), ts_key)
|
|
print()
|
|
print(color(f" {ts_label} requires configuration:", Colors.YELLOW))
|
|
|
|
for var, url in missing:
|
|
if url:
|
|
_print_info(f" Get key at: {url}")
|
|
value = _prompt(f" {var}", password=True)
|
|
if value and value.strip():
|
|
save_env_value(var, value.strip())
|
|
_print_success(" Saved")
|
|
else:
|
|
_print_warning(" Skipped")
|
|
|
|
|
|
def _reconfigure_tool(config: dict):
|
|
"""Let user reconfigure an existing tool's provider or API key."""
|
|
# Build list of configurable tools that are currently set up
|
|
configurable = []
|
|
for ts_key, ts_label, _ in _get_effective_configurable_toolsets():
|
|
cat = TOOL_CATEGORIES.get(ts_key)
|
|
reqs = TOOLSET_ENV_REQUIREMENTS.get(ts_key)
|
|
if cat or reqs:
|
|
if _toolset_has_keys(ts_key):
|
|
configurable.append((ts_key, ts_label))
|
|
|
|
if not configurable:
|
|
_print_info("No configured tools to reconfigure.")
|
|
return
|
|
|
|
choices = [label for _, label in configurable]
|
|
choices.append("Cancel")
|
|
|
|
idx = _prompt_choice(" Which tool would you like to reconfigure?", choices, len(choices) - 1)
|
|
|
|
if idx >= len(configurable):
|
|
return # Cancel
|
|
|
|
ts_key, ts_label = configurable[idx]
|
|
cat = TOOL_CATEGORIES.get(ts_key)
|
|
|
|
if cat:
|
|
_configure_tool_category_for_reconfig(ts_key, cat, config)
|
|
else:
|
|
_reconfigure_simple_requirements(ts_key)
|
|
|
|
save_config(config)
|
|
|
|
|
|
def _configure_tool_category_for_reconfig(ts_key: str, cat: dict, config: dict):
|
|
"""Reconfigure a tool category - provider selection + API key update."""
|
|
icon = cat.get("icon", "")
|
|
name = cat["name"]
|
|
providers = cat["providers"]
|
|
|
|
if len(providers) == 1:
|
|
provider = providers[0]
|
|
print()
|
|
print(color(f" --- {icon} {name} ({provider['name']}) ---", Colors.CYAN))
|
|
_reconfigure_provider(provider, config)
|
|
else:
|
|
print()
|
|
print(color(f" --- {icon} {name} - Choose a provider ---", Colors.CYAN))
|
|
print()
|
|
|
|
provider_choices = []
|
|
for p in providers:
|
|
tag = f" ({p['tag']})" if p.get("tag") else ""
|
|
configured = ""
|
|
env_vars = p.get("env_vars", [])
|
|
if not env_vars or all(get_env_value(v["key"]) for v in env_vars):
|
|
if _is_provider_active(p, config):
|
|
configured = " [active]"
|
|
elif not env_vars:
|
|
configured = ""
|
|
else:
|
|
configured = " [configured]"
|
|
provider_choices.append(f"{p['name']}{tag}{configured}")
|
|
|
|
default_idx = _detect_active_provider_index(providers, config)
|
|
|
|
provider_idx = _prompt_choice(" Select provider:", provider_choices, default_idx)
|
|
_reconfigure_provider(providers[provider_idx], config)
|
|
|
|
|
|
def _reconfigure_provider(provider: dict, config: dict):
|
|
"""Reconfigure a provider - update API keys."""
|
|
env_vars = provider.get("env_vars", [])
|
|
|
|
if provider.get("tts_provider"):
|
|
config.setdefault("tts", {})["provider"] = provider["tts_provider"]
|
|
_print_success(f" TTS provider set to: {provider['tts_provider']}")
|
|
|
|
if "browser_provider" in provider:
|
|
bp = provider["browser_provider"]
|
|
if bp:
|
|
config.setdefault("browser", {})["cloud_provider"] = bp
|
|
_print_success(f" Browser cloud provider set to: {bp}")
|
|
else:
|
|
config.get("browser", {}).pop("cloud_provider", None)
|
|
_print_success(" Browser set to local mode")
|
|
|
|
# Set web search backend in config if applicable
|
|
if provider.get("web_backend"):
|
|
config.setdefault("web", {})["backend"] = provider["web_backend"]
|
|
_print_success(f" Web backend set to: {provider['web_backend']}")
|
|
|
|
if not env_vars:
|
|
_print_success(f" {provider['name']} - no configuration needed!")
|
|
return
|
|
|
|
for var in env_vars:
|
|
existing = get_env_value(var["key"])
|
|
if existing:
|
|
_print_info(f" {var['key']}: configured ({existing[:8]}...)")
|
|
url = var.get("url", "")
|
|
if url:
|
|
_print_info(f" Get yours at: {url}")
|
|
default_val = var.get("default", "")
|
|
value = _prompt(f" {var.get('prompt', var['key'])} (Enter to keep current)", password=not default_val)
|
|
if value and value.strip():
|
|
save_env_value(var["key"], value.strip())
|
|
_print_success(" Updated")
|
|
else:
|
|
_print_info(" Kept current")
|
|
|
|
|
|
def _reconfigure_simple_requirements(ts_key: str):
|
|
"""Reconfigure simple env var requirements."""
|
|
requirements = TOOLSET_ENV_REQUIREMENTS.get(ts_key, [])
|
|
if not requirements:
|
|
return
|
|
|
|
ts_label = next((l for k, l, _ in _get_effective_configurable_toolsets() if k == ts_key), ts_key)
|
|
print()
|
|
print(color(f" {ts_label}:", Colors.CYAN))
|
|
|
|
for var, url in requirements:
|
|
existing = get_env_value(var)
|
|
if existing:
|
|
_print_info(f" {var}: configured ({existing[:8]}...)")
|
|
if url:
|
|
_print_info(f" Get key at: {url}")
|
|
value = _prompt(f" {var} (Enter to keep current)", password=True)
|
|
if value and value.strip():
|
|
save_env_value(var, value.strip())
|
|
_print_success(" Updated")
|
|
else:
|
|
_print_info(" Kept current")
|
|
|
|
|
|
# ─── Main Entry Point ─────────────────────────────────────────────────────────
|
|
|
|
def tools_command(args=None, first_install: bool = False, config: dict = None):
|
|
"""Entry point for `hermes tools` and `hermes setup tools`.
|
|
|
|
Args:
|
|
first_install: When True (set by the setup wizard on fresh installs),
|
|
skip the platform menu, go straight to the CLI checklist, and
|
|
prompt for API keys on all enabled tools that need them.
|
|
config: Optional config dict to use. When called from the setup
|
|
wizard, the wizard passes its own dict so that platform_toolsets
|
|
are written into it and survive the wizard's final save_config().
|
|
"""
|
|
if config is None:
|
|
config = load_config()
|
|
enabled_platforms = _get_enabled_platforms()
|
|
|
|
print()
|
|
|
|
# Non-interactive summary mode for CLI usage
|
|
if getattr(args, "summary", False):
|
|
total = len(_get_effective_configurable_toolsets())
|
|
print(color("⚕ Tool Summary", Colors.CYAN, Colors.BOLD))
|
|
print()
|
|
summary = _platform_toolset_summary(config, enabled_platforms)
|
|
for pkey in enabled_platforms:
|
|
pinfo = PLATFORMS[pkey]
|
|
enabled = summary.get(pkey, set())
|
|
count = len(enabled)
|
|
print(color(f" {pinfo['label']}", Colors.BOLD) + color(f" ({count}/{total})", Colors.DIM))
|
|
if enabled:
|
|
for ts_key in sorted(enabled):
|
|
label = next((l for k, l, _ in _get_effective_configurable_toolsets() if k == ts_key), ts_key)
|
|
print(color(f" ✓ {label}", Colors.GREEN))
|
|
else:
|
|
print(color(" (none enabled)", Colors.DIM))
|
|
print()
|
|
return
|
|
print(color("⚕ Hermes Tool Configuration", Colors.CYAN, Colors.BOLD))
|
|
print(color(" Enable or disable tools per platform.", Colors.DIM))
|
|
print(color(" Tools that need API keys will be configured when enabled.", Colors.DIM))
|
|
print()
|
|
|
|
# ── First-time install: linear flow, no platform menu ──
|
|
if first_install:
|
|
for pkey in enabled_platforms:
|
|
pinfo = PLATFORMS[pkey]
|
|
current_enabled = _get_platform_tools(config, pkey, include_default_mcp_servers=False)
|
|
|
|
# Uncheck toolsets that should be off by default
|
|
checklist_preselected = current_enabled - _DEFAULT_OFF_TOOLSETS
|
|
|
|
# Show checklist
|
|
new_enabled = _prompt_toolset_checklist(pinfo["label"], checklist_preselected)
|
|
|
|
added = new_enabled - current_enabled
|
|
removed = current_enabled - new_enabled
|
|
if added:
|
|
for ts in sorted(added):
|
|
label = next((l for k, l, _ in _get_effective_configurable_toolsets() if k == ts), ts)
|
|
print(color(f" + {label}", Colors.GREEN))
|
|
if removed:
|
|
for ts in sorted(removed):
|
|
label = next((l for k, l, _ in _get_effective_configurable_toolsets() if k == ts), ts)
|
|
print(color(f" - {label}", Colors.RED))
|
|
|
|
# Walk through ALL selected tools that have provider options or
|
|
# need API keys. This ensures browser (Local vs Browserbase),
|
|
# TTS (Edge vs OpenAI vs ElevenLabs), etc. are shown even when
|
|
# a free provider exists.
|
|
to_configure = [
|
|
ts_key for ts_key in sorted(new_enabled)
|
|
if TOOL_CATEGORIES.get(ts_key) or TOOLSET_ENV_REQUIREMENTS.get(ts_key)
|
|
]
|
|
|
|
if to_configure:
|
|
print()
|
|
print(color(f" Configuring {len(to_configure)} tool(s):", Colors.YELLOW))
|
|
for ts_key in to_configure:
|
|
label = next((l for k, l, _ in _get_effective_configurable_toolsets() if k == ts_key), ts_key)
|
|
print(color(f" • {label}", Colors.DIM))
|
|
print(color(" You can skip any tool you don't need right now.", Colors.DIM))
|
|
print()
|
|
for ts_key in to_configure:
|
|
_configure_toolset(ts_key, config)
|
|
|
|
_save_platform_tools(config, pkey, new_enabled)
|
|
save_config(config)
|
|
print(color(f" ✓ Saved {pinfo['label']} tool configuration", Colors.GREEN))
|
|
print()
|
|
|
|
return
|
|
|
|
# ── Returning user: platform menu loop ──
|
|
# Build platform choices
|
|
platform_choices = []
|
|
platform_keys = []
|
|
for pkey in enabled_platforms:
|
|
pinfo = PLATFORMS[pkey]
|
|
current = _get_platform_tools(config, pkey, include_default_mcp_servers=False)
|
|
count = len(current)
|
|
total = len(_get_effective_configurable_toolsets())
|
|
platform_choices.append(f"Configure {pinfo['label']} ({count}/{total} enabled)")
|
|
platform_keys.append(pkey)
|
|
|
|
if len(platform_keys) > 1:
|
|
platform_choices.append("Configure all platforms (global)")
|
|
platform_choices.append("Reconfigure an existing tool's provider or API key")
|
|
|
|
# Show MCP option if any MCP servers are configured
|
|
_has_mcp = bool(config.get("mcp_servers"))
|
|
if _has_mcp:
|
|
platform_choices.append("Configure MCP server tools")
|
|
|
|
platform_choices.append("Done")
|
|
|
|
# Index offsets for the extra options after per-platform entries
|
|
_global_idx = len(platform_keys) if len(platform_keys) > 1 else -1
|
|
_reconfig_idx = len(platform_keys) + (1 if len(platform_keys) > 1 else 0)
|
|
_mcp_idx = (_reconfig_idx + 1) if _has_mcp else -1
|
|
_done_idx = _reconfig_idx + (2 if _has_mcp else 1)
|
|
|
|
while True:
|
|
idx = _prompt_choice("Select an option:", platform_choices, default=0)
|
|
|
|
# "Done" selected
|
|
if idx == _done_idx:
|
|
break
|
|
|
|
# "Reconfigure" selected
|
|
if idx == _reconfig_idx:
|
|
_reconfigure_tool(config)
|
|
print()
|
|
continue
|
|
|
|
# "Configure MCP tools" selected
|
|
if idx == _mcp_idx:
|
|
_configure_mcp_tools_interactive(config)
|
|
print()
|
|
continue
|
|
|
|
# "Configure all platforms (global)" selected
|
|
if idx == _global_idx:
|
|
# Use the union of all platforms' current tools as the starting state
|
|
all_current = set()
|
|
for pk in platform_keys:
|
|
all_current |= _get_platform_tools(config, pk, include_default_mcp_servers=False)
|
|
new_enabled = _prompt_toolset_checklist("All platforms", all_current)
|
|
if new_enabled != all_current:
|
|
for pk in platform_keys:
|
|
prev = _get_platform_tools(config, pk, include_default_mcp_servers=False)
|
|
added = new_enabled - prev
|
|
removed = prev - new_enabled
|
|
pinfo_inner = PLATFORMS[pk]
|
|
if added or removed:
|
|
print(color(f" {pinfo_inner['label']}:", Colors.DIM))
|
|
for ts in sorted(added):
|
|
label = next((l for k, l, _ in _get_effective_configurable_toolsets() if k == ts), ts)
|
|
print(color(f" + {label}", Colors.GREEN))
|
|
for ts in sorted(removed):
|
|
label = next((l for k, l, _ in _get_effective_configurable_toolsets() if k == ts), ts)
|
|
print(color(f" - {label}", Colors.RED))
|
|
# Configure API keys for newly enabled tools
|
|
for ts_key in sorted(added):
|
|
if (TOOL_CATEGORIES.get(ts_key) or TOOLSET_ENV_REQUIREMENTS.get(ts_key)):
|
|
if not _toolset_has_keys(ts_key):
|
|
_configure_toolset(ts_key, config)
|
|
_save_platform_tools(config, pk, new_enabled)
|
|
save_config(config)
|
|
print(color(" ✓ Saved configuration for all platforms", Colors.GREEN))
|
|
# Update choice labels
|
|
for ci, pk in enumerate(platform_keys):
|
|
new_count = len(_get_platform_tools(config, pk, include_default_mcp_servers=False))
|
|
total = len(_get_effective_configurable_toolsets())
|
|
platform_choices[ci] = f"Configure {PLATFORMS[pk]['label']} ({new_count}/{total} enabled)"
|
|
else:
|
|
print(color(" No changes", Colors.DIM))
|
|
print()
|
|
continue
|
|
|
|
pkey = platform_keys[idx]
|
|
pinfo = PLATFORMS[pkey]
|
|
|
|
# Get current enabled toolsets for this platform
|
|
current_enabled = _get_platform_tools(config, pkey, include_default_mcp_servers=False)
|
|
|
|
# Show checklist
|
|
new_enabled = _prompt_toolset_checklist(pinfo["label"], current_enabled)
|
|
|
|
if new_enabled != current_enabled:
|
|
added = new_enabled - current_enabled
|
|
removed = current_enabled - new_enabled
|
|
|
|
if added:
|
|
for ts in sorted(added):
|
|
label = next((l for k, l, _ in _get_effective_configurable_toolsets() if k == ts), ts)
|
|
print(color(f" + {label}", Colors.GREEN))
|
|
if removed:
|
|
for ts in sorted(removed):
|
|
label = next((l for k, l, _ in _get_effective_configurable_toolsets() if k == ts), ts)
|
|
print(color(f" - {label}", Colors.RED))
|
|
|
|
# Configure newly enabled toolsets that need API keys
|
|
for ts_key in sorted(added):
|
|
if (TOOL_CATEGORIES.get(ts_key) or TOOLSET_ENV_REQUIREMENTS.get(ts_key)):
|
|
if not _toolset_has_keys(ts_key):
|
|
_configure_toolset(ts_key, config)
|
|
|
|
_save_platform_tools(config, pkey, new_enabled)
|
|
save_config(config)
|
|
print(color(f" ✓ Saved {pinfo['label']} configuration", Colors.GREEN))
|
|
else:
|
|
print(color(f" No changes to {pinfo['label']}", Colors.DIM))
|
|
|
|
print()
|
|
|
|
# Update the choice label with new count
|
|
new_count = len(_get_platform_tools(config, pkey, include_default_mcp_servers=False))
|
|
total = len(_get_effective_configurable_toolsets())
|
|
platform_choices[idx] = f"Configure {pinfo['label']} ({new_count}/{total} enabled)"
|
|
|
|
print()
|
|
from hermes_constants import display_hermes_home
|
|
print(color(f" Tool configuration saved to {display_hermes_home()}/config.yaml", Colors.DIM))
|
|
print(color(" Changes take effect on next 'hermes' or gateway restart.", Colors.DIM))
|
|
print()
|
|
|
|
|
|
# ─── MCP Tools Interactive Configuration ─────────────────────────────────────
|
|
|
|
|
|
def _configure_mcp_tools_interactive(config: dict):
|
|
"""Probe MCP servers for available tools and let user toggle them on/off.
|
|
|
|
Connects to each configured MCP server, discovers tools, then shows
|
|
a per-server curses checklist. Writes changes back as ``tools.exclude``
|
|
entries in config.yaml.
|
|
"""
|
|
from hermes_cli.curses_ui import curses_checklist
|
|
|
|
mcp_servers = config.get("mcp_servers") or {}
|
|
if not mcp_servers:
|
|
_print_info("No MCP servers configured.")
|
|
return
|
|
|
|
# Count enabled servers
|
|
enabled_names = [
|
|
k for k, v in mcp_servers.items()
|
|
if v.get("enabled", True) not in (False, "false", "0", "no", "off")
|
|
]
|
|
if not enabled_names:
|
|
_print_info("All MCP servers are disabled.")
|
|
return
|
|
|
|
print()
|
|
print(color(" Discovering tools from MCP servers...", Colors.YELLOW))
|
|
print(color(f" Connecting to {len(enabled_names)} server(s): {', '.join(enabled_names)}", Colors.DIM))
|
|
|
|
try:
|
|
from tools.mcp_tool import probe_mcp_server_tools
|
|
server_tools = probe_mcp_server_tools()
|
|
except Exception as exc:
|
|
_print_error(f"Failed to probe MCP servers: {exc}")
|
|
return
|
|
|
|
if not server_tools:
|
|
_print_warning("Could not discover tools from any MCP server.")
|
|
_print_info("Check that server commands/URLs are correct and dependencies are installed.")
|
|
return
|
|
|
|
# Report discovery results
|
|
failed = [n for n in enabled_names if n not in server_tools]
|
|
if failed:
|
|
for name in failed:
|
|
_print_warning(f" Could not connect to '{name}'")
|
|
|
|
total_tools = sum(len(tools) for tools in server_tools.values())
|
|
print(color(f" Found {total_tools} tool(s) across {len(server_tools)} server(s)", Colors.GREEN))
|
|
print()
|
|
|
|
any_changes = False
|
|
|
|
for server_name, tools in server_tools.items():
|
|
if not tools:
|
|
_print_info(f" {server_name}: no tools found")
|
|
continue
|
|
|
|
srv_cfg = mcp_servers.get(server_name, {})
|
|
tools_cfg = srv_cfg.get("tools") or {}
|
|
include_list = tools_cfg.get("include") or []
|
|
exclude_list = tools_cfg.get("exclude") or []
|
|
|
|
# Build checklist labels
|
|
labels = []
|
|
for tool_name, description in tools:
|
|
desc_short = description[:70] + "..." if len(description) > 70 else description
|
|
if desc_short:
|
|
labels.append(f"{tool_name} ({desc_short})")
|
|
else:
|
|
labels.append(tool_name)
|
|
|
|
# Determine which tools are currently enabled
|
|
pre_selected: Set[int] = set()
|
|
tool_names = [t[0] for t in tools]
|
|
for i, tool_name in enumerate(tool_names):
|
|
if include_list:
|
|
# Include mode: only included tools are selected
|
|
if tool_name in include_list:
|
|
pre_selected.add(i)
|
|
elif exclude_list:
|
|
# Exclude mode: everything except excluded
|
|
if tool_name not in exclude_list:
|
|
pre_selected.add(i)
|
|
else:
|
|
# No filter: all enabled
|
|
pre_selected.add(i)
|
|
|
|
chosen = curses_checklist(
|
|
f"MCP Server: {server_name} ({len(tools)} tools)",
|
|
labels,
|
|
pre_selected,
|
|
cancel_returns=pre_selected,
|
|
)
|
|
|
|
if chosen == pre_selected:
|
|
_print_info(f" {server_name}: no changes")
|
|
continue
|
|
|
|
# Compute new exclude list based on unchecked tools
|
|
new_exclude = [tool_names[i] for i in range(len(tool_names)) if i not in chosen]
|
|
|
|
# Update config
|
|
srv_cfg = mcp_servers.setdefault(server_name, {})
|
|
tools_cfg = srv_cfg.setdefault("tools", {})
|
|
|
|
if new_exclude:
|
|
tools_cfg["exclude"] = new_exclude
|
|
# Remove include if present — we're switching to exclude mode
|
|
tools_cfg.pop("include", None)
|
|
else:
|
|
# All tools enabled — clear filters
|
|
tools_cfg.pop("exclude", None)
|
|
tools_cfg.pop("include", None)
|
|
|
|
enabled_count = len(chosen)
|
|
disabled_count = len(tools) - enabled_count
|
|
_print_success(
|
|
f" {server_name}: {enabled_count} enabled, {disabled_count} disabled"
|
|
)
|
|
any_changes = True
|
|
|
|
if any_changes:
|
|
save_config(config)
|
|
print()
|
|
print(color(" ✓ MCP tool configuration saved", Colors.GREEN))
|
|
else:
|
|
print(color(" No changes to MCP tools", Colors.DIM))
|
|
|
|
|
|
# ─── Non-interactive disable/enable ──────────────────────────────────────────
|
|
|
|
|
|
def _apply_toolset_change(config: dict, platform: str, toolset_names: List[str], action: str):
|
|
"""Add or remove built-in toolsets for a platform."""
|
|
enabled = _get_platform_tools(config, platform, include_default_mcp_servers=False)
|
|
if action == "disable":
|
|
updated = enabled - set(toolset_names)
|
|
else:
|
|
updated = enabled | set(toolset_names)
|
|
_save_platform_tools(config, platform, updated)
|
|
|
|
|
|
def _apply_mcp_change(config: dict, targets: List[str], action: str) -> Set[str]:
|
|
"""Add or remove specific MCP tools from a server's exclude list.
|
|
|
|
Returns the set of server names that were not found in config.
|
|
"""
|
|
failed_servers: Set[str] = set()
|
|
mcp_servers = config.get("mcp_servers") or {}
|
|
|
|
for target in targets:
|
|
server_name, tool_name = target.split(":", 1)
|
|
if server_name not in mcp_servers:
|
|
failed_servers.add(server_name)
|
|
continue
|
|
tools_cfg = mcp_servers[server_name].setdefault("tools", {})
|
|
exclude = list(tools_cfg.get("exclude") or [])
|
|
if action == "disable":
|
|
if tool_name not in exclude:
|
|
exclude.append(tool_name)
|
|
else:
|
|
exclude = [t for t in exclude if t != tool_name]
|
|
tools_cfg["exclude"] = exclude
|
|
|
|
return failed_servers
|
|
|
|
|
|
def _print_tools_list(enabled_toolsets: set, mcp_servers: dict, platform: str = "cli"):
|
|
"""Print a summary of enabled/disabled toolsets and MCP tool filters."""
|
|
effective = _get_effective_configurable_toolsets()
|
|
builtin_keys = {ts_key for ts_key, _, _ in CONFIGURABLE_TOOLSETS}
|
|
|
|
print(f"Built-in toolsets ({platform}):")
|
|
for ts_key, label, _ in effective:
|
|
if ts_key not in builtin_keys:
|
|
continue
|
|
status = (color("✓ enabled", Colors.GREEN) if ts_key in enabled_toolsets
|
|
else color("✗ disabled", Colors.RED))
|
|
print(f" {status} {ts_key} {color(label, Colors.DIM)}")
|
|
|
|
# Plugin toolsets
|
|
plugin_entries = [(k, l) for k, l, _ in effective if k not in builtin_keys]
|
|
if plugin_entries:
|
|
print()
|
|
print(f"Plugin toolsets ({platform}):")
|
|
for ts_key, label in plugin_entries:
|
|
status = (color("✓ enabled", Colors.GREEN) if ts_key in enabled_toolsets
|
|
else color("✗ disabled", Colors.RED))
|
|
print(f" {status} {ts_key} {color(label, Colors.DIM)}")
|
|
|
|
if mcp_servers:
|
|
print()
|
|
print("MCP servers:")
|
|
for srv_name, srv_cfg in mcp_servers.items():
|
|
tools_cfg = srv_cfg.get("tools") or {}
|
|
exclude = tools_cfg.get("exclude") or []
|
|
include = tools_cfg.get("include") or []
|
|
if include:
|
|
_print_info(f"{srv_name} [include only: {', '.join(include)}]")
|
|
elif exclude:
|
|
_print_info(f"{srv_name} [excluded: {color(', '.join(exclude), Colors.YELLOW)}]")
|
|
else:
|
|
_print_info(f"{srv_name} {color('all tools enabled', Colors.DIM)}")
|
|
|
|
|
|
def tools_disable_enable_command(args):
|
|
"""Enable, disable, or list tools for a platform.
|
|
|
|
Built-in toolsets use plain names (e.g. ``web``, ``memory``).
|
|
MCP tools use ``server:tool`` notation (e.g. ``github:create_issue``).
|
|
"""
|
|
action = args.tools_action
|
|
platform = getattr(args, "platform", "cli")
|
|
config = load_config()
|
|
|
|
if platform not in PLATFORMS:
|
|
_print_error(f"Unknown platform '{platform}'. Valid: {', '.join(PLATFORMS)}")
|
|
return
|
|
|
|
if action == "list":
|
|
_print_tools_list(_get_platform_tools(config, platform, include_default_mcp_servers=False),
|
|
config.get("mcp_servers") or {}, platform)
|
|
return
|
|
|
|
targets: List[str] = args.names
|
|
toolset_targets = [t for t in targets if ":" not in t]
|
|
mcp_targets = [t for t in targets if ":" in t]
|
|
|
|
valid_toolsets = {ts_key for ts_key, _, _ in CONFIGURABLE_TOOLSETS} | _get_plugin_toolset_keys()
|
|
unknown_toolsets = [t for t in toolset_targets if t not in valid_toolsets]
|
|
if unknown_toolsets:
|
|
for name in unknown_toolsets:
|
|
_print_error(f"Unknown toolset '{name}'")
|
|
toolset_targets = [t for t in toolset_targets if t in valid_toolsets]
|
|
|
|
if toolset_targets:
|
|
_apply_toolset_change(config, platform, toolset_targets, action)
|
|
|
|
failed_servers: Set[str] = set()
|
|
if mcp_targets:
|
|
failed_servers = _apply_mcp_change(config, mcp_targets, action)
|
|
for srv in failed_servers:
|
|
_print_error(f"MCP server '{srv}' not found in config")
|
|
|
|
save_config(config)
|
|
|
|
successful = [
|
|
t for t in targets
|
|
if t not in unknown_toolsets and (":" not in t or t.split(":")[0] not in failed_servers)
|
|
]
|
|
if successful:
|
|
verb = "Disabled" if action == "disable" else "Enabled"
|
|
_print_success(f"{verb}: {', '.join(successful)}")
|