fix(security): pipe sudo password via stdin instead of shell cmdline

This commit is contained in:
johnh4098
2026-03-08 17:46:11 +03:30
committed by teknium1
parent a2ea85924a
commit e9742e202f
8 changed files with 132 additions and 45 deletions

View File

@@ -59,8 +59,16 @@ class BaseEnvironment(ABC):
# Shared helpers (eliminate duplication across backends)
# ------------------------------------------------------------------
def _prepare_command(self, command: str) -> str:
"""Transform sudo commands if SUDO_PASSWORD is available."""
def _prepare_command(self, command: str) -> tuple[str, str | None]:
"""Transform sudo commands if SUDO_PASSWORD is available.
Returns:
(transformed_command, sudo_stdin) — see _transform_sudo_command
for the full contract. Callers that drive a subprocess directly
should prepend sudo_stdin (when not None) to any stdin_data they
pass to Popen. Callers that embed stdin via heredoc (modal,
daytona) handle sudo_stdin in their own execute() method.
"""
from tools.terminal_tool import _transform_sudo_command
return _transform_sudo_command(command)

View File

@@ -181,7 +181,20 @@ class DaytonaEnvironment(BaseEnvironment):
marker = f"HERMES_EOF_{uuid.uuid4().hex[:8]}"
command = f"{command} << '{marker}'\n{stdin_data}\n{marker}"
exec_command = self._prepare_command(command)
exec_command, sudo_stdin = self._prepare_command(command)
# Daytona sandboxes execute commands via the Daytona SDK and cannot
# pipe subprocess stdin directly the way a local Popen can. When a
# sudo password is present, use a shell-level pipe from printf so that
# the password feeds sudo -S without appearing as an echo argument
# embedded in the shell string. The password is still visible in the
# remote sandbox's command line, but it is not exposed on the user's
# local machine — which is the primary threat being mitigated.
if sudo_stdin is not None:
import shlex
exec_command = (
f"printf '%s\\n' {shlex.quote(sudo_stdin.rstrip())} | {exec_command}"
)
effective_cwd = cwd or self.cwd or None
effective_timeout = timeout or self.timeout

View File

@@ -193,10 +193,18 @@ class DockerEnvironment(BaseEnvironment):
def execute(self, command: str, cwd: str = "", *,
timeout: int | None = None,
stdin_data: str | None = None) -> dict:
exec_command = self._prepare_command(command)
exec_command, sudo_stdin = self._prepare_command(command)
work_dir = cwd or self.cwd
effective_timeout = timeout or self.timeout
# Merge sudo password (if any) with caller-supplied stdin_data.
if sudo_stdin is not None and stdin_data is not None:
effective_stdin = sudo_stdin + stdin_data
elif sudo_stdin is not None:
effective_stdin = sudo_stdin
else:
effective_stdin = stdin_data
# docker exec -w doesn't expand ~, so prepend a cd into the command
if work_dir == "~" or work_dir.startswith("~/"):
exec_command = f"cd {work_dir} && {exec_command}"
@@ -204,7 +212,7 @@ class DockerEnvironment(BaseEnvironment):
assert self._inner.container_id, "Container not started"
cmd = [self._inner.config.executable, "exec"]
if stdin_data is not None:
if effective_stdin is not None:
cmd.append("-i")
cmd.extend(["-w", work_dir])
for key in self._inner.config.forward_env:
@@ -219,12 +227,12 @@ class DockerEnvironment(BaseEnvironment):
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
stdin=subprocess.PIPE if stdin_data else subprocess.DEVNULL,
stdin=subprocess.PIPE if effective_stdin else subprocess.DEVNULL,
text=True,
)
if stdin_data:
if effective_stdin:
try:
proc.stdin.write(stdin_data)
proc.stdin.write(effective_stdin)
proc.stdin.close()
except Exception:
pass

View File

