"""Local execution environment — spawn-per-call with session snapshot.""" import os import platform import shutil import signal import subprocess import tempfile from tools.environments.base import BaseEnvironment, _pipe_stdin _IS_WINDOWS = platform.system() == "Windows" # Hermes-internal env vars that should NOT leak into terminal subprocesses. _HERMES_PROVIDER_ENV_FORCE_PREFIX = "_HERMES_FORCE_" def _build_provider_env_blocklist() -> frozenset: """Derive the blocklist from provider, tool, and gateway config.""" blocked: set[str] = set() try: from hermes_cli.auth import PROVIDER_REGISTRY for pconfig in PROVIDER_REGISTRY.values(): blocked.update(pconfig.api_key_env_vars) if pconfig.base_url_env_var: blocked.add(pconfig.base_url_env_var) except ImportError: pass try: from hermes_cli.config import OPTIONAL_ENV_VARS for name, metadata in OPTIONAL_ENV_VARS.items(): category = metadata.get("category") if category in {"tool", "messaging"}: blocked.add(name) elif category == "setting" and metadata.get("password"): blocked.add(name) except ImportError: pass blocked.update({ "OPENAI_BASE_URL", "OPENAI_API_KEY", "OPENAI_API_BASE", "OPENAI_ORG_ID", "OPENAI_ORGANIZATION", "OPENROUTER_API_KEY", "ANTHROPIC_BASE_URL", "ANTHROPIC_TOKEN", "CLAUDE_CODE_OAUTH_TOKEN", "LLM_MODEL", "GOOGLE_API_KEY", "DEEPSEEK_API_KEY", "MISTRAL_API_KEY", "GROQ_API_KEY", "TOGETHER_API_KEY", "PERPLEXITY_API_KEY", "COHERE_API_KEY", "FIREWORKS_API_KEY", "XAI_API_KEY", "HELICONE_API_KEY", "PARALLEL_API_KEY", "FIRECRAWL_API_KEY", "FIRECRAWL_API_URL", "TELEGRAM_HOME_CHANNEL", "TELEGRAM_HOME_CHANNEL_NAME", "DISCORD_HOME_CHANNEL", "DISCORD_HOME_CHANNEL_NAME", "DISCORD_REQUIRE_MENTION", "DISCORD_FREE_RESPONSE_CHANNELS", "DISCORD_AUTO_THREAD", "SLACK_HOME_CHANNEL", "SLACK_HOME_CHANNEL_NAME", "SLACK_ALLOWED_USERS", "WHATSAPP_ENABLED", "WHATSAPP_MODE", "WHATSAPP_ALLOWED_USERS", "SIGNAL_HTTP_URL", "SIGNAL_ACCOUNT", "SIGNAL_ALLOWED_USERS", "SIGNAL_GROUP_ALLOWED_USERS", "SIGNAL_HOME_CHANNEL", "SIGNAL_HOME_CHANNEL_NAME", "SIGNAL_IGNORE_STORIES", "HASS_TOKEN", "HASS_URL", "EMAIL_ADDRESS", "EMAIL_PASSWORD", "EMAIL_IMAP_HOST", "EMAIL_SMTP_HOST", "EMAIL_HOME_ADDRESS", "EMAIL_HOME_ADDRESS_NAME", "GATEWAY_ALLOWED_USERS", "GH_TOKEN", "GITHUB_APP_ID", "GITHUB_APP_PRIVATE_KEY_PATH", "GITHUB_APP_INSTALLATION_ID", "MODAL_TOKEN_ID", "MODAL_TOKEN_SECRET", "DAYTONA_API_KEY", }) return frozenset(blocked) _HERMES_PROVIDER_ENV_BLOCKLIST = _build_provider_env_blocklist() def _sanitize_subprocess_env(base_env: dict | None, extra_env: dict | None = None) -> dict: """Filter Hermes-managed secrets from a subprocess environment.""" try: from tools.env_passthrough import is_env_passthrough as _is_passthrough except Exception: _is_passthrough = lambda _: False # noqa: E731 sanitized: dict[str, str] = {} for key, value in (base_env or {}).items(): if key.startswith(_HERMES_PROVIDER_ENV_FORCE_PREFIX): continue if key not in _HERMES_PROVIDER_ENV_BLOCKLIST or _is_passthrough(key): sanitized[key] = value for key, value in (extra_env or {}).items(): if key.startswith(_HERMES_PROVIDER_ENV_FORCE_PREFIX): real_key = key[len(_HERMES_PROVIDER_ENV_FORCE_PREFIX):] sanitized[real_key] = value elif key not in _HERMES_PROVIDER_ENV_BLOCKLIST or _is_passthrough(key): sanitized[key] = value # Per-profile HOME isolation for background processes (same as _make_run_env). from hermes_constants import get_subprocess_home _profile_home = get_subprocess_home() if _profile_home: sanitized["HOME"] = _profile_home return sanitized def _find_bash() -> str: """Find bash for command execution.""" if not _IS_WINDOWS: return ( shutil.which("bash") or ("/usr/bin/bash" if os.path.isfile("/usr/bin/bash") else None) or ("/bin/bash" if os.path.isfile("/bin/bash") else None) or os.environ.get("SHELL") or "/bin/sh" ) custom = os.environ.get("HERMES_GIT_BASH_PATH") if custom and os.path.isfile(custom): return custom found = shutil.which("bash") if found: return found for candidate in ( os.path.join(os.environ.get("ProgramFiles", r"C:\Program Files"), "Git", "bin", "bash.exe"), os.path.join(os.environ.get("ProgramFiles(x86)", r"C:\Program Files (x86)"), "Git", "bin", "bash.exe"), os.path.join(os.environ.get("LOCALAPPDATA", ""), "Programs", "Git", "bin", "bash.exe"), ): if candidate and os.path.isfile(candidate): return candidate raise RuntimeError( "Git Bash not found. Hermes Agent requires Git for Windows on Windows.\n" "Install it from: https://git-scm.com/download/win\n" "Or set HERMES_GIT_BASH_PATH to your bash.exe location." ) # Backward compat — process_registry.py imports this name _find_shell = _find_bash # Standard PATH entries for environments with minimal PATH. _SANE_PATH = ( "/opt/homebrew/bin:/opt/homebrew/sbin:" "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin" ) def _make_run_env(env: dict) -> dict: """Build a run environment with a sane PATH and provider-var stripping.""" try: from tools.env_passthrough import is_env_passthrough as _is_passthrough except Exception: _is_passthrough = lambda _: False # noqa: E731 merged = dict(os.environ | env) run_env = {} for k, v in merged.items(): if k.startswith(_HERMES_PROVIDER_ENV_FORCE_PREFIX): real_key = k[len(_HERMES_PROVIDER_ENV_FORCE_PREFIX):] run_env[real_key] = v elif k not in _HERMES_PROVIDER_ENV_BLOCKLIST or _is_passthrough(k): run_env[k] = v existing_path = run_env.get("PATH", "") if "/usr/bin" not in existing_path.split(":"): run_env["PATH"] = f"{existing_path}:{_SANE_PATH}" if existing_path else _SANE_PATH # Per-profile HOME isolation: redirect system tool configs (git, ssh, gh, # npm …) into {HERMES_HOME}/home/ when that directory exists. Only the # subprocess sees the override — the Python process keeps the real HOME. from hermes_constants import get_subprocess_home _profile_home = get_subprocess_home() if _profile_home: run_env["HOME"] = _profile_home return run_env class LocalEnvironment(BaseEnvironment): """Run commands directly on the host machine. Spawn-per-call: every execute() spawns a fresh bash process. Session snapshot preserves env vars across calls. CWD persists via file-based read after each command. """ def __init__(self, cwd: str = "", timeout: int = 60, env: dict = None): super().__init__(cwd=cwd or os.getcwd(), timeout=timeout, env=env) self.init_session() def get_temp_dir(self) -> str: """Return a shell-safe writable temp dir for local execution. Termux does not provide /tmp by default, but exposes a POSIX TMPDIR. Prefer POSIX-style env vars when available, keep using /tmp on regular Unix systems, and only fall back to tempfile.gettempdir() when it also resolves to a POSIX path. Check the environment configured for this backend first so callers can override the temp root explicitly (for example via terminal.env or a custom TMPDIR), then fall back to the host process environment. """ for env_var in ("TMPDIR", "TMP", "TEMP"): candidate = self.env.get(env_var) or os.environ.get(env_var) if candidate and candidate.startswith("/"): return candidate.rstrip("/") or "/" if os.path.isdir("/tmp") and os.access("/tmp", os.W_OK | os.X_OK): return "/tmp" candidate = tempfile.gettempdir() if candidate.startswith("/"): return candidate.rstrip("/") or "/" return "/tmp" def _run_bash(self, cmd_string: str, *, login: bool = False, timeout: int = 120, stdin_data: str | None = None) -> subprocess.Popen: bash = _find_bash() args = [bash, "-l", "-c", cmd_string] if login else [bash, "-c", cmd_string] run_env = _make_run_env(self.env) proc = subprocess.Popen( args, text=True, env=run_env, encoding="utf-8", errors="replace", stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=subprocess.PIPE if stdin_data is not None else subprocess.DEVNULL, preexec_fn=None if _IS_WINDOWS else os.setsid, ) if stdin_data is not None: _pipe_stdin(proc, stdin_data) return proc def _kill_process(self, proc): """Kill the entire process group (all children).""" try: if _IS_WINDOWS: proc.terminate() else: pgid = os.getpgid(proc.pid) os.killpg(pgid, signal.SIGTERM) try: proc.wait(timeout=1.0) except subprocess.TimeoutExpired: os.killpg(pgid, signal.SIGKILL) except (ProcessLookupError, PermissionError): try: proc.kill() except Exception: pass def _update_cwd(self, result: dict): """Read CWD from temp file (local-only, no round-trip needed).""" try: cwd_path = open(self._cwd_file).read().strip() if cwd_path: self.cwd = cwd_path except (OSError, FileNotFoundError): pass # Still strip the marker from output so it's not visible self._extract_cwd_from_output(result) def cleanup(self): """Clean up temp files.""" for f in (self._snapshot_path, self._cwd_file): try: os.unlink(f) except OSError: pass