Isolate system tool configs (git, ssh, gh, npm) per profile by injecting
a per-profile HOME into subprocess environments only. The Python
process's own os.environ['HOME'] and Path.home() are never modified,
preserving all existing profile infrastructure.
Activation is directory-based: when {HERMES_HOME}/home/ exists on disk,
subprocesses see it as HOME. The directory is created automatically for:
- Docker: entrypoint.sh bootstraps it inside the persistent volume
- Named profiles: added to _PROFILE_DIRS in profiles.py
Injection points (all three subprocess env builders):
- tools/environments/local.py _make_run_env() — foreground terminal
- tools/environments/local.py _sanitize_subprocess_env() — background procs
- tools/code_execution_tool.py child_env — execute_code sandbox
Single source of truth: hermes_constants.get_subprocess_home()
Closes #4426
315 lines
10 KiB
Python
315 lines
10 KiB
Python
"""Local execution environment — spawn-per-call with session snapshot."""
|
|
|
|
import os
|
|
import platform
|
|
import shutil
|
|
import signal
|
|
import subprocess
|
|
import tempfile
|
|
|
|
from tools.environments.base import BaseEnvironment, _pipe_stdin
|
|
|
|
_IS_WINDOWS = platform.system() == "Windows"
|
|
|
|
|
|
# Hermes-internal env vars that should NOT leak into terminal subprocesses.
|
|
_HERMES_PROVIDER_ENV_FORCE_PREFIX = "_HERMES_FORCE_"
|
|
|
|
|
|
def _build_provider_env_blocklist() -> frozenset:
|
|
"""Derive the blocklist from provider, tool, and gateway config."""
|
|
blocked: set[str] = set()
|
|
|
|
try:
|
|
from hermes_cli.auth import PROVIDER_REGISTRY
|
|
for pconfig in PROVIDER_REGISTRY.values():
|
|
blocked.update(pconfig.api_key_env_vars)
|
|
if pconfig.base_url_env_var:
|
|
blocked.add(pconfig.base_url_env_var)
|
|
except ImportError:
|
|
pass
|
|
|
|
try:
|
|
from hermes_cli.config import OPTIONAL_ENV_VARS
|
|
for name, metadata in OPTIONAL_ENV_VARS.items():
|
|
category = metadata.get("category")
|
|
if category in {"tool", "messaging"}:
|
|
blocked.add(name)
|
|
elif category == "setting" and metadata.get("password"):
|
|
blocked.add(name)
|
|
except ImportError:
|
|
pass
|
|
|
|
blocked.update({
|
|
"OPENAI_BASE_URL",
|
|
"OPENAI_API_KEY",
|
|
"OPENAI_API_BASE",
|
|
"OPENAI_ORG_ID",
|
|
"OPENAI_ORGANIZATION",
|
|
"OPENROUTER_API_KEY",
|
|
"ANTHROPIC_BASE_URL",
|
|
"ANTHROPIC_TOKEN",
|
|
"CLAUDE_CODE_OAUTH_TOKEN",
|
|
"LLM_MODEL",
|
|
"GOOGLE_API_KEY",
|
|
"DEEPSEEK_API_KEY",
|
|
"MISTRAL_API_KEY",
|
|
"GROQ_API_KEY",
|
|
"TOGETHER_API_KEY",
|
|
"PERPLEXITY_API_KEY",
|
|
"COHERE_API_KEY",
|
|
"FIREWORKS_API_KEY",
|
|
"XAI_API_KEY",
|
|
"HELICONE_API_KEY",
|
|
"PARALLEL_API_KEY",
|
|
"FIRECRAWL_API_KEY",
|
|
"FIRECRAWL_API_URL",
|
|
"TELEGRAM_HOME_CHANNEL",
|
|
"TELEGRAM_HOME_CHANNEL_NAME",
|
|
"DISCORD_HOME_CHANNEL",
|
|
"DISCORD_HOME_CHANNEL_NAME",
|
|
"DISCORD_REQUIRE_MENTION",
|
|
"DISCORD_FREE_RESPONSE_CHANNELS",
|
|
"DISCORD_AUTO_THREAD",
|
|
"SLACK_HOME_CHANNEL",
|
|
"SLACK_HOME_CHANNEL_NAME",
|
|
"SLACK_ALLOWED_USERS",
|
|
"WHATSAPP_ENABLED",
|
|
"WHATSAPP_MODE",
|
|
"WHATSAPP_ALLOWED_USERS",
|
|
"SIGNAL_HTTP_URL",
|
|
"SIGNAL_ACCOUNT",
|
|
"SIGNAL_ALLOWED_USERS",
|
|
"SIGNAL_GROUP_ALLOWED_USERS",
|
|
"SIGNAL_HOME_CHANNEL",
|
|
"SIGNAL_HOME_CHANNEL_NAME",
|
|
"SIGNAL_IGNORE_STORIES",
|
|
"HASS_TOKEN",
|
|
"HASS_URL",
|
|
"EMAIL_ADDRESS",
|
|
"EMAIL_PASSWORD",
|
|
"EMAIL_IMAP_HOST",
|
|
"EMAIL_SMTP_HOST",
|
|
"EMAIL_HOME_ADDRESS",
|
|
"EMAIL_HOME_ADDRESS_NAME",
|
|
"GATEWAY_ALLOWED_USERS",
|
|
"GH_TOKEN",
|
|
"GITHUB_APP_ID",
|
|
"GITHUB_APP_PRIVATE_KEY_PATH",
|
|
"GITHUB_APP_INSTALLATION_ID",
|
|
"MODAL_TOKEN_ID",
|
|
"MODAL_TOKEN_SECRET",
|
|
"DAYTONA_API_KEY",
|
|
})
|
|
return frozenset(blocked)
|
|
|
|
|
|
_HERMES_PROVIDER_ENV_BLOCKLIST = _build_provider_env_blocklist()
|
|
|
|
|
|
def _sanitize_subprocess_env(base_env: dict | None, extra_env: dict | None = None) -> dict:
|
|
"""Filter Hermes-managed secrets from a subprocess environment."""
|
|
try:
|
|
from tools.env_passthrough import is_env_passthrough as _is_passthrough
|
|
except Exception:
|
|
_is_passthrough = lambda _: False # noqa: E731
|
|
|
|
sanitized: dict[str, str] = {}
|
|
|
|
for key, value in (base_env or {}).items():
|
|
if key.startswith(_HERMES_PROVIDER_ENV_FORCE_PREFIX):
|
|
continue
|
|
if key not in _HERMES_PROVIDER_ENV_BLOCKLIST or _is_passthrough(key):
|
|
sanitized[key] = value
|
|
|
|
for key, value in (extra_env or {}).items():
|
|
if key.startswith(_HERMES_PROVIDER_ENV_FORCE_PREFIX):
|
|
real_key = key[len(_HERMES_PROVIDER_ENV_FORCE_PREFIX):]
|
|
sanitized[real_key] = value
|
|
elif key not in _HERMES_PROVIDER_ENV_BLOCKLIST or _is_passthrough(key):
|
|
sanitized[key] = value
|
|
|
|
# Per-profile HOME isolation for background processes (same as _make_run_env).
|
|
from hermes_constants import get_subprocess_home
|
|
_profile_home = get_subprocess_home()
|
|
if _profile_home:
|
|
sanitized["HOME"] = _profile_home
|
|
|
|
return sanitized
|
|
|
|
|
|
def _find_bash() -> str:
|
|
"""Find bash for command execution."""
|
|
if not _IS_WINDOWS:
|
|
return (
|
|
shutil.which("bash")
|
|
or ("/usr/bin/bash" if os.path.isfile("/usr/bin/bash") else None)
|
|
or ("/bin/bash" if os.path.isfile("/bin/bash") else None)
|
|
or os.environ.get("SHELL")
|
|
or "/bin/sh"
|
|
)
|
|
|
|
custom = os.environ.get("HERMES_GIT_BASH_PATH")
|
|
if custom and os.path.isfile(custom):
|
|
return custom
|
|
|
|
found = shutil.which("bash")
|
|
if found:
|
|
return found
|
|
|
|
for candidate in (
|
|
os.path.join(os.environ.get("ProgramFiles", r"C:\Program Files"), "Git", "bin", "bash.exe"),
|
|
os.path.join(os.environ.get("ProgramFiles(x86)", r"C:\Program Files (x86)"), "Git", "bin", "bash.exe"),
|
|
os.path.join(os.environ.get("LOCALAPPDATA", ""), "Programs", "Git", "bin", "bash.exe"),
|
|
):
|
|
if candidate and os.path.isfile(candidate):
|
|
return candidate
|
|
|
|
raise RuntimeError(
|
|
"Git Bash not found. Hermes Agent requires Git for Windows on Windows.\n"
|
|
"Install it from: https://git-scm.com/download/win\n"
|
|
"Or set HERMES_GIT_BASH_PATH to your bash.exe location."
|
|
)
|
|
|
|
|
|
# Backward compat — process_registry.py imports this name
|
|
_find_shell = _find_bash
|
|
|
|
|
|
# Standard PATH entries for environments with minimal PATH.
|
|
_SANE_PATH = (
|
|
"/opt/homebrew/bin:/opt/homebrew/sbin:"
|
|
"/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
|
|
)
|
|
|
|
|
|
def _make_run_env(env: dict) -> dict:
|
|
"""Build a run environment with a sane PATH and provider-var stripping."""
|
|
try:
|
|
from tools.env_passthrough import is_env_passthrough as _is_passthrough
|
|
except Exception:
|
|
_is_passthrough = lambda _: False # noqa: E731
|
|
|
|
merged = dict(os.environ | env)
|
|
run_env = {}
|
|
for k, v in merged.items():
|
|
if k.startswith(_HERMES_PROVIDER_ENV_FORCE_PREFIX):
|
|
real_key = k[len(_HERMES_PROVIDER_ENV_FORCE_PREFIX):]
|
|
run_env[real_key] = v
|
|
elif k not in _HERMES_PROVIDER_ENV_BLOCKLIST or _is_passthrough(k):
|
|
run_env[k] = v
|
|
existing_path = run_env.get("PATH", "")
|
|
if "/usr/bin" not in existing_path.split(":"):
|
|
run_env["PATH"] = f"{existing_path}:{_SANE_PATH}" if existing_path else _SANE_PATH
|
|
|
|
# Per-profile HOME isolation: redirect system tool configs (git, ssh, gh,
|
|
# npm …) into {HERMES_HOME}/home/ when that directory exists. Only the
|
|
# subprocess sees the override — the Python process keeps the real HOME.
|
|
from hermes_constants import get_subprocess_home
|
|
_profile_home = get_subprocess_home()
|
|
if _profile_home:
|
|
run_env["HOME"] = _profile_home
|
|
|
|
return run_env
|
|
|
|
|
|
class LocalEnvironment(BaseEnvironment):
|
|
"""Run commands directly on the host machine.
|
|
|
|
Spawn-per-call: every execute() spawns a fresh bash process.
|
|
Session snapshot preserves env vars across calls.
|
|
CWD persists via file-based read after each command.
|
|
"""
|
|
|
|
def __init__(self, cwd: str = "", timeout: int = 60, env: dict = None):
|
|
super().__init__(cwd=cwd or os.getcwd(), timeout=timeout, env=env)
|
|
self.init_session()
|
|
|
|
def get_temp_dir(self) -> str:
|
|
"""Return a shell-safe writable temp dir for local execution.
|
|
|
|
Termux does not provide /tmp by default, but exposes a POSIX TMPDIR.
|
|
Prefer POSIX-style env vars when available, keep using /tmp on regular
|
|
Unix systems, and only fall back to tempfile.gettempdir() when it also
|
|
resolves to a POSIX path.
|
|
|
|
Check the environment configured for this backend first so callers can
|
|
override the temp root explicitly (for example via terminal.env or a
|
|
custom TMPDIR), then fall back to the host process environment.
|
|
"""
|
|
for env_var in ("TMPDIR", "TMP", "TEMP"):
|
|
candidate = self.env.get(env_var) or os.environ.get(env_var)
|
|
if candidate and candidate.startswith("/"):
|
|
return candidate.rstrip("/") or "/"
|
|
|
|
if os.path.isdir("/tmp") and os.access("/tmp", os.W_OK | os.X_OK):
|
|
return "/tmp"
|
|
|
|
candidate = tempfile.gettempdir()
|
|
if candidate.startswith("/"):
|
|
return candidate.rstrip("/") or "/"
|
|
|
|
return "/tmp"
|
|
|
|
def _run_bash(self, cmd_string: str, *, login: bool = False,
|
|
timeout: int = 120,
|
|
stdin_data: str | None = None) -> subprocess.Popen:
|
|
bash = _find_bash()
|
|
args = [bash, "-l", "-c", cmd_string] if login else [bash, "-c", cmd_string]
|
|
run_env = _make_run_env(self.env)
|
|
|
|
proc = subprocess.Popen(
|
|
args,
|
|
text=True,
|
|
env=run_env,
|
|
encoding="utf-8",
|
|
errors="replace",
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.STDOUT,
|
|
stdin=subprocess.PIPE if stdin_data is not None else subprocess.DEVNULL,
|
|
preexec_fn=None if _IS_WINDOWS else os.setsid,
|
|
)
|
|
|
|
if stdin_data is not None:
|
|
_pipe_stdin(proc, stdin_data)
|
|
|
|
return proc
|
|
|
|
def _kill_process(self, proc):
|
|
"""Kill the entire process group (all children)."""
|
|
try:
|
|
if _IS_WINDOWS:
|
|
proc.terminate()
|
|
else:
|
|
pgid = os.getpgid(proc.pid)
|
|
os.killpg(pgid, signal.SIGTERM)
|
|
try:
|
|
proc.wait(timeout=1.0)
|
|
except subprocess.TimeoutExpired:
|
|
os.killpg(pgid, signal.SIGKILL)
|
|
except (ProcessLookupError, PermissionError):
|
|
try:
|
|
proc.kill()
|
|
except Exception:
|
|
pass
|
|
|
|
def _update_cwd(self, result: dict):
|
|
"""Read CWD from temp file (local-only, no round-trip needed)."""
|
|
try:
|
|
cwd_path = open(self._cwd_file).read().strip()
|
|
if cwd_path:
|
|
self.cwd = cwd_path
|
|
except (OSError, FileNotFoundError):
|
|
pass
|
|
|
|
# Still strip the marker from output so it's not visible
|
|
self._extract_cwd_from_output(result)
|
|
|
|
def cleanup(self):
|
|
"""Clean up temp files."""
|
|
for f in (self._snapshot_path, self._cwd_file):
|
|
try:
|
|
os.unlink(f)
|
|
except OSError:
|
|
pass
|