@@ -161,7 +161,18 @@ class LocalEnvironment(BaseEnvironment):
work_dir = cwd or self.cwd or os.getcwd()
effective_timeout = timeout or self.timeout
exec_command = self._prepare_command(command)
exec_command, sudo_stdin = self._prepare_command(command)
# Merge the sudo password (if any) with caller-supplied stdin_data.
# sudo -S reads exactly one line (the password) then passes the rest
# of stdin to the child, so prepending is safe even when stdin_data
# is also present.
if sudo_stdin is not None and stdin_data is not None:
effective_stdin = sudo_stdin + stdin_data
elif sudo_stdin is not None:
effective_stdin = sudo_stdin
else:
effective_stdin = stdin_data
try:
# The fence wrapper uses bash syntax (semicolons, $?, printf).
@@ -195,14 +206,14 @@ class LocalEnvironment(BaseEnvironment):
errors="replace",
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
stdin=subprocess.PIPE if stdin_data is not None else subprocess.DEVNULL,
stdin=subprocess.PIPE if effective_stdin is not None else subprocess.DEVNULL,
preexec_fn=None if _IS_WINDOWS else os.setsid,
)
if stdin_data is not None:
if effective_stdin is not None:
def _write_stdin():
try:
proc.stdin.write(stdin_data)
proc.stdin.write(effective_stdin)
proc.stdin.close()
except (BrokenPipeError, OSError):
pass

View File

@@ -106,7 +106,20 @@ class ModalEnvironment(BaseEnvironment):
marker = f"HERMES_EOF_{uuid.uuid4().hex[:8]}"
command = f"{command} << '{marker}'\n{stdin_data}\n{marker}"
exec_command = self._prepare_command(command)
exec_command, sudo_stdin = self._prepare_command(command)
# Modal sandboxes execute commands via the Modal SDK and cannot pipe
# subprocess stdin directly the way a local Popen can. When a sudo
# password is present, use a shell-level pipe from printf so that the
# password feeds sudo -S without appearing as an echo argument embedded
# in the shell string. The password is still visible in the remote
# sandbox's command line, but it is not exposed on the user's local
# machine — which is the primary threat being mitigated.
if sudo_stdin is not None:
import shlex
exec_command = (
f"printf '%s\\n' {shlex.quote(sudo_stdin.rstrip())} | {exec_command}"
)
# Run in a background thread so we can poll for interrupts
result_holder = {"value": None, "error": None}

View File

@@ -228,7 +228,15 @@ class SingularityEnvironment(BaseEnvironment):
effective_timeout = timeout or self.timeout
work_dir = cwd or self.cwd
exec_command = self._prepare_command(command)
exec_command, sudo_stdin = self._prepare_command(command)
# Merge sudo password (if any) with caller-supplied stdin_data.
if sudo_stdin is not None and stdin_data is not None:
effective_stdin = sudo_stdin + stdin_data
elif sudo_stdin is not None:
effective_stdin = sudo_stdin
else:
effective_stdin = stdin_data
# apptainer exec --pwd doesn't expand ~, so prepend a cd into the command
if work_dir == "~" or work_dir.startswith("~/"):
@@ -245,12 +253,12 @@ class SingularityEnvironment(BaseEnvironment):
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
stdin=subprocess.PIPE if stdin_data else subprocess.DEVNULL,
stdin=subprocess.PIPE if effective_stdin else subprocess.DEVNULL,
text=True,
)
if stdin_data:
if effective_stdin:
try:
proc.stdin.write(stdin_data)
proc.stdin.write(effective_stdin)
proc.stdin.close()
except Exception:
pass

View File

@@ -69,15 +69,23 @@ class SSHEnvironment(BaseEnvironment):
timeout: int | None = None,
stdin_data: str | None = None) -> dict:
work_dir = cwd or self.cwd
exec_command = self._prepare_command(command)
exec_command, sudo_stdin = self._prepare_command(command)
wrapped = f'cd {work_dir} && {exec_command}'
effective_timeout = timeout or self.timeout
# Merge sudo password (if any) with caller-supplied stdin_data.
if sudo_stdin is not None and stdin_data is not None:
effective_stdin = sudo_stdin + stdin_data
elif sudo_stdin is not None:
effective_stdin = sudo_stdin
else:
effective_stdin = stdin_data
cmd = self._build_ssh_command()
cmd.extend(["bash", "-c", wrapped])
try:
kwargs = self._build_run_kwargs(timeout, stdin_data)
kwargs = self._build_run_kwargs(timeout, effective_stdin)
# Remove timeout from kwargs -- we handle it in the poll loop
kwargs.pop("timeout", None)
@@ -87,13 +95,13 @@ class SSHEnvironment(BaseEnvironment):
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
stdin=subprocess.PIPE if stdin_data else subprocess.DEVNULL,
stdin=subprocess.PIPE if effective_stdin else subprocess.DEVNULL,
text=True,
)
if stdin_data:
if effective_stdin:
try:
proc.stdin.write(stdin_data)
proc.stdin.write(effective_stdin)
proc.stdin.close()
except Exception:
pass

View File

@@ -291,32 +291,50 @@ def _prompt_for_sudo_password(timeout_seconds: int = 45) -> str:
del os.environ["HERMES_SPINNER_PAUSE"]
def _transform_sudo_command(command: str) -> str:
def _transform_sudo_command(command: str) -> tuple[str, str | None]:
"""
Transform sudo commands to use -S flag if SUDO_PASSWORD is available.
This is a shared helper used by all execution environments to provide
consistent sudo handling across local, SSH, and container environments.
If SUDO_PASSWORD is set (via env, config, or interactive prompt):
'sudo apt install curl' -> password piped via sudo -S
Returns:
(transformed_command, sudo_stdin) where:
- transformed_command has every bare ``sudo`` replaced with
``sudo -S -p ''`` so sudo reads its password from stdin.
- sudo_stdin is the password string with a trailing newline that the
caller must prepend to the process's stdin stream. sudo -S reads
exactly one line (the password) and passes the rest of stdin to the
child command, so prepending is safe even when the caller also has
its own stdin_data to pipe.
- If no password is available, sudo_stdin is None and the command is
returned unchanged so it fails gracefully with
"sudo: a password is required".
Callers that drive a subprocess directly (local, ssh, docker, singularity)
should prepend sudo_stdin to their stdin_data and pass the merged bytes to
Popen's stdin pipe.
Callers that cannot pipe subprocess stdin (modal, daytona) must embed the
password in the command string themselves; see their execute() methods for
how they handle the non-None sudo_stdin case.
If SUDO_PASSWORD is not set and in interactive mode (HERMES_INTERACTIVE=1):
Prompts user for password with 45s timeout, caches for session.
If SUDO_PASSWORD is not set and NOT interactive:
Command runs as-is (fails gracefully with "sudo: a password is required").
"""
global _cached_sudo_password
import re
# Check if command even contains sudo
if not re.search(r'\bsudo\b', command):
return command # No sudo in command, return as-is
return command, None # No sudo in command, nothing to do
# Try to get password from: env var -> session cache -> interactive prompt
sudo_password = os.getenv("SUDO_PASSWORD", "") or _cached_sudo_password
if not sudo_password:
# No password configured - check if we're in interactive mode
if os.getenv("HERMES_INTERACTIVE"):
@@ -324,21 +342,21 @@ def _transform_sudo_command(command: str) -> str:
sudo_password = _prompt_for_sudo_password(timeout_seconds=45)
if sudo_password:
_cached_sudo_password = sudo_password # Cache for session
if not sudo_password:
return command # No password, let it fail gracefully
return command, None # No password, let it fail gracefully
def replace_sudo(match):
# Replace 'sudo' with password-piped version
# The -S flag makes sudo read password from stdin
# The -p '' suppresses the password prompt
# Use shlex.quote() to prevent shell injection via password content
import shlex
return f"echo {shlex.quote(sudo_password)} | sudo -S -p ''"
# Replace bare 'sudo' with 'sudo -S -p ""'.
# The password is returned as sudo_stdin and must be written to the
# process's stdin pipe by the caller — it never appears in any
# command-line argument or shell string.
return "sudo -S -p ''"
# Match 'sudo' at word boundaries (not 'visudo' or 'sudoers')
# This handles: sudo, sudo -flag, etc.
return re.sub(r'\bsudo\b', replace_sudo, command)
transformed = re.sub(r'\bsudo\b', replace_sudo, command)
# Trailing newline is required: sudo -S reads one line for the password.
return transformed, sudo_password + "\n"
# Environment classes now live in tools/environments/