refactor: deduplicate toolsets, unify async bridging, fix approval race condition, harden security

- Replace 4 copy-pasted messaging platform toolsets with shared _HERMES_CORE_TOOLS list
- Consolidate 5 ad-hoc async-bridging patterns into single _run_async() in model_tools.py
  - Removes deprecated get_event_loop()/set_event_loop() calls
  - Makes all tool handlers self-protecting regardless of caller's event loop state
  - RL handler refactored from if/elif chain to dispatch dict
- Fix exec approval race condition: replace module-level globals with thread-safe
  per-session tools/approval.py (submit_pending, pop_pending, approve_session, is_approved)
  - Session A approving "rm" no longer approves it for all other sessions
- Fix config deep merge: user overriding tts.elevenlabs.voice_id no longer clobbers
  tts.elevenlabs.model_id; migration detection now recurses to arbitrary depth
- Gateway default-deny: unauthenticated users denied unless GATEWAY_ALLOW_ALL_USERS=true
- Add 10 dangerous command patterns: rm --recursive, bash -c, python -e, curl|bash,
  xargs rm, find -delete
- Sanitize gateway error messages: users see generic message, full traceback goes to logs
This commit is contained in:
teknium1
2026-02-21 18:28:49 -08:00
parent 7cb6427dea
commit 6134939882
10 changed files with 336 additions and 396 deletions

View File

@@ -164,6 +164,10 @@ HERMES_OPENAI_API_KEY=
# Slack allowed users (comma-separated Slack user IDs)
# SLACK_ALLOWED_USERS=
# Gateway-wide: allow ALL users without an allowlist (default: false = deny)
# Only set to true if you intentionally want open access.
# GATEWAY_ALLOW_ALL_USERS=false
# =============================================================================
# RESPONSE PACING
# =============================================================================

View File

