From b177b4abad1dffd60bc2e1527af8917d1ed7442f Mon Sep 17 00:00:00 2001 From: teknium1 Date: Sun, 15 Mar 2026 02:51:04 -0700 Subject: [PATCH] fix(security): block gateway and tool env vars in subprocesses Extend subprocess env sanitization beyond provider credentials by blocking Hermes-managed tool, messaging, and related gateway runtime vars. Reuse a shared sanitizer in LocalEnvironment and ProcessRegistry so background and PTY processes honor the same blocklist and _HERMES_FORCE_ escape hatch. Add regression coverage for local env execution and process_registry spawning. --- tests/tools/test_local_env_blocklist.py | 86 ++++++++++++++++++++-- tests/tools/test_process_registry.py | 50 +++++++++++++ tools/environments/local.py | 96 ++++++++++++++++++++----- tools/process_registry.py | 10 +-- 4 files changed, 215 insertions(+), 27 deletions(-) diff --git a/tests/tools/test_local_env_blocklist.py b/tests/tools/test_local_env_blocklist.py index d0a11b510..f2346c3ef 100644 --- a/tests/tools/test_local_env_blocklist.py +++ b/tests/tools/test_local_env_blocklist.py @@ -1,10 +1,11 @@ -"""Tests for provider env var blocklist in LocalEnvironment. +"""Tests for subprocess env sanitization in LocalEnvironment. -Verifies that Hermes-internal provider env vars (OPENAI_BASE_URL, etc.) -are stripped from subprocess environments so external CLIs are not -silently misrouted. +Verifies that Hermes-managed provider, tool, and gateway env vars are +stripped from subprocess environments so external CLIs are not silently +misrouted or handed Hermes secrets. See: https://github.com/NousResearch/hermes-agent/issues/1002 +See: https://github.com/NousResearch/hermes-agent/issues/1264 """ import os @@ -110,6 +111,30 @@ class TestProviderEnvBlocklist: for var in extra_provider_vars: assert var not in result_env, f"{var} leaked into subprocess env" + def test_tool_and_gateway_vars_are_stripped(self): + """Tool and gateway secrets/config must not leak into subprocess env.""" + leaked_vars = { + "TELEGRAM_BOT_TOKEN": "bot-token", + "TELEGRAM_HOME_CHANNEL": "12345", + "DISCORD_HOME_CHANNEL": "67890", + "SLACK_APP_TOKEN": "xapp-secret", + "WHATSAPP_ALLOWED_USERS": "+15555550123", + "SIGNAL_ACCOUNT": "+15555550124", + "HASS_TOKEN": "ha-secret", + "EMAIL_PASSWORD": "email-secret", + "FIRECRAWL_API_KEY": "fc-secret", + "BROWSERBASE_PROJECT_ID": "bb-project", + "ELEVENLABS_API_KEY": "el-secret", + "GITHUB_TOKEN": "ghp_secret", + "GH_TOKEN": "gh_alias_secret", + "GATEWAY_ALLOW_ALL_USERS": "true", + "GATEWAY_ALLOWED_USERS": "alice,bob", + } + result_env = _run_with_env(extra_os_env=leaked_vars) + + for var in leaked_vars: + assert var not in result_env, f"{var} leaked into subprocess env" + def test_safe_vars_are_preserved(self): """Standard env vars (PATH, HOME, USER) must still be passed through.""" result_env = _run_with_env() @@ -205,3 +230,56 @@ class TestBlocklistCoverage: "HELICONE_API_KEY", } assert extras.issubset(_HERMES_PROVIDER_ENV_BLOCKLIST) + + def test_optional_tool_and_messaging_vars_are_in_blocklist(self): + """Tool/messaging vars from OPTIONAL_ENV_VARS should stay covered.""" + from hermes_cli.config import OPTIONAL_ENV_VARS + + for name, metadata in OPTIONAL_ENV_VARS.items(): + category = metadata.get("category") + if category in {"tool", "messaging"}: + assert name in _HERMES_PROVIDER_ENV_BLOCKLIST, ( + f"Optional env var {name} (category={category}) missing from blocklist" + ) + elif category == "setting" and metadata.get("password"): + assert name in _HERMES_PROVIDER_ENV_BLOCKLIST, ( + f"Secret setting env var {name} missing from blocklist" + ) + + def test_gateway_runtime_vars_are_in_blocklist(self): + extras = { + "TELEGRAM_HOME_CHANNEL", + "TELEGRAM_HOME_CHANNEL_NAME", + "DISCORD_HOME_CHANNEL", + "DISCORD_HOME_CHANNEL_NAME", + "DISCORD_REQUIRE_MENTION", + "DISCORD_FREE_RESPONSE_CHANNELS", + "DISCORD_AUTO_THREAD", + "SLACK_HOME_CHANNEL", + "SLACK_HOME_CHANNEL_NAME", + "SLACK_ALLOWED_USERS", + "WHATSAPP_ENABLED", + "WHATSAPP_MODE", + "WHATSAPP_ALLOWED_USERS", + "SIGNAL_HTTP_URL", + "SIGNAL_ACCOUNT", + "SIGNAL_ALLOWED_USERS", + "SIGNAL_GROUP_ALLOWED_USERS", + "SIGNAL_HOME_CHANNEL", + "SIGNAL_HOME_CHANNEL_NAME", + "SIGNAL_IGNORE_STORIES", + "HASS_TOKEN", + "HASS_URL", + "EMAIL_ADDRESS", + "EMAIL_PASSWORD", + "EMAIL_IMAP_HOST", + "EMAIL_SMTP_HOST", + "EMAIL_HOME_ADDRESS", + "EMAIL_HOME_ADDRESS_NAME", + "GATEWAY_ALLOWED_USERS", + "GH_TOKEN", + "GITHUB_APP_ID", + "GITHUB_APP_PRIVATE_KEY_PATH", + "GITHUB_APP_INSTALLATION_ID", + } + assert extras.issubset(_HERMES_PROVIDER_ENV_BLOCKLIST) diff --git a/tests/tools/test_process_registry.py b/tests/tools/test_process_registry.py index bc5a150ce..7ebe94c04 100644 --- a/tests/tools/test_process_registry.py +++ b/tests/tools/test_process_registry.py @@ -1,11 +1,13 @@ """Tests for tools/process_registry.py — ProcessRegistry query methods, pruning, checkpoint.""" import json +import os import time import pytest from pathlib import Path from unittest.mock import MagicMock, patch +from tools.environments.local import _HERMES_PROVIDER_ENV_FORCE_PREFIX from tools.process_registry import ( ProcessRegistry, ProcessSession, @@ -213,6 +215,54 @@ class TestPruning: assert total <= MAX_PROCESSES +# ========================================================================= +# Spawn env sanitization +# ========================================================================= + +class TestSpawnEnvSanitization: + def test_spawn_local_strips_blocked_vars_from_background_env(self, registry): + captured = {} + + def fake_popen(cmd, **kwargs): + captured["env"] = kwargs["env"] + proc = MagicMock() + proc.pid = 4321 + proc.stdout = iter([]) + proc.stdin = MagicMock() + proc.poll.return_value = None + return proc + + fake_thread = MagicMock() + + with patch.dict(os.environ, { + "PATH": "/usr/bin:/bin", + "HOME": "/home/user", + "USER": "tester", + "TELEGRAM_BOT_TOKEN": "bot-secret", + "FIRECRAWL_API_KEY": "fc-secret", + }, clear=True), \ + patch("tools.process_registry._find_shell", return_value="/bin/bash"), \ + patch("subprocess.Popen", side_effect=fake_popen), \ + patch("threading.Thread", return_value=fake_thread), \ + patch.object(registry, "_write_checkpoint"): + registry.spawn_local( + "echo hello", + cwd="/tmp", + env_vars={ + "MY_CUSTOM_VAR": "keep-me", + "TELEGRAM_BOT_TOKEN": "drop-me", + f"{_HERMES_PROVIDER_ENV_FORCE_PREFIX}TELEGRAM_BOT_TOKEN": "forced-bot-token", + }, + ) + + env = captured["env"] + assert env["MY_CUSTOM_VAR"] == "keep-me" + assert env["TELEGRAM_BOT_TOKEN"] == "forced-bot-token" + assert "FIRECRAWL_API_KEY" not in env + assert f"{_HERMES_PROVIDER_ENV_FORCE_PREFIX}TELEGRAM_BOT_TOKEN" not in env + assert env["PYTHONUNBUFFERED"] == "1" + + # ========================================================================= # Checkpoint # ========================================================================= diff --git a/tools/environments/local.py b/tools/environments/local.py index 75ba53eaf..bd82ded10 100644 --- a/tools/environments/local.py +++ b/tools/environments/local.py @@ -27,11 +27,12 @@ _HERMES_PROVIDER_ENV_FORCE_PREFIX = "_HERMES_FORCE_" def _build_provider_env_blocklist() -> frozenset: - """Derive the blocklist from the provider registry + known extras. + """Derive the blocklist from provider, tool, and gateway config. Automatically picks up api_key_env_vars and base_url_env_var from - every registered provider, so adding a new provider to auth.py is - enough — no manual list to keep in sync. + every registered provider, plus tool/messaging env vars from the + optional config registry, so new Hermes-managed secrets are blocked + in subprocesses without having to maintain multiple static lists. """ blocked: set[str] = set() @@ -44,7 +45,18 @@ def _build_provider_env_blocklist() -> frozenset: except ImportError: pass - # Vars not in the registry but still Hermes-internal / conflict-prone + try: + from hermes_cli.config import OPTIONAL_ENV_VARS + for name, metadata in OPTIONAL_ENV_VARS.items(): + category = metadata.get("category") + if category in {"tool", "messaging"}: + blocked.add(name) + elif category == "setting" and metadata.get("password"): + blocked.add(name) + except ImportError: + pass + + # Vars not covered above but still Hermes-internal / conflict-prone. blocked.update({ "OPENAI_BASE_URL", "OPENAI_API_KEY", @@ -67,6 +79,41 @@ def _build_provider_env_blocklist() -> frozenset: "FIREWORKS_API_KEY", # Fireworks AI "XAI_API_KEY", # xAI (Grok) "HELICONE_API_KEY", # LLM Observability proxy + # Gateway/runtime config not represented in OPTIONAL_ENV_VARS. + "TELEGRAM_HOME_CHANNEL", + "TELEGRAM_HOME_CHANNEL_NAME", + "DISCORD_HOME_CHANNEL", + "DISCORD_HOME_CHANNEL_NAME", + "DISCORD_REQUIRE_MENTION", + "DISCORD_FREE_RESPONSE_CHANNELS", + "DISCORD_AUTO_THREAD", + "SLACK_HOME_CHANNEL", + "SLACK_HOME_CHANNEL_NAME", + "SLACK_ALLOWED_USERS", + "WHATSAPP_ENABLED", + "WHATSAPP_MODE", + "WHATSAPP_ALLOWED_USERS", + "SIGNAL_HTTP_URL", + "SIGNAL_ACCOUNT", + "SIGNAL_ALLOWED_USERS", + "SIGNAL_GROUP_ALLOWED_USERS", + "SIGNAL_HOME_CHANNEL", + "SIGNAL_HOME_CHANNEL_NAME", + "SIGNAL_IGNORE_STORIES", + "HASS_TOKEN", + "HASS_URL", + "EMAIL_ADDRESS", + "EMAIL_PASSWORD", + "EMAIL_IMAP_HOST", + "EMAIL_SMTP_HOST", + "EMAIL_HOME_ADDRESS", + "EMAIL_HOME_ADDRESS_NAME", + "GATEWAY_ALLOWED_USERS", + # Skills Hub / GitHub app auth paths and aliases. + "GH_TOKEN", + "GITHUB_APP_ID", + "GITHUB_APP_PRIVATE_KEY_PATH", + "GITHUB_APP_INSTALLATION_ID", }) return frozenset(blocked) @@ -74,6 +121,30 @@ def _build_provider_env_blocklist() -> frozenset: _HERMES_PROVIDER_ENV_BLOCKLIST = _build_provider_env_blocklist() +def _sanitize_subprocess_env(base_env: dict | None, extra_env: dict | None = None) -> dict: + """Filter Hermes-managed secrets from a subprocess environment. + + `_HERMES_FORCE_` entries in ``extra_env`` opt a blocked variable back in + intentionally for callers that truly need it. + """ + sanitized: dict[str, str] = {} + + for key, value in (base_env or {}).items(): + if key.startswith(_HERMES_PROVIDER_ENV_FORCE_PREFIX): + continue + if key not in _HERMES_PROVIDER_ENV_BLOCKLIST: + sanitized[key] = value + + for key, value in (extra_env or {}).items(): + if key.startswith(_HERMES_PROVIDER_ENV_FORCE_PREFIX): + real_key = key[len(_HERMES_PROVIDER_ENV_FORCE_PREFIX):] + sanitized[real_key] = value + elif key not in _HERMES_PROVIDER_ENV_BLOCKLIST: + sanitized[key] = value + + return sanitized + + def _find_bash() -> str: """Find bash for command execution. @@ -249,18 +320,11 @@ class LocalEnvironment(BaseEnvironment): # Ensure PATH always includes standard dirs — systemd services # and some terminal multiplexers inherit a minimal PATH. _SANE_PATH = "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin" - # Strip Hermes-internal provider vars so external CLIs - # (e.g. codex) are not silently misrouted. Callers that - # truly need a blocked var can opt in by prefixing the key - # with _HERMES_FORCE_ in self.env (e.g. _HERMES_FORCE_OPENAI_API_KEY). - merged = dict(os.environ | self.env) - run_env = {} - for k, v in merged.items(): - if k.startswith(_HERMES_PROVIDER_ENV_FORCE_PREFIX): - real_key = k[len(_HERMES_PROVIDER_ENV_FORCE_PREFIX):] - run_env[real_key] = v - elif k not in _HERMES_PROVIDER_ENV_BLOCKLIST: - run_env[k] = v + # Strip Hermes-managed provider/tool/gateway vars so external CLIs + # are not silently misrouted or handed Hermes secrets. Callers that + # truly need a blocked var can opt in by prefixing the key with + # _HERMES_FORCE_ in self.env (e.g. _HERMES_FORCE_OPENAI_API_KEY). + run_env = _sanitize_subprocess_env(os.environ, self.env) existing_path = run_env.get("PATH", "") if "/usr/bin" not in existing_path.split(":"): run_env["PATH"] = f"{existing_path}:{_SANE_PATH}" if existing_path else _SANE_PATH diff --git a/tools/process_registry.py b/tools/process_registry.py index d08510d90..51c424532 100644 --- a/tools/process_registry.py +++ b/tools/process_registry.py @@ -42,7 +42,7 @@ import time import uuid _IS_WINDOWS = platform.system() == "Windows" -from tools.environments.local import _find_shell, _HERMES_PROVIDER_ENV_BLOCKLIST +from tools.environments.local import _find_shell, _sanitize_subprocess_env from dataclasses import dataclass, field from pathlib import Path from typing import Any, Dict, List, Optional @@ -155,9 +155,7 @@ class ProcessRegistry: else: from ptyprocess import PtyProcess as _PtyProcessCls user_shell = _find_shell() - pty_env = {k: v for k, v in os.environ.items() - if k not in _HERMES_PROVIDER_ENV_BLOCKLIST} - pty_env.update(env_vars or {}) + pty_env = _sanitize_subprocess_env(os.environ, env_vars) pty_env["PYTHONUNBUFFERED"] = "1" pty_proc = _PtyProcessCls.spawn( [user_shell, "-lic", command], @@ -198,9 +196,7 @@ class ProcessRegistry: # Force unbuffered output for Python scripts so progress is visible # during background execution (libraries like tqdm/datasets buffer when # stdout is a pipe, hiding output from process(action="poll")). - bg_env = {k: v for k, v in os.environ.items() - if k not in _HERMES_PROVIDER_ENV_BLOCKLIST} - bg_env.update(env_vars or {}) + bg_env = _sanitize_subprocess_env(os.environ, env_vars) bg_env["PYTHONUNBUFFERED"] = "1" proc = subprocess.Popen( [user_shell, "-lic", command],