When neither apptainer nor singularity is installed, the Singularity backend silently defaults to "singularity" and fails with a cryptic FileNotFoundError inside _start_instance(). Add a preflight check that resolves the executable and verifies it responds, raising a clear RuntimeError with install instructions on failure. Closes #1511
370 lines
13 KiB
Python
370 lines
13 KiB
Python
"""Singularity/Apptainer persistent container environment.
|
|
|
|
Security-hardened with --containall, --no-home, capability dropping.
|
|
Supports configurable resource limits and optional filesystem persistence
|
|
via writable overlay directories that survive across sessions.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import tempfile
|
|
import threading
|
|
import uuid
|
|
from pathlib import Path
|
|
from typing import Any, Dict, Optional
|
|
|
|
from hermes_cli.config import get_hermes_home
|
|
from tools.environments.base import BaseEnvironment
|
|
from tools.interrupt import is_interrupted
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
_SNAPSHOT_STORE = get_hermes_home() / "singularity_snapshots.json"
|
|
|
|
|
|
def _find_singularity_executable() -> str:
|
|
"""Locate the apptainer or singularity CLI binary.
|
|
|
|
Returns the executable name (``"apptainer"`` or ``"singularity"``).
|
|
Raises ``RuntimeError`` with install instructions if neither is found.
|
|
"""
|
|
if shutil.which("apptainer"):
|
|
return "apptainer"
|
|
if shutil.which("singularity"):
|
|
return "singularity"
|
|
raise RuntimeError(
|
|
"Neither 'apptainer' nor 'singularity' was found in PATH. "
|
|
"Install Apptainer (https://apptainer.org/docs/admin/main/installation.html) "
|
|
"or Singularity and ensure the CLI is available."
|
|
)
|
|
|
|
|
|
def _ensure_singularity_available() -> str:
|
|
"""Preflight check: resolve the executable and verify it responds.
|
|
|
|
Returns the executable name on success.
|
|
Raises ``RuntimeError`` with an actionable message on failure.
|
|
"""
|
|
exe = _find_singularity_executable()
|
|
|
|
try:
|
|
result = subprocess.run(
|
|
[exe, "version"],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=10,
|
|
)
|
|
except FileNotFoundError:
|
|
raise RuntimeError(
|
|
f"Singularity backend selected but the resolved executable '{exe}' "
|
|
"could not be executed. Check your installation."
|
|
)
|
|
except subprocess.TimeoutExpired:
|
|
raise RuntimeError(
|
|
f"'{exe} version' timed out. The runtime may be misconfigured."
|
|
)
|
|
|
|
if result.returncode != 0:
|
|
stderr = result.stderr.strip()[:200]
|
|
raise RuntimeError(
|
|
f"'{exe} version' failed (exit code {result.returncode}): {stderr}"
|
|
)
|
|
|
|
return exe
|
|
|
|
|
|
def _load_snapshots() -> Dict[str, str]:
|
|
if _SNAPSHOT_STORE.exists():
|
|
try:
|
|
return json.loads(_SNAPSHOT_STORE.read_text())
|
|
except Exception:
|
|
pass
|
|
return {}
|
|
|
|
|
|
def _save_snapshots(data: Dict[str, str]) -> None:
|
|
_SNAPSHOT_STORE.parent.mkdir(parents=True, exist_ok=True)
|
|
_SNAPSHOT_STORE.write_text(json.dumps(data, indent=2))
|
|
|
|
|
|
# -------------------------------------------------------------------------
|
|
# Singularity helpers (scratch dir, SIF cache, SIF building)
|
|
# -------------------------------------------------------------------------
|
|
|
|
def _get_scratch_dir() -> Path:
|
|
"""Get the best directory for Singularity sandboxes.
|
|
|
|
Resolution order:
|
|
1. TERMINAL_SCRATCH_DIR (explicit override)
|
|
2. TERMINAL_SANDBOX_DIR / singularity (shared sandbox root)
|
|
3. /scratch (common on HPC clusters)
|
|
4. ~/.hermes/sandboxes/singularity (fallback)
|
|
"""
|
|
custom_scratch = os.getenv("TERMINAL_SCRATCH_DIR")
|
|
if custom_scratch:
|
|
scratch_path = Path(custom_scratch)
|
|
scratch_path.mkdir(parents=True, exist_ok=True)
|
|
return scratch_path
|
|
|
|
from tools.environments.base import get_sandbox_dir
|
|
sandbox = get_sandbox_dir() / "singularity"
|
|
|
|
scratch = Path("/scratch")
|
|
if scratch.exists() and os.access(scratch, os.W_OK):
|
|
user_scratch = scratch / os.getenv("USER", "hermes") / "hermes-agent"
|
|
user_scratch.mkdir(parents=True, exist_ok=True)
|
|
logger.info("Using /scratch for sandboxes: %s", user_scratch)
|
|
return user_scratch
|
|
|
|
sandbox.mkdir(parents=True, exist_ok=True)
|
|
return sandbox
|
|
|
|
|
|
def _get_apptainer_cache_dir() -> Path:
|
|
"""Get the Apptainer cache directory for SIF images."""
|
|
cache_dir = os.getenv("APPTAINER_CACHEDIR")
|
|
if cache_dir:
|
|
cache_path = Path(cache_dir)
|
|
cache_path.mkdir(parents=True, exist_ok=True)
|
|
return cache_path
|
|
scratch = _get_scratch_dir()
|
|
cache_path = scratch / ".apptainer"
|
|
cache_path.mkdir(parents=True, exist_ok=True)
|
|
return cache_path
|
|
|
|
|
|
_sif_build_lock = threading.Lock()
|
|
|
|
|
|
def _get_or_build_sif(image: str, executable: str = "apptainer") -> str:
|
|
"""Get or build a SIF image from a docker:// URL.
|
|
|
|
Returns the path unchanged if it's already a .sif file.
|
|
For docker:// URLs, checks the cache and builds if needed.
|
|
"""
|
|
if image.endswith('.sif') and Path(image).exists():
|
|
return image
|
|
if not image.startswith('docker://'):
|
|
return image
|
|
|
|
image_name = image.replace('docker://', '').replace('/', '-').replace(':', '-')
|
|
cache_dir = _get_apptainer_cache_dir()
|
|
sif_path = cache_dir / f"{image_name}.sif"
|
|
|
|
if sif_path.exists():
|
|
return str(sif_path)
|
|
|
|
with _sif_build_lock:
|
|
if sif_path.exists():
|
|
return str(sif_path)
|
|
|
|
logger.info("Building SIF image (one-time setup)...")
|
|
logger.info(" Source: %s", image)
|
|
logger.info(" Target: %s", sif_path)
|
|
|
|
tmp_dir = cache_dir / "tmp"
|
|
tmp_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
env = os.environ.copy()
|
|
env["APPTAINER_TMPDIR"] = str(tmp_dir)
|
|
env["APPTAINER_CACHEDIR"] = str(cache_dir)
|
|
|
|
try:
|
|
result = subprocess.run(
|
|
[executable, "build", str(sif_path), image],
|
|
capture_output=True, text=True, timeout=600, env=env,
|
|
)
|
|
if result.returncode != 0:
|
|
logger.warning("SIF build failed, falling back to docker:// URL")
|
|
logger.warning(" Error: %s", result.stderr[:500])
|
|
return image
|
|
logger.info("SIF image built successfully")
|
|
return str(sif_path)
|
|
except subprocess.TimeoutExpired:
|
|
logger.warning("SIF build timed out, falling back to docker:// URL")
|
|
if sif_path.exists():
|
|
sif_path.unlink()
|
|
return image
|
|
except Exception as e:
|
|
logger.warning("SIF build error: %s, falling back to docker:// URL", e)
|
|
return image
|
|
|
|
|
|
# -------------------------------------------------------------------------
|
|
# SingularityEnvironment
|
|
# -------------------------------------------------------------------------
|
|
|
|
class SingularityEnvironment(BaseEnvironment):
|
|
"""Hardened Singularity/Apptainer container with resource limits and persistence.
|
|
|
|
Security: --containall (isolated PID/IPC/mount namespaces, no host home mount),
|
|
--no-home, writable-tmpfs for scratch space. The container cannot see or modify
|
|
the host filesystem outside of explicitly bound paths.
|
|
|
|
Persistence: when enabled, the writable overlay directory is preserved across
|
|
sessions so installed packages and files survive cleanup/restore.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
image: str,
|
|
cwd: str = "~",
|
|
timeout: int = 60,
|
|
cpu: float = 0,
|
|
memory: int = 0,
|
|
disk: int = 0,
|
|
persistent_filesystem: bool = False,
|
|
task_id: str = "default",
|
|
):
|
|
super().__init__(cwd=cwd, timeout=timeout)
|
|
self.executable = _ensure_singularity_available()
|
|
self.image = _get_or_build_sif(image, self.executable)
|
|
self.instance_id = f"hermes_{uuid.uuid4().hex[:12]}"
|
|
self._instance_started = False
|
|
self._persistent = persistent_filesystem
|
|
self._task_id = task_id
|
|
self._overlay_dir: Optional[Path] = None
|
|
|
|
# Resource limits
|
|
self._cpu = cpu
|
|
self._memory = memory
|
|
|
|
# Persistent overlay directory
|
|
if self._persistent:
|
|
overlay_base = _get_scratch_dir() / "hermes-overlays"
|
|
overlay_base.mkdir(parents=True, exist_ok=True)
|
|
self._overlay_dir = overlay_base / f"overlay-{task_id}"
|
|
self._overlay_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
self._start_instance()
|
|
|
|
def _start_instance(self):
|
|
cmd = [self.executable, "instance", "start"]
|
|
|
|
# Security: full isolation from host
|
|
cmd.extend(["--containall", "--no-home"])
|
|
|
|
# Writable layer
|
|
if self._persistent and self._overlay_dir:
|
|
# Persistent writable overlay -- survives across restarts
|
|
cmd.extend(["--overlay", str(self._overlay_dir)])
|
|
else:
|
|
cmd.append("--writable-tmpfs")
|
|
|
|
# Resource limits (cgroup-based, may require root or appropriate config)
|
|
if self._memory > 0:
|
|
cmd.extend(["--memory", f"{self._memory}M"])
|
|
if self._cpu > 0:
|
|
cmd.extend(["--cpus", str(self._cpu)])
|
|
|
|
cmd.extend([str(self.image), self.instance_id])
|
|
|
|
try:
|
|
result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
|
|
if result.returncode != 0:
|
|
raise RuntimeError(f"Failed to start instance: {result.stderr}")
|
|
self._instance_started = True
|
|
logger.info("Singularity instance %s started (persistent=%s)",
|
|
self.instance_id, self._persistent)
|
|
except subprocess.TimeoutExpired:
|
|
raise RuntimeError("Instance start timed out")
|
|
|
|
def execute(self, command: str, cwd: str = "", *,
|
|
timeout: int | None = None,
|
|
stdin_data: str | None = None) -> dict:
|
|
if not self._instance_started:
|
|
return {"output": "Instance not started", "returncode": -1}
|
|
|
|
effective_timeout = timeout or self.timeout
|
|
work_dir = cwd or self.cwd
|
|
exec_command, sudo_stdin = self._prepare_command(command)
|
|
|
|
# Merge sudo password (if any) with caller-supplied stdin_data.
|
|
if sudo_stdin is not None and stdin_data is not None:
|
|
effective_stdin = sudo_stdin + stdin_data
|
|
elif sudo_stdin is not None:
|
|
effective_stdin = sudo_stdin
|
|
else:
|
|
effective_stdin = stdin_data
|
|
|
|
# apptainer exec --pwd doesn't expand ~, so prepend a cd into the command
|
|
if work_dir == "~" or work_dir.startswith("~/"):
|
|
exec_command = f"cd {work_dir} && {exec_command}"
|
|
work_dir = "/tmp"
|
|
|
|
cmd = [self.executable, "exec", "--pwd", work_dir,
|
|
f"instance://{self.instance_id}",
|
|
"bash", "-c", exec_command]
|
|
|
|
try:
|
|
import time as _time
|
|
_output_chunks = []
|
|
proc = subprocess.Popen(
|
|
cmd,
|
|
stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
|
|
stdin=subprocess.PIPE if effective_stdin else subprocess.DEVNULL,
|
|
text=True,
|
|
)
|
|
if effective_stdin:
|
|
try:
|
|
proc.stdin.write(effective_stdin)
|
|
proc.stdin.close()
|
|
except Exception:
|
|
pass
|
|
|
|
def _drain():
|
|
try:
|
|
for line in proc.stdout:
|
|
_output_chunks.append(line)
|
|
except Exception:
|
|
pass
|
|
|
|
reader = threading.Thread(target=_drain, daemon=True)
|
|
reader.start()
|
|
deadline = _time.monotonic() + effective_timeout
|
|
|
|
while proc.poll() is None:
|
|
if is_interrupted():
|
|
proc.terminate()
|
|
try:
|
|
proc.wait(timeout=1)
|
|
except subprocess.TimeoutExpired:
|
|
proc.kill()
|
|
reader.join(timeout=2)
|
|
return {
|
|
"output": "".join(_output_chunks) + "\n[Command interrupted]",
|
|
"returncode": 130,
|
|
}
|
|
if _time.monotonic() > deadline:
|
|
proc.kill()
|
|
reader.join(timeout=2)
|
|
return self._timeout_result(effective_timeout)
|
|
_time.sleep(0.2)
|
|
|
|
reader.join(timeout=5)
|
|
return {"output": "".join(_output_chunks), "returncode": proc.returncode}
|
|
except Exception as e:
|
|
return {"output": f"Singularity execution error: {e}", "returncode": 1}
|
|
|
|
def cleanup(self):
|
|
"""Stop the instance. If persistent, the overlay dir survives for next creation."""
|
|
if self._instance_started:
|
|
try:
|
|
subprocess.run(
|
|
[self.executable, "instance", "stop", self.instance_id],
|
|
capture_output=True, text=True, timeout=30,
|
|
)
|
|
logger.info("Singularity instance %s stopped", self.instance_id)
|
|
except Exception as e:
|
|
logger.warning("Failed to stop Singularity instance %s: %s", self.instance_id, e)
|
|
self._instance_started = False
|
|
|
|
# Record overlay path for persistence restoration
|
|
if self._persistent and self._overlay_dir:
|
|
snapshots = _load_snapshots()
|
|
snapshots[self._task_id] = str(self._overlay_dir)
|
|
_save_snapshots(snapshots)
|