@@ -244,11 +244,11 @@ This is intentional: CLI users are in a terminal and expect the agent to work in
### Security (User Allowlists):
**IMPORTANT**: Without an allowlist, anyone who finds your bot can use it!
**IMPORTANT**: By default, the gateway denies all users who are not in an allowlist or paired via DM.
The gateway checks `{PLATFORM}_ALLOWED_USERS` environment variables:
- If set: Only listed user IDs can interact with the bot
- If unset: All users are allowed (dangerous with terminal access!)
- If unset: All users are denied unless `GATEWAY_ALLOW_ALL_USERS=true` is set
Users can find their IDs:
- **Telegram**: Message [@userinfobot](https://t.me/userinfobot)

View File

@@ -250,15 +250,15 @@ Pairing codes expire after 1 hour, are rate-limited, and use cryptographic rando
### Security (Important!)
**Without an allowlist, anyone who finds your bot can use it!**
**By default, the gateway denies all users who are not in an allowlist or paired via DM.** This is the safe default for a bot with terminal access.
```bash
# Restrict to specific users (recommended):
TELEGRAM_ALLOWED_USERS=123456789,987654321
DISCORD_ALLOWED_USERS=123456789012345678
# Or allow all users in a specific platform:
# (Leave the variable unset - NOT recommended for bots with terminal access)
# Or explicitly allow all users (NOT recommended for bots with terminal access):
GATEWAY_ALLOW_ALL_USERS=true
```
### Working Directory
@@ -1346,6 +1346,7 @@ All variables go in `~/.hermes/.env`. Run `hermes config set VAR value` to set t
| `DISCORD_ALLOWED_USERS` | Comma-separated user IDs allowed to use bot |
| `DISCORD_HOME_CHANNEL` | Default channel for cron delivery |
| `MESSAGING_CWD` | Working directory for terminal in messaging (default: ~) |
| `GATEWAY_ALLOW_ALL_USERS` | Allow all users without allowlist (`true`/`false`, default: `false`) |
**Agent Behavior:**
| Variable | Description |

View File

@@ -700,13 +700,13 @@ if DISCORD_AVAILABLE:
await interaction.response.edit_message(embed=embed, view=self)
# Store the approval decision for the gateway to pick up
# Store the approval decision
try:
from tools.terminal_tool import _session_approved_patterns
from tools.approval import approve_permanent
if action == "allow_once":
pass # One-time approval handled by gateway
elif action == "allow_always":
_session_approved_patterns.add(self.approval_id)
approve_permanent(self.approval_id)
except ImportError:
pass

View File

@@ -113,6 +113,21 @@ class GatewayRunner:
logger.info("Starting Hermes Gateway...")
logger.info("Session storage: %s", self.config.sessions_dir)
# Warn if no user allowlists are configured and open access is not opted in
_any_allowlist = any(
os.getenv(v)
for v in ("TELEGRAM_ALLOWED_USERS", "DISCORD_ALLOWED_USERS",
"WHATSAPP_ALLOWED_USERS", "SLACK_ALLOWED_USERS",
"GATEWAY_ALLOWED_USERS")
)
_allow_all = os.getenv("GATEWAY_ALLOW_ALL_USERS", "").lower() in ("true", "1", "yes")
if not _any_allowlist and not _allow_all:
logger.warning(
"No user allowlists configured. All unauthorized users will be denied. "
"Set GATEWAY_ALLOW_ALL_USERS=true in ~/.hermes/.env to allow open access, "
"or configure platform allowlists (e.g., TELEGRAM_ALLOWED_USERS=your_id)."
)
# Discover and load event hooks
self.hooks.discover_and_load()
@@ -261,9 +276,11 @@ class GatewayRunner:
if self.pairing_store.is_approved(platform_name, user_id):
return True
# If no allowlists configured and no pairing approvals, allow all (backward compatible)
# If no allowlists configured: default-deny unless explicitly opted in.
# Set GATEWAY_ALLOW_ALL_USERS=true in ~/.hermes/.env for open access.
if not platform_allowlist and not global_allowlist:
return True
allow_all = os.getenv("GATEWAY_ALLOW_ALL_USERS", "").lower() in ("true", "1", "yes")
return allow_all
# Check if user is in any allowlist
allowed_ids = set()
@@ -353,9 +370,9 @@ class GatewayRunner:
cmd = approval["command"]
pattern_key = approval.get("pattern_key", "")
logger.info("User approved dangerous command: %s...", cmd[:60])
# Approve for session and re-run via terminal_tool with force=True
from tools.terminal_tool import terminal_tool, _session_approved_patterns
_session_approved_patterns.add(pattern_key)
from tools.terminal_tool import terminal_tool
from tools.approval import approve_session
approve_session(session_key_preview, pattern_key)
result = terminal_tool(command=cmd, force=True)
return f"✅ Command approved and executed.\n\n```\n{result[:3500]}\n```"
elif user_text in ("no", "n", "deny", "cancel", "nope"):
@@ -474,13 +491,11 @@ class GatewayRunner:
logger.error("Process watcher setup error: %s", e)
# Check if the agent encountered a dangerous command needing approval
# The terminal tool stores the last pending approval globally
try:
from tools.terminal_tool import _last_pending_approval
if _last_pending_approval:
self._pending_approvals[session_key] = _last_pending_approval.copy()
# Clear the global so it doesn't leak to other sessions
_last_pending_approval.clear()
from tools.approval import pop_pending
pending = pop_pending(session_key)
if pending:
self._pending_approvals[session_key] = pending
except Exception as e:
logger.debug("Failed to check pending approvals: %s", e)
@@ -538,8 +553,12 @@ class GatewayRunner:
return response
except Exception as e:
logger.error("Agent error: %s", e)
return f"Sorry, I encountered an error: {str(e)}"
logger.exception("Agent error in session %s", session_key)
return (
"Sorry, I encountered an unexpected error. "
"The details have been logged for debugging. "
"Try again or use /reset to start a fresh session."
)
finally:
# Clear session env
self._clear_session_env()

View File

@@ -286,6 +286,13 @@ OPTIONAL_ENV_VARS = {
"url": "https://github.com/settings/tokens",
"password": True,
},
"GATEWAY_ALLOW_ALL_USERS": {
"description": "Allow all users to interact with messaging bots (true/false). Default: false (deny unless allowlisted).",
"prompt": "Allow all users (true/false)",
"url": None,
"password": False,
"advanced": True,
},
}
@@ -311,35 +318,46 @@ def get_missing_env_vars(required_only: bool = False) -> List[Dict[str, Any]]:
return missing
def _set_nested(config: dict, dotted_key: str, value):
"""Set a value at an arbitrarily nested dotted key path.
Creates intermediate dicts as needed, e.g. ``_set_nested(c, "a.b.c", 1)``
ensures ``c["a"]["b"]["c"] == 1``.
"""
parts = dotted_key.split(".")
current = config
for part in parts[:-1]:
if part not in current or not isinstance(current.get(part), dict):
current[part] = {}
current = current[part]
current[parts[-1]] = value
def get_missing_config_fields() -> List[Dict[str, Any]]:
"""
Check which config fields are missing or outdated.
Check which config fields are missing or outdated (recursive).
Returns list of missing/outdated fields.
Walks the DEFAULT_CONFIG tree at arbitrary depth and reports any keys
present in defaults but absent from the user's loaded config.
"""
config = load_config()
missing = []
# Check for new top-level keys in DEFAULT_CONFIG
for key, default_value in DEFAULT_CONFIG.items():
if key.startswith('_'):
continue # Skip internal keys
if key not in config:
missing.append({
"key": key,
"default": default_value,
"description": f"New config section: {key}",
})
elif isinstance(default_value, dict):
# Check nested keys
for subkey, subvalue in default_value.items():
if subkey not in config.get(key, {}):
missing.append({
"key": f"{key}.{subkey}",
"default": subvalue,
"description": f"New config option: {key}.{subkey}",
})
def _check(defaults: dict, current: dict, prefix: str = ""):
for key, default_value in defaults.items():
if key.startswith('_'):
continue
full_key = key if not prefix else f"{prefix}.{key}"
if key not in current:
missing.append({
"key": full_key,
"default": default_value,
"description": f"New config option: {full_key}",
})
elif isinstance(default_value, dict) and isinstance(current.get(key), dict):
_check(default_value, current[key], full_key)
_check(DEFAULT_CONFIG, config)
return missing
@@ -450,16 +468,7 @@ def migrate_config(interactive: bool = True, quiet: bool = False) -> Dict[str, A
key = field["key"]
default = field["default"]
# Add with default value
if "." in key:
# Nested key
parent, child = key.split(".", 1)
if parent not in config:
config[parent] = {}
config[parent][child] = default
else:
config[key] = default
_set_nested(config, key, default)
results["config_added"].append(key)
if not quiet:
print(f" ✓ Added {key} = {default}")
@@ -476,12 +485,31 @@ def migrate_config(interactive: bool = True, quiet: bool = False) -> Dict[str, A
return results
def _deep_merge(base: dict, override: dict) -> dict:
"""Recursively merge *override* into *base*, preserving nested defaults.
Keys in *override* take precedence. If both values are dicts the merge
recurses, so a user who overrides only ``tts.elevenlabs.voice_id`` will
keep the default ``tts.elevenlabs.model_id`` intact.
"""
result = base.copy()
for key, value in override.items():
if (
key in result
and isinstance(result[key], dict)
and isinstance(value, dict)
):
result[key] = _deep_merge(result[key], value)
else:
result[key] = value
return result
def load_config() -> Dict[str, Any]:
"""Load configuration from ~/.hermes/config.yaml."""
import copy
config_path = get_config_path()
# Deep copy to avoid mutating DEFAULT_CONFIG
config = copy.deepcopy(DEFAULT_CONFIG)
if config_path.exists():
@@ -489,12 +517,7 @@ def load_config() -> Dict[str, Any]:
with open(config_path) as f:
user_config = yaml.safe_load(f) or {}
# Deep merge user values over defaults
for key, value in user_config.items():
if isinstance(value, dict) and key in config and isinstance(config[key], dict):
config[key].update(value)
else:
config[key] = value
config = _deep_merge(config, user_config)
except Exception as e:
print(f"Warning: Failed to load config: {e}")

View File

@@ -99,6 +99,36 @@ from tools.delegate_tool import delegate_task, check_delegate_requirements, DELE
from toolsets import resolve_toolset, validate_toolset
# =============================================================================
# Async Bridging
# =============================================================================
def _run_async(coro):
"""Run an async coroutine from a sync context.
If the current thread already has a running event loop (e.g., inside
the gateway's async stack or Atropos's event loop), we spin up a
disposable thread so asyncio.run() can create its own loop without
conflicting.
This is the single source of truth for sync->async bridging in tool
handlers. The RL paths (agent_loop.py, tool_context.py) also provide
outer thread-pool wrapping as defense-in-depth, but each handler is
self-protecting via this function.
"""
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = None
if loop and loop.is_running():
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
future = pool.submit(asyncio.run, coro)
return future.result(timeout=300)
return asyncio.run(coro)
# =============================================================================
# Tool Availability Checking
# =============================================================================
@@ -1515,21 +1545,8 @@ def handle_web_function_call(function_name: str, function_args: Dict[str, Any])
elif function_name == "web_extract":
urls = function_args.get("urls", [])
# Limit URLs to prevent abuse
urls = urls[:5] if isinstance(urls, list) else []
# Run async function -- use existing loop if available (Atropos),
# otherwise create one (normal CLI)
try:
loop = asyncio.get_running_loop()
# Already in an async context (Atropos) -- run in a thread
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
return pool.submit(
lambda: asyncio.run(web_extract_tool(urls, "markdown"))
).result(timeout=120)
except RuntimeError:
# No running loop (normal CLI) -- use asyncio.run directly
return asyncio.run(web_extract_tool(urls, "markdown"))
return _run_async(web_extract_tool(urls, "markdown"))
else:
return json.dumps({"error": f"Unknown web function: {function_name}"}, ensure_ascii=False)
@@ -1633,8 +1650,7 @@ def handle_vision_function_call(function_name: str, function_args: Dict[str, Any
full_prompt = f"Fully describe and explain everything about this image, then answer the following question:\n\n{question}"
# Run async function in event loop
return asyncio.run(vision_analyze_tool(image_url, full_prompt, "google/gemini-3-flash-preview"))
return _run_async(vision_analyze_tool(image_url, full_prompt, "google/gemini-3-flash-preview"))
else:
return json.dumps({"error": f"Unknown vision function: {function_name}"}, ensure_ascii=False)
@@ -1657,8 +1673,7 @@ def handle_moa_function_call(function_name: str, function_args: Dict[str, Any])
if not user_prompt:
return json.dumps({"error": "user_prompt is required for MoA processing"}, ensure_ascii=False)
# Run async function in event loop
return asyncio.run(mixture_of_agents_tool(user_prompt=user_prompt))
return _run_async(mixture_of_agents_tool(user_prompt=user_prompt))
else:
return json.dumps({"error": f"Unknown MoA function: {function_name}"}, ensure_ascii=False)
@@ -1683,39 +1698,16 @@ def handle_image_function_call(function_name: str, function_args: Dict[str, Any]
aspect_ratio = function_args.get("aspect_ratio", "landscape")
# Use fixed internal defaults for all other parameters (not exposed to model)
num_inference_steps = 50
guidance_scale = 4.5
num_images = 1
output_format = "png"
seed = None
# Run async function in event loop with proper handling for multiprocessing
try:
# Try to get existing event loop
loop = asyncio.get_event_loop()
if loop.is_closed():
# If closed, create a new one
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
except RuntimeError:
# No event loop in current thread, create one
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Run the coroutine in the event loop
result = loop.run_until_complete(image_generate_tool(
return _run_async(image_generate_tool(
prompt=prompt,
aspect_ratio=aspect_ratio,
num_inference_steps=num_inference_steps,
guidance_scale=guidance_scale,
num_images=num_images,
output_format=output_format,
seed=seed
num_inference_steps=50,
guidance_scale=4.5,
num_images=1,
output_format="png",
seed=None,
))
return result
else:
return json.dumps({"error": f"Unknown image generation function: {function_name}"}, ensure_ascii=False)
@@ -1869,65 +1861,31 @@ def handle_rl_function_call(
Returns:
str: Function result as JSON string
"""
# Run async functions in event loop
import asyncio
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
if function_name == "rl_list_environments":
return loop.run_until_complete(rl_list_environments())
elif function_name == "rl_select_environment":
return loop.run_until_complete(
rl_select_environment(name=function_args.get("name", ""))
)
elif function_name == "rl_get_current_config":
return loop.run_until_complete(rl_get_current_config())
elif function_name == "rl_edit_config":
return loop.run_until_complete(
rl_edit_config(
field=function_args.get("field", ""),
value=function_args.get("value")
)
)
elif function_name == "rl_start_training":
return loop.run_until_complete(rl_start_training())
elif function_name == "rl_check_status":
return loop.run_until_complete(
rl_check_status(run_id=function_args.get("run_id", ""))
)
elif function_name == "rl_stop_training":
return loop.run_until_complete(
rl_stop_training(run_id=function_args.get("run_id", ""))
)
elif function_name == "rl_get_results":
return loop.run_until_complete(
rl_get_results(run_id=function_args.get("run_id", ""))
)
elif function_name == "rl_list_runs":
return loop.run_until_complete(rl_list_runs())
elif function_name == "rl_test_inference":
return loop.run_until_complete(
rl_test_inference(
num_steps=function_args.get("num_steps", 3),
group_size=function_args.get("group_size", 16),
models=function_args.get("models"),
)
)
return json.dumps({"error": f"Unknown RL function: {function_name}"}, ensure_ascii=False)
rl_dispatch = {
"rl_list_environments": lambda: rl_list_environments(),
"rl_select_environment": lambda: rl_select_environment(
name=function_args.get("name", "")),
"rl_get_current_config": lambda: rl_get_current_config(),
"rl_edit_config": lambda: rl_edit_config(
field=function_args.get("field", ""),
value=function_args.get("value")),
"rl_start_training": lambda: rl_start_training(),
"rl_check_status": lambda: rl_check_status(
run_id=function_args.get("run_id", "")),
"rl_stop_training": lambda: rl_stop_training(
run_id=function_args.get("run_id", "")),
"rl_get_results": lambda: rl_get_results(
run_id=function_args.get("run_id", "")),
"rl_list_runs": lambda: rl_list_runs(),
"rl_test_inference": lambda: rl_test_inference(
num_steps=function_args.get("num_steps", 3),
group_size=function_args.get("group_size", 16),
models=function_args.get("models")),
}
handler = rl_dispatch.get(function_name)
if not handler:
return json.dumps({"error": f"Unknown RL function: {function_name}"}, ensure_ascii=False)
return _run_async(handler())
def handle_file_function_call(
@@ -2076,27 +2034,6 @@ def handle_send_message_function_call(function_name, function_args):
return json.dumps({"error": f"Send failed: {e}"})
def _run_async(coro):
"""Run an async coroutine from a sync context.
If the current thread already has a running event loop (e.g. inside
the gateway's async stack), we spin up a disposable thread so
asyncio.run() can create its own loop without conflicting.
"""
import asyncio
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = None
if loop and loop.is_running():
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
future = pool.submit(asyncio.run, coro)
return future.result(timeout=30)
return asyncio.run(coro)
async def _send_to_platform(platform, pconfig, chat_id, message):
"""Route a message to the appropriate platform sender."""
from gateway.config import Platform

86
tools/approval.py Normal file
View File

@@ -0,0 +1,86 @@
"""Thread-safe per-session approval management for dangerous commands.
Replaces the module-level globals (_last_pending_approval, _session_approved_patterns)
that were previously in terminal_tool.py. Those globals were shared across all
concurrent gateway sessions, creating race conditions where one session's approval
could overwrite another's.
This module provides session-scoped state keyed by session_key, with proper locking.
"""
import threading
from typing import Optional
_lock = threading.Lock()
# Pending approval requests: session_key -> approval_dict
_pending: dict[str, dict] = {}
# Session-scoped approved patterns: session_key -> set of pattern_keys
_session_approved: dict[str, set] = {}
# Permanent allowlist (loaded from config, shared across sessions intentionally)
_permanent_approved: set = set()
def submit_pending(session_key: str, approval: dict):
"""Store a pending approval request for a session.
Called by _check_dangerous_command when a gateway session hits a
dangerous command. The gateway picks it up later via pop_pending().
"""
with _lock:
_pending[session_key] = approval
def pop_pending(session_key: str) -> Optional[dict]:
"""Retrieve and remove a pending approval for a session.
Returns the approval dict if one was pending, None otherwise.
Atomic: no other thread can read the same pending approval.
"""
with _lock:
return _pending.pop(session_key, None)
def has_pending(session_key: str) -> bool:
"""Check if a session has a pending approval request."""
with _lock:
return session_key in _pending
def approve_session(session_key: str, pattern_key: str):
"""Approve a dangerous command pattern for this session only.
The approval is scoped to the session -- other sessions are unaffected.
"""
with _lock:
_session_approved.setdefault(session_key, set()).add(pattern_key)
def is_approved(session_key: str, pattern_key: str) -> bool:
"""Check if a pattern is approved (session-scoped or permanent)."""
with _lock:
if pattern_key in _permanent_approved:
return True
return pattern_key in _session_approved.get(session_key, set())
def approve_permanent(pattern_key: str):
"""Add a pattern to the permanent (cross-session) allowlist."""
with _lock:
_permanent_approved.add(pattern_key)
def load_permanent(patterns: set):
"""Bulk-load permanent allowlist entries from config."""
with _lock:
_permanent_approved.update(patterns)
def clear_session(session_key: str):
"""Clear all approvals and pending requests for a session (e.g., on /reset)."""
with _lock:
_session_approved.pop(session_key, None)
_pending.pop(session_key, None)

View File

@@ -255,19 +255,17 @@ def set_approval_callback(cb):
# Dangerous Command Approval System
# =============================================================================
# Session-cached dangerous command approvals (pattern -> approved)
_session_approved_patterns: set = set()
# Last approval-required command (for gateway to pick up)
# Set by _check_dangerous_command when in ask mode, read by gateway
_last_pending_approval: dict = {}
from tools import approval as _approval
# Dangerous command patterns (regex, description)
DANGEROUS_PATTERNS = [
(r'\brm\s+(-[^\s]*\s+)*/', "delete in root path"),
(r'\brm\s+(-[^\s]*)?r', "recursive delete"),
(r'\brm\s+--recursive\b', "recursive delete (long flag)"),
(r'\bchmod\s+(-[^\s]*\s+)*777\b', "world-writable permissions"),
(r'\bchmod\s+--recursive\b.*777', "recursive world-writable (long flag)"),
(r'\bchown\s+(-[^\s]*)?R\s+root', "recursive chown to root"),
(r'\bchown\s+--recursive\b.*root', "recursive chown to root (long flag)"),
(r'\bmkfs\b', "format filesystem"),
(r'\bdd\s+.*if=', "disk copy"),
(r'>\s*/dev/sd', "write to block device"),
@@ -279,16 +277,31 @@ DANGEROUS_PATTERNS = [
(r'\bkill\s+-9\s+-1\b', "kill all processes"),
(r'\bpkill\s+-9\b', "force kill processes"),
(r':()\s*{\s*:\s*\|\s*:&\s*}\s*;:', "fork bomb"),
# Indirect execution via command launchers
(r'\b(bash|sh|zsh)\s+-c\s+', "shell command via -c flag"),
(r'\b(python[23]?|perl|ruby|node)\s+-[ec]\s+', "script execution via -e/-c flag"),
# Pipe-to-shell (remote code execution)
(r'\b(curl|wget)\b.*\|\s*(ba)?sh\b', "pipe remote content to shell"),
# Destructive find/xargs patterns
(r'\bxargs\s+.*\brm\b', "xargs with rm"),
(r'\bfind\b.*-exec\s+rm\b', "find -exec rm"),
(r'\bfind\b.*-delete\b', "find -delete"),
]
def _load_permanent_allowlist() -> set:
"""Load permanently allowed command patterns from config."""
"""Load permanently allowed command patterns from config.
Also syncs them into the approval module so is_approved() works for
patterns that were added via 'always' in a previous session.
"""
try:
from hermes_cli.config import load_config
config = load_config()
patterns = config.get("command_allowlist", [])
return set(patterns) if patterns else set()
patterns = set(config.get("command_allowlist", []) or [])
if patterns:
_approval.load_permanent(patterns)
return patterns
except Exception:
return set()
@@ -325,14 +338,8 @@ def _detect_dangerous_command(command: str) -> tuple:
def _is_command_approved(pattern_key: str) -> bool:
"""Check if a pattern is approved (session or permanent)."""
if pattern_key in _session_approved_patterns:
return True
permanent = _load_permanent_allowlist()
if pattern_key in permanent:
return True
return False
session_key = os.getenv("HERMES_SESSION_KEY", "default")
return _approval.is_approved(session_key, pattern_key)
def _prompt_dangerous_approval(command: str, description: str, timeout_seconds: int = 60) -> str:
@@ -446,12 +453,12 @@ def _check_dangerous_command(command: str, env_type: str) -> dict:
if is_gateway or os.getenv("HERMES_EXEC_ASK"):
# Messaging context - return approval_required so the gateway can
# prompt the user interactively instead of just blocking
global _last_pending_approval
_last_pending_approval = {
session_key = os.getenv("HERMES_SESSION_KEY", "default")
_approval.submit_pending(session_key, {
"command": command,
"pattern_key": pattern_key,
"description": description,
}
})
return {
"approved": False,
"pattern_key": pattern_key,
@@ -467,14 +474,13 @@ def _check_dangerous_command(command: str, env_type: str) -> dict:
if choice == "deny":
return {"approved": False, "message": "BLOCKED: User denied this potentially dangerous command. Do NOT retry this command - the user has explicitly rejected it."}
# Handle approval
session_key = os.getenv("HERMES_SESSION_KEY", "default")
if choice == "session":
_session_approved_patterns.add(pattern_key)
_approval.approve_session(session_key, pattern_key)
elif choice == "always":
_session_approved_patterns.add(pattern_key)
permanent = _load_permanent_allowlist()
permanent.add(pattern_key)
_save_permanent_allowlist(permanent)
_approval.approve_session(session_key, pattern_key)
_approval.approve_permanent(pattern_key)
_save_permanent_allowlist(_load_permanent_allowlist() | {pattern_key})
return {"approved": True, "message": None}

View File

@@ -26,6 +26,42 @@ Usage:
from typing import List, Dict, Any, Set, Optional
# Shared tool list for CLI and all messaging platform toolsets.
# Messaging platforms add "send_message" on top of this list.
# Edit this once to update all platforms simultaneously.
_HERMES_CORE_TOOLS = [
# Web
"web_search", "web_extract",
# Terminal + process management
"terminal", "process",
# File manipulation
"read_file", "write_file", "patch", "search_files",
# Vision + image generation
"vision_analyze", "image_generate",
# MoA
"mixture_of_agents",
# Skills
"skills_list", "skill_view", "skill_manage",
# Browser automation
"browser_navigate", "browser_snapshot", "browser_click",
"browser_type", "browser_scroll", "browser_back",
"browser_press", "browser_close", "browser_get_images",
"browser_vision",
# Text-to-speech
"text_to_speech",
# Planning & memory
"todo", "memory",
# Session history search
"session_search",
# Clarifying questions
"clarify",
# Code execution + delegation
"execute_code", "delegate_task",
# Cronjob management
"schedule_cronjob", "list_cronjobs", "remove_cronjob",
]
# Core toolset definitions
# These can include individual tools or reference other toolsets
TOOLSETS = {
@@ -165,212 +201,40 @@ TOOLSETS = {
},
# ==========================================================================
# CLI-specific toolsets (only available when running via cli.py)
# Full Hermes toolsets (CLI + messaging platforms)
#
# All platforms share the same core tools. Messaging platforms add
# send_message for cross-channel messaging. Defined via _HERMES_CORE_TOOLS
# to avoid duplicating the tool list for each platform.
# ==========================================================================
"hermes-cli": {
"description": "Full interactive CLI toolset - all default tools plus cronjob management",
"tools": [
# Web tools
"web_search", "web_extract",
# Terminal + process management
"terminal", "process",
# File manipulation
"read_file", "write_file", "patch", "search_files",
# Vision
"vision_analyze",
# Image generation
"image_generate",
# MoA
"mixture_of_agents",
# Skills
"skills_list", "skill_view", "skill_manage",
# Browser
"browser_navigate", "browser_snapshot", "browser_click",
"browser_type", "browser_scroll", "browser_back",
"browser_press", "browser_close", "browser_get_images",
"browser_vision",
# Text-to-speech
"text_to_speech",
# Planning & task management
"todo",
# Persistent memory
"memory",
# Session history search
"session_search",
# Clarifying questions
"clarify",
# Code execution sandbox (programmatic tool calling)
"execute_code",
# Subagent delegation
"delegate_task",
# Cronjob management (CLI-only)
"schedule_cronjob", "list_cronjobs", "remove_cronjob"
],
"tools": _HERMES_CORE_TOOLS,
"includes": []
},
# ==========================================================================
# Messaging Platform-Specific Toolsets
# ==========================================================================
"hermes-telegram": {
"description": "Telegram bot toolset - full access for personal use (terminal has safety checks)",
"tools": [
# Terminal + process management
"terminal", "process",
# File manipulation
"read_file", "write_file", "patch", "search_files",
# Web tools
"web_search", "web_extract",
# Vision - analyze images sent by users
"vision_analyze",
# Image generation
"image_generate",
# Text-to-speech
"text_to_speech",
# Browser automation (requires Browserbase API key)
"browser_navigate", "browser_snapshot", "browser_click",
"browser_type", "browser_scroll", "browser_back",
"browser_press", "browser_close", "browser_get_images",
"browser_vision",
# Skills - access knowledge base
"skills_list", "skill_view", "skill_manage",
# Planning & task management
"todo",
# Persistent memory
"memory",
# Session history search
"session_search",
# Code execution sandbox (programmatic tool calling)
"execute_code",
# Subagent delegation
"delegate_task",
# Cronjob management - let users schedule tasks
"schedule_cronjob", "list_cronjobs", "remove_cronjob",
# Cross-channel messaging
"send_message"
],
"tools": _HERMES_CORE_TOOLS + ["send_message"],
"includes": []
},
"hermes-discord": {
"description": "Discord bot toolset - full access (terminal has safety checks via dangerous command approval)",
"tools": [
# Terminal + process management
"terminal", "process",
# File manipulation
"read_file", "write_file", "patch", "search_files",
# Web tools
"web_search", "web_extract",
# Vision - analyze images sent by users
"vision_analyze",
# Image generation
"image_generate",
# Text-to-speech
"text_to_speech",
# Browser automation (requires Browserbase API key)
"browser_navigate", "browser_snapshot", "browser_click",
"browser_type", "browser_scroll", "browser_back",
"browser_press", "browser_close", "browser_get_images",
"browser_vision",
# Skills - access knowledge base
"skills_list", "skill_view", "skill_manage",
# Planning & task management
"todo",
# Persistent memory
"memory",
# Session history search
"session_search",
# Code execution sandbox (programmatic tool calling)
"execute_code",
# Subagent delegation
"delegate_task",
# Cronjob management - let users schedule tasks
"schedule_cronjob", "list_cronjobs", "remove_cronjob",
# Cross-channel messaging
"send_message"
],
"tools": _HERMES_CORE_TOOLS + ["send_message"],
"includes": []
},
"hermes-whatsapp": {
"description": "WhatsApp bot toolset - similar to Telegram (personal messaging, more trusted)",
"tools": [
# Web tools
"web_search", "web_extract",
# Terminal + process management
"terminal", "process",
# File manipulation
"read_file", "write_file", "patch", "search_files",
# Vision
"vision_analyze",
# Image generation
"image_generate",
# Text-to-speech
"text_to_speech",
# Browser automation (requires Browserbase API key)
"browser_navigate", "browser_snapshot", "browser_click",
"browser_type", "browser_scroll", "browser_back",
"browser_press", "browser_close", "browser_get_images",
"browser_vision",
# Skills
"skills_list", "skill_view", "skill_manage",
# Planning & task management
"todo",
# Persistent memory
"memory",
# Session history search
"session_search",
# Code execution sandbox (programmatic tool calling)
"execute_code",
# Subagent delegation
"delegate_task",
# Cronjob management
"schedule_cronjob", "list_cronjobs", "remove_cronjob",
# Cross-channel messaging
"send_message"
],
"tools": _HERMES_CORE_TOOLS + ["send_message"],
"includes": []
},
"hermes-slack": {
"description": "Slack bot toolset - full access for workspace use (terminal has safety checks)",
"tools": [
# Terminal + process management
"terminal", "process",
# File manipulation
"read_file", "write_file", "patch", "search_files",
# Web tools
"web_search", "web_extract",
# Vision - analyze images sent by users
"vision_analyze",
# Image generation
"image_generate",
# Text-to-speech
"text_to_speech",
# Browser automation (requires Browserbase API key)
"browser_navigate", "browser_snapshot", "browser_click",
"browser_type", "browser_scroll", "browser_back",
"browser_press", "browser_close", "browser_get_images",
"browser_vision",
# Skills - access knowledge base
"skills_list", "skill_view", "skill_manage",
# Planning & task management
"todo",
# Persistent memory
"memory",
# Session history search
"session_search",
# Code execution sandbox (programmatic tool calling)
"execute_code",
# Subagent delegation
"delegate_task",
# Cronjob management - let users schedule tasks
"schedule_cronjob", "list_cronjobs", "remove_cronjob",
# Cross-channel messaging
"send_message"
],
"tools": _HERMES_CORE_TOOLS + ["send_message"],
"includes": []
},