fix(security): block gateway and tool env vars in subprocesses

Extend subprocess env sanitization beyond provider credentials by blocking Hermes-managed tool, messaging, and related gateway runtime vars. Reuse a shared sanitizer in LocalEnvironment and ProcessRegistry so background and PTY processes honor the same blocklist and _HERMES_FORCE_ escape hatch. Add regression coverage for local env execution and process_registry spawning.
This commit is contained in:
teknium1
2026-03-15 02:51:04 -07:00
parent 779f8df6a6
commit b177b4abad
4 changed files with 215 additions and 27 deletions

View File

@@ -1,10 +1,11 @@
"""Tests for provider env var blocklist in LocalEnvironment.
"""Tests for subprocess env sanitization in LocalEnvironment.
Verifies that Hermes-internal provider env vars (OPENAI_BASE_URL, etc.)
are stripped from subprocess environments so external CLIs are not
silently misrouted.
Verifies that Hermes-managed provider, tool, and gateway env vars are
stripped from subprocess environments so external CLIs are not silently
misrouted or handed Hermes secrets.
See: https://github.com/NousResearch/hermes-agent/issues/1002
See: https://github.com/NousResearch/hermes-agent/issues/1264
"""
import os
@@ -110,6 +111,30 @@ class TestProviderEnvBlocklist:
for var in extra_provider_vars:
assert var not in result_env, f"{var} leaked into subprocess env"
def test_tool_and_gateway_vars_are_stripped(self):
"""Tool and gateway secrets/config must not leak into subprocess env."""
leaked_vars = {
"TELEGRAM_BOT_TOKEN": "bot-token",
"TELEGRAM_HOME_CHANNEL": "12345",
"DISCORD_HOME_CHANNEL": "67890",
"SLACK_APP_TOKEN": "xapp-secret",
"WHATSAPP_ALLOWED_USERS": "+15555550123",
"SIGNAL_ACCOUNT": "+15555550124",
"HASS_TOKEN": "ha-secret",
"EMAIL_PASSWORD": "email-secret",
"FIRECRAWL_API_KEY": "fc-secret",
"BROWSERBASE_PROJECT_ID": "bb-project",
"ELEVENLABS_API_KEY": "el-secret",
"GITHUB_TOKEN": "ghp_secret",
"GH_TOKEN": "gh_alias_secret",
"GATEWAY_ALLOW_ALL_USERS": "true",
"GATEWAY_ALLOWED_USERS": "alice,bob",
}
result_env = _run_with_env(extra_os_env=leaked_vars)
for var in leaked_vars:
assert var not in result_env, f"{var} leaked into subprocess env"
def test_safe_vars_are_preserved(self):
"""Standard env vars (PATH, HOME, USER) must still be passed through."""
result_env = _run_with_env()
@@ -205,3 +230,56 @@ class TestBlocklistCoverage:
"HELICONE_API_KEY",
}
assert extras.issubset(_HERMES_PROVIDER_ENV_BLOCKLIST)
def test_optional_tool_and_messaging_vars_are_in_blocklist(self):
"""Tool/messaging vars from OPTIONAL_ENV_VARS should stay covered."""
from hermes_cli.config import OPTIONAL_ENV_VARS
for name, metadata in OPTIONAL_ENV_VARS.items():
category = metadata.get("category")
if category in {"tool", "messaging"}:
assert name in _HERMES_PROVIDER_ENV_BLOCKLIST, (
f"Optional env var {name} (category={category}) missing from blocklist"
)
elif category == "setting" and metadata.get("password"):
assert name in _HERMES_PROVIDER_ENV_BLOCKLIST, (
f"Secret setting env var {name} missing from blocklist"
)
def test_gateway_runtime_vars_are_in_blocklist(self):
extras = {
"TELEGRAM_HOME_CHANNEL",
"TELEGRAM_HOME_CHANNEL_NAME",
"DISCORD_HOME_CHANNEL",
"DISCORD_HOME_CHANNEL_NAME",
"DISCORD_REQUIRE_MENTION",
"DISCORD_FREE_RESPONSE_CHANNELS",
"DISCORD_AUTO_THREAD",
"SLACK_HOME_CHANNEL",
"SLACK_HOME_CHANNEL_NAME",
"SLACK_ALLOWED_USERS",
"WHATSAPP_ENABLED",
"WHATSAPP_MODE",
"WHATSAPP_ALLOWED_USERS",
"SIGNAL_HTTP_URL",
"SIGNAL_ACCOUNT",
"SIGNAL_ALLOWED_USERS",
"SIGNAL_GROUP_ALLOWED_USERS",
"SIGNAL_HOME_CHANNEL",
"SIGNAL_HOME_CHANNEL_NAME",
"SIGNAL_IGNORE_STORIES",
"HASS_TOKEN",
"HASS_URL",
"EMAIL_ADDRESS",
"EMAIL_PASSWORD",
"EMAIL_IMAP_HOST",
"EMAIL_SMTP_HOST",
"EMAIL_HOME_ADDRESS",
"EMAIL_HOME_ADDRESS_NAME",
"GATEWAY_ALLOWED_USERS",
"GH_TOKEN",
"GITHUB_APP_ID",
"GITHUB_APP_PRIVATE_KEY_PATH",
"GITHUB_APP_INSTALLATION_ID",
}
assert extras.issubset(_HERMES_PROVIDER_ENV_BLOCKLIST)

View File

@@ -1,11 +1,13 @@
"""Tests for tools/process_registry.py — ProcessRegistry query methods, pruning, checkpoint."""
import json
import os
import time
import pytest
from pathlib import Path
from unittest.mock import MagicMock, patch
from tools.environments.local import _HERMES_PROVIDER_ENV_FORCE_PREFIX
from tools.process_registry import (
ProcessRegistry,
ProcessSession,
@@ -213,6 +215,54 @@ class TestPruning:
assert total <= MAX_PROCESSES
# =========================================================================
# Spawn env sanitization
# =========================================================================
class TestSpawnEnvSanitization:
def test_spawn_local_strips_blocked_vars_from_background_env(self, registry):
captured = {}
def fake_popen(cmd, **kwargs):
captured["env"] = kwargs["env"]
proc = MagicMock()
proc.pid = 4321
proc.stdout = iter([])
proc.stdin = MagicMock()
proc.poll.return_value = None
return proc
fake_thread = MagicMock()
with patch.dict(os.environ, {
"PATH": "/usr/bin:/bin",
"HOME": "/home/user",
"USER": "tester",
"TELEGRAM_BOT_TOKEN": "bot-secret",
"FIRECRAWL_API_KEY": "fc-secret",
}, clear=True), \
patch("tools.process_registry._find_shell", return_value="/bin/bash"), \
patch("subprocess.Popen", side_effect=fake_popen), \
patch("threading.Thread", return_value=fake_thread), \
patch.object(registry, "_write_checkpoint"):
registry.spawn_local(
"echo hello",
cwd="/tmp",
env_vars={
"MY_CUSTOM_VAR": "keep-me",
"TELEGRAM_BOT_TOKEN": "drop-me",
f"{_HERMES_PROVIDER_ENV_FORCE_PREFIX}TELEGRAM_BOT_TOKEN": "forced-bot-token",
},
)
env = captured["env"]
assert env["MY_CUSTOM_VAR"] == "keep-me"
assert env["TELEGRAM_BOT_TOKEN"] == "forced-bot-token"
assert "FIRECRAWL_API_KEY" not in env
assert f"{_HERMES_PROVIDER_ENV_FORCE_PREFIX}TELEGRAM_BOT_TOKEN" not in env
assert env["PYTHONUNBUFFERED"] == "1"
# =========================================================================
# Checkpoint
# =========================================================================

View File

@@ -27,11 +27,12 @@ _HERMES_PROVIDER_ENV_FORCE_PREFIX = "_HERMES_FORCE_"
def _build_provider_env_blocklist() -> frozenset:
"""Derive the blocklist from the provider registry + known extras.
"""Derive the blocklist from provider, tool, and gateway config.
Automatically picks up api_key_env_vars and base_url_env_var from
every registered provider, so adding a new provider to auth.py is
enough — no manual list to keep in sync.
every registered provider, plus tool/messaging env vars from the
optional config registry, so new Hermes-managed secrets are blocked
in subprocesses without having to maintain multiple static lists.
"""
blocked: set[str] = set()
@@ -44,7 +45,18 @@ def _build_provider_env_blocklist() -> frozenset:
except ImportError:
pass
# Vars not in the registry but still Hermes-internal / conflict-prone
try:
from hermes_cli.config import OPTIONAL_ENV_VARS
for name, metadata in OPTIONAL_ENV_VARS.items():
category = metadata.get("category")
if category in {"tool", "messaging"}:
blocked.add(name)
elif category == "setting" and metadata.get("password"):
blocked.add(name)
except ImportError:
pass
# Vars not covered above but still Hermes-internal / conflict-prone.
blocked.update({
"OPENAI_BASE_URL",
"OPENAI_API_KEY",
@@ -67,6 +79,41 @@ def _build_provider_env_blocklist() -> frozenset:
"FIREWORKS_API_KEY", # Fireworks AI
"XAI_API_KEY", # xAI (Grok)
"HELICONE_API_KEY", # LLM Observability proxy
# Gateway/runtime config not represented in OPTIONAL_ENV_VARS.
"TELEGRAM_HOME_CHANNEL",
"TELEGRAM_HOME_CHANNEL_NAME",
"DISCORD_HOME_CHANNEL",
"DISCORD_HOME_CHANNEL_NAME",
"DISCORD_REQUIRE_MENTION",
"DISCORD_FREE_RESPONSE_CHANNELS",
"DISCORD_AUTO_THREAD",
"SLACK_HOME_CHANNEL",
"SLACK_HOME_CHANNEL_NAME",
"SLACK_ALLOWED_USERS",
"WHATSAPP_ENABLED",
"WHATSAPP_MODE",
"WHATSAPP_ALLOWED_USERS",
"SIGNAL_HTTP_URL",
"SIGNAL_ACCOUNT",
"SIGNAL_ALLOWED_USERS",
"SIGNAL_GROUP_ALLOWED_USERS",
"SIGNAL_HOME_CHANNEL",
"SIGNAL_HOME_CHANNEL_NAME",
"SIGNAL_IGNORE_STORIES",
"HASS_TOKEN",
"HASS_URL",
"EMAIL_ADDRESS",
"EMAIL_PASSWORD",
"EMAIL_IMAP_HOST",
"EMAIL_SMTP_HOST",
"EMAIL_HOME_ADDRESS",
"EMAIL_HOME_ADDRESS_NAME",
"GATEWAY_ALLOWED_USERS",
# Skills Hub / GitHub app auth paths and aliases.
"GH_TOKEN",
"GITHUB_APP_ID",
"GITHUB_APP_PRIVATE_KEY_PATH",
"GITHUB_APP_INSTALLATION_ID",
})
return frozenset(blocked)
@@ -74,6 +121,30 @@ def _build_provider_env_blocklist() -> frozenset:
_HERMES_PROVIDER_ENV_BLOCKLIST = _build_provider_env_blocklist()
def _sanitize_subprocess_env(base_env: dict | None, extra_env: dict | None = None) -> dict:
"""Filter Hermes-managed secrets from a subprocess environment.
`_HERMES_FORCE_<VAR>` entries in ``extra_env`` opt a blocked variable back in
intentionally for callers that truly need it.
"""
sanitized: dict[str, str] = {}
for key, value in (base_env or {}).items():
if key.startswith(_HERMES_PROVIDER_ENV_FORCE_PREFIX):
continue
if key not in _HERMES_PROVIDER_ENV_BLOCKLIST:
sanitized[key] = value
for key, value in (extra_env or {}).items():
if key.startswith(_HERMES_PROVIDER_ENV_FORCE_PREFIX):
real_key = key[len(_HERMES_PROVIDER_ENV_FORCE_PREFIX):]
sanitized[real_key] = value
elif key not in _HERMES_PROVIDER_ENV_BLOCKLIST:
sanitized[key] = value
return sanitized
def _find_bash() -> str:
"""Find bash for command execution.
@@ -249,18 +320,11 @@ class LocalEnvironment(BaseEnvironment):
# Ensure PATH always includes standard dirs — systemd services
# and some terminal multiplexers inherit a minimal PATH.
_SANE_PATH = "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
# Strip Hermes-internal provider vars so external CLIs
# (e.g. codex) are not silently misrouted. Callers that
# truly need a blocked var can opt in by prefixing the key
# with _HERMES_FORCE_ in self.env (e.g. _HERMES_FORCE_OPENAI_API_KEY).
merged = dict(os.environ | self.env)
run_env = {}
for k, v in merged.items():
if k.startswith(_HERMES_PROVIDER_ENV_FORCE_PREFIX):
real_key = k[len(_HERMES_PROVIDER_ENV_FORCE_PREFIX):]
run_env[real_key] = v
elif k not in _HERMES_PROVIDER_ENV_BLOCKLIST:
run_env[k] = v
# Strip Hermes-managed provider/tool/gateway vars so external CLIs
# are not silently misrouted or handed Hermes secrets. Callers that
# truly need a blocked var can opt in by prefixing the key with
# _HERMES_FORCE_ in self.env (e.g. _HERMES_FORCE_OPENAI_API_KEY).
run_env = _sanitize_subprocess_env(os.environ, self.env)
existing_path = run_env.get("PATH", "")
if "/usr/bin" not in existing_path.split(":"):
run_env["PATH"] = f"{existing_path}:{_SANE_PATH}" if existing_path else _SANE_PATH

View File

@@ -42,7 +42,7 @@ import time
import uuid
_IS_WINDOWS = platform.system() == "Windows"
from tools.environments.local import _find_shell, _HERMES_PROVIDER_ENV_BLOCKLIST
from tools.environments.local import _find_shell, _sanitize_subprocess_env
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional
@@ -155,9 +155,7 @@ class ProcessRegistry:
else:
from ptyprocess import PtyProcess as _PtyProcessCls
user_shell = _find_shell()
pty_env = {k: v for k, v in os.environ.items()
if k not in _HERMES_PROVIDER_ENV_BLOCKLIST}
pty_env.update(env_vars or {})
pty_env = _sanitize_subprocess_env(os.environ, env_vars)
pty_env["PYTHONUNBUFFERED"] = "1"
pty_proc = _PtyProcessCls.spawn(
[user_shell, "-lic", command],
@@ -198,9 +196,7 @@ class ProcessRegistry:
# Force unbuffered output for Python scripts so progress is visible
# during background execution (libraries like tqdm/datasets buffer when
# stdout is a pipe, hiding output from process(action="poll")).
bg_env = {k: v for k, v in os.environ.items()
if k not in _HERMES_PROVIDER_ENV_BLOCKLIST}
bg_env.update(env_vars or {})
bg_env = _sanitize_subprocess_env(os.environ, env_vars)
bg_env["PYTHONUNBUFFERED"] = "1"
proc = subprocess.Popen(
[user_shell, "-lic", command],