Files
hermes-agent/tools/environments/ssh.py

308 lines
12 KiB
Python
Raw Permalink Normal View History

2026-02-21 22:31:43 -08:00
"""SSH remote execution environment with ControlMaster connection persistence."""
import logging
import shutil
2026-02-21 22:31:43 -08:00
import subprocess
import tempfile
import threading
import time
2026-02-21 22:31:43 -08:00
from pathlib import Path
from tools.environments.base import BaseEnvironment
from tools.environments.persistent_shell import PersistentShellMixin
from tools.interrupt import is_interrupted
2026-02-21 22:31:43 -08:00
logger = logging.getLogger(__name__)
def _ensure_ssh_available() -> None:
"""Fail fast with a clear error when the SSH client is unavailable."""
if not shutil.which("ssh"):
raise RuntimeError(
"SSH is not installed or not in PATH. Install OpenSSH client: apt install openssh-client"
)
class SSHEnvironment(PersistentShellMixin, BaseEnvironment):
2026-02-21 22:31:43 -08:00
"""Run commands on a remote machine over SSH.
Uses SSH ControlMaster for connection persistence so subsequent
commands are fast. Security benefit: the agent cannot modify its
own code since execution happens on a separate machine.
Foreground commands are interruptible: the local ssh process is killed
and a remote kill is attempted over the ControlMaster socket.
2026-03-12 23:38:11 +05:30
When ``persistent=True``, a single long-lived bash shell is kept alive
over SSH and state (cwd, env vars, shell variables) persists across
``execute()`` calls. Output capture uses file-based IPC on the remote
host (stdout/stderr/exit-code written to temp files, polled via fast
ControlMaster one-shot reads).
2026-02-21 22:31:43 -08:00
"""
def __init__(self, host: str, user: str, cwd: str = "~",
2026-03-12 23:38:11 +05:30
timeout: int = 60, port: int = 22, key_path: str = "",
persistent: bool = False):
2026-02-21 22:31:43 -08:00
super().__init__(cwd=cwd, timeout=timeout)
self.host = host
self.user = user
self.port = port
self.key_path = key_path
2026-03-12 23:38:11 +05:30
self.persistent = persistent
2026-02-21 22:31:43 -08:00
self.control_dir = Path(tempfile.gettempdir()) / "hermes-ssh"
self.control_dir.mkdir(parents=True, exist_ok=True)
self.control_socket = self.control_dir / f"{user}@{host}:{port}.sock"
_ensure_ssh_available()
2026-02-21 22:31:43 -08:00
self._establish_connection()
self._remote_home = self._detect_remote_home()
self._sync_skills_and_credentials()
2026-02-21 22:31:43 -08:00
2026-03-12 23:38:11 +05:30
if self.persistent:
self._init_persistent_shell()
2026-03-12 23:38:11 +05:30
def _build_ssh_command(self, extra_args: list | None = None) -> list:
2026-02-21 22:31:43 -08:00
cmd = ["ssh"]
cmd.extend(["-o", f"ControlPath={self.control_socket}"])
cmd.extend(["-o", "ControlMaster=auto"])
cmd.extend(["-o", "ControlPersist=300"])
cmd.extend(["-o", "BatchMode=yes"])
cmd.extend(["-o", "StrictHostKeyChecking=accept-new"])
cmd.extend(["-o", "ConnectTimeout=10"])
if self.port != 22:
cmd.extend(["-p", str(self.port)])
if self.key_path:
cmd.extend(["-i", self.key_path])
if extra_args:
cmd.extend(extra_args)
cmd.append(f"{self.user}@{self.host}")
return cmd
def _establish_connection(self):
cmd = self._build_ssh_command()
cmd.append("echo 'SSH connection established'")
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=15)
if result.returncode != 0:
error_msg = result.stderr.strip() or result.stdout.strip()
raise RuntimeError(f"SSH connection failed: {error_msg}")
except subprocess.TimeoutExpired:
raise RuntimeError(f"SSH connection to {self.user}@{self.host} timed out")
def _detect_remote_home(self) -> str:
"""Detect the remote user's home directory."""
try:
cmd = self._build_ssh_command()
cmd.append("echo $HOME")
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
home = result.stdout.strip()
if home and result.returncode == 0:
logger.debug("SSH: remote home = %s", home)
return home
except Exception:
pass
# Fallback: guess from username
if self.user == "root":
return "/root"
return f"/home/{self.user}"
def _sync_skills_and_credentials(self) -> None:
"""Rsync skills directory and credential files to the remote host."""
try:
container_base = f"{self._remote_home}/.hermes"
from tools.credential_files import get_credential_file_mounts, get_skills_directory_mount
rsync_base = ["rsync", "-az", "--timeout=30", "--safe-links"]
ssh_opts = f"ssh -o ControlPath={self.control_socket} -o ControlMaster=auto"
if self.port != 22:
ssh_opts += f" -p {self.port}"
if self.key_path:
ssh_opts += f" -i {self.key_path}"
rsync_base.extend(["-e", ssh_opts])
dest_prefix = f"{self.user}@{self.host}"
# Sync individual credential files (remap /root/.hermes to detected home)
for mount_entry in get_credential_file_mounts():
remote_path = mount_entry["container_path"].replace("/root/.hermes", container_base, 1)
parent_dir = str(Path(remote_path).parent)
mkdir_cmd = self._build_ssh_command()
mkdir_cmd.append(f"mkdir -p {parent_dir}")
subprocess.run(mkdir_cmd, capture_output=True, text=True, timeout=10)
cmd = rsync_base + [mount_entry["host_path"], f"{dest_prefix}:{remote_path}"]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode == 0:
logger.info("SSH: synced credential %s -> %s", mount_entry["host_path"], remote_path)
else:
logger.debug("SSH: rsync credential failed: %s", result.stderr.strip())
# Sync skills directory (remap to detected home)
skills_mount = get_skills_directory_mount(container_base=container_base)
if skills_mount:
remote_path = skills_mount["container_path"]
mkdir_cmd = self._build_ssh_command()
mkdir_cmd.append(f"mkdir -p {remote_path}")
subprocess.run(mkdir_cmd, capture_output=True, text=True, timeout=10)
cmd = rsync_base + [
skills_mount["host_path"].rstrip("/") + "/",
f"{dest_prefix}:{remote_path}/",
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
if result.returncode == 0:
logger.info("SSH: synced skills dir %s -> %s", skills_mount["host_path"], remote_path)
else:
logger.debug("SSH: rsync skills dir failed: %s", result.stderr.strip())
except Exception as e:
logger.debug("SSH: could not sync skills/credentials: %s", e)
def execute(self, command: str, cwd: str = "", *,
timeout: int | None = None,
stdin_data: str | None = None) -> dict:
# Incremental sync before each command so mid-session credential
# refreshes and skill updates are picked up.
self._sync_skills_and_credentials()
return super().execute(command, cwd, timeout=timeout, stdin_data=stdin_data)
_poll_interval_start: float = 0.15 # SSH: higher initial interval (150ms) for network latency
@property
def _temp_prefix(self) -> str:
return f"/tmp/hermes-ssh-{self._session_id}"
2026-03-12 23:38:11 +05:30
def _spawn_shell_process(self) -> subprocess.Popen:
2026-03-12 23:38:11 +05:30
cmd = self._build_ssh_command()
cmd.append("bash -l")
return subprocess.Popen(
2026-03-12 23:38:11 +05:30
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
2026-03-15 01:12:16 +05:30
stderr=subprocess.DEVNULL,
2026-03-12 23:38:11 +05:30
text=True,
)
def _read_temp_files(self, *paths: str) -> list[str]:
if len(paths) == 1:
cmd = self._build_ssh_command()
cmd.append(f"cat {paths[0]} 2>/dev/null")
try:
result = subprocess.run(
cmd, capture_output=True, text=True, timeout=10,
)
return [result.stdout]
except (subprocess.TimeoutExpired, OSError):
return [""]
delim = f"__HERMES_SEP_{self._session_id}__"
script = "; ".join(
f"cat {p} 2>/dev/null; echo '{delim}'" for p in paths
2026-03-12 23:38:11 +05:30
)
cmd = self._build_ssh_command()
cmd.append(script)
2026-03-12 23:38:11 +05:30
try:
result = subprocess.run(
cmd, capture_output=True, text=True, timeout=10,
2026-03-12 23:38:11 +05:30
)
parts = result.stdout.split(delim + "\n")
return [parts[i] if i < len(parts) else "" for i in range(len(paths))]
2026-03-12 23:38:11 +05:30
except (subprocess.TimeoutExpired, OSError):
return [""] * len(paths)
2026-03-12 23:38:11 +05:30
def _kill_shell_children(self):
if self._shell_pid is None:
2026-03-12 23:38:11 +05:30
return
cmd = self._build_ssh_command()
cmd.append(f"pkill -P {self._shell_pid} 2>/dev/null; true")
2026-03-12 23:38:11 +05:30
try:
subprocess.run(cmd, capture_output=True, timeout=5)
except (subprocess.TimeoutExpired, OSError):
pass
def _cleanup_temp_files(self):
cmd = self._build_ssh_command()
cmd.append(f"rm -f {self._temp_prefix}-*")
2026-03-12 23:38:11 +05:30
try:
subprocess.run(cmd, capture_output=True, timeout=5)
except (subprocess.TimeoutExpired, OSError):
pass
2026-03-12 23:38:11 +05:30
def _execute_oneshot(self, command: str, cwd: str = "", *,
timeout: int | None = None,
stdin_data: str | None = None) -> dict:
2026-02-21 22:31:43 -08:00
work_dir = cwd or self.cwd
exec_command, sudo_stdin = self._prepare_command(command)
2026-02-21 22:31:43 -08:00
wrapped = f'cd {work_dir} && {exec_command}'
effective_timeout = timeout or self.timeout
2026-02-21 22:31:43 -08:00
if sudo_stdin is not None and stdin_data is not None:
effective_stdin = sudo_stdin + stdin_data
elif sudo_stdin is not None:
effective_stdin = sudo_stdin
else:
effective_stdin = stdin_data
2026-02-21 22:31:43 -08:00
cmd = self._build_ssh_command()
cmd.append(wrapped)
2026-02-21 22:31:43 -08:00
kwargs = self._build_run_kwargs(timeout, effective_stdin)
kwargs.pop("timeout", None)
_output_chunks = []
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
stdin=subprocess.PIPE if effective_stdin else subprocess.DEVNULL,
text=True,
)
if effective_stdin:
try:
proc.stdin.write(effective_stdin)
proc.stdin.close()
except (BrokenPipeError, OSError):
pass
def _drain():
try:
for line in proc.stdout:
_output_chunks.append(line)
except Exception:
pass
reader = threading.Thread(target=_drain, daemon=True)
reader.start()
deadline = time.monotonic() + effective_timeout
2026-02-21 22:31:43 -08:00
while proc.poll() is None:
if is_interrupted():
proc.terminate()
try:
proc.wait(timeout=1)
except subprocess.TimeoutExpired:
proc.kill()
reader.join(timeout=2)
return {
"output": "".join(_output_chunks) + "\n[Command interrupted]",
"returncode": 130,
}
if time.monotonic() > deadline:
proc.kill()
reader.join(timeout=2)
return self._timeout_result(effective_timeout)
time.sleep(0.2)
reader.join(timeout=5)
return {"output": "".join(_output_chunks), "returncode": proc.returncode}
2026-03-12 23:38:11 +05:30
2026-02-21 22:31:43 -08:00
def cleanup(self):
super().cleanup()
2026-02-21 22:31:43 -08:00
if self.control_socket.exists():
try:
cmd = ["ssh", "-o", f"ControlPath={self.control_socket}",
"-O", "exit", f"{self.user}@{self.host}"]
subprocess.run(cmd, capture_output=True, timeout=5)
except (OSError, subprocess.SubprocessError):
pass
try:
self.control_socket.unlink()
except OSError:
pass