Files
hermes-agent/tools/credential_files.py

417 lines
15 KiB
Python

"""File passthrough registry for remote terminal backends.
Remote backends (Docker, Modal, SSH) create sandboxes with no host files.
This module ensures that credential files, skill directories, and host-side
cache directories (documents, images, audio, screenshots) are mounted or
synced into those sandboxes so the agent can access them.
**Credentials and skills** — session-scoped registry fed by skill declarations
(``required_credential_files``) and user config (``terminal.credential_files``).
**Cache directories** — gateway-cached uploads, browser screenshots, TTS
audio, and processed images. Mounted read-only so the remote terminal can
reference files the host side created (e.g. ``unzip`` an uploaded archive).
Remote backends call :func:`get_credential_file_mounts`,
:func:`get_skills_directory_mount` / :func:`iter_skills_files`, and
:func:`get_cache_directory_mounts` / :func:`iter_cache_files` at sandbox
creation time and before each command (for resync on Modal).
"""
from __future__ import annotations
import logging
import os
from contextvars import ContextVar
from pathlib import Path
from typing import Dict, List
logger = logging.getLogger(__name__)
# Session-scoped list of credential files to mount.
# Backed by ContextVar to prevent cross-session data bleed in the gateway pipeline.
_registered_files_var: ContextVar[Dict[str, str]] = ContextVar("_registered_files")
def _get_registered() -> Dict[str, str]:
"""Get or create the registered credential files dict for the current context/session."""
try:
return _registered_files_var.get()
except LookupError:
val: Dict[str, str] = {}
_registered_files_var.set(val)
return val
# Cache for config-based file list (loaded once per process).
_config_files: List[Dict[str, str]] | None = None
def _resolve_hermes_home() -> Path:
return Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes"))
def register_credential_file(
relative_path: str,
container_base: str = "/root/.hermes",
) -> bool:
"""Register a credential file for mounting into remote sandboxes.
*relative_path* is relative to ``HERMES_HOME`` (e.g. ``google_token.json``).
Returns True if the file exists on the host and was registered.
Security: rejects absolute paths and path traversal sequences (``..``).
The resolved host path must remain inside HERMES_HOME so that a malicious
skill cannot declare ``required_credential_files: ['../../.ssh/id_rsa']``
and exfiltrate sensitive host files into a container sandbox.
"""
hermes_home = _resolve_hermes_home()
# Reject absolute paths — they bypass the HERMES_HOME sandbox entirely.
if os.path.isabs(relative_path):
logger.warning(
"credential_files: rejected absolute path %r (must be relative to HERMES_HOME)",
relative_path,
)
return False
host_path = hermes_home / relative_path
# Resolve symlinks and normalise ``..`` before the containment check so
# that traversal like ``../. ssh/id_rsa`` cannot escape HERMES_HOME.
try:
resolved = host_path.resolve()
hermes_home_resolved = hermes_home.resolve()
resolved.relative_to(hermes_home_resolved) # raises ValueError if outside
except ValueError:
logger.warning(
"credential_files: rejected path traversal %r "
"(resolves to %s, outside HERMES_HOME %s)",
relative_path,
resolved,
hermes_home_resolved,
)
return False
if not resolved.is_file():
logger.debug("credential_files: skipping %s (not found)", resolved)
return False
container_path = f"{container_base.rstrip('/')}/{relative_path}"
_get_registered()[container_path] = str(resolved)
logger.debug("credential_files: registered %s -> %s", resolved, container_path)
return True
def register_credential_files(
entries: list,
container_base: str = "/root/.hermes",
) -> List[str]:
"""Register multiple credential files from skill frontmatter entries.
Each entry is either a string (relative path) or a dict with a ``path``
key. Returns the list of relative paths that were NOT found on the host
(i.e. missing files).
"""
missing = []
for entry in entries:
if isinstance(entry, str):
rel_path = entry.strip()
elif isinstance(entry, dict):
rel_path = (entry.get("path") or entry.get("name") or "").strip()
else:
continue
if not rel_path:
continue
if not register_credential_file(rel_path, container_base):
missing.append(rel_path)
return missing
def _load_config_files() -> List[Dict[str, str]]:
"""Load ``terminal.credential_files`` from config.yaml (cached)."""
global _config_files
if _config_files is not None:
return _config_files
result: List[Dict[str, str]] = []
try:
hermes_home = _resolve_hermes_home()
config_path = hermes_home / "config.yaml"
if config_path.exists():
import yaml
with open(config_path) as f:
cfg = yaml.safe_load(f) or {}
cred_files = cfg.get("terminal", {}).get("credential_files")
if isinstance(cred_files, list):
hermes_home_resolved = hermes_home.resolve()
for item in cred_files:
if isinstance(item, str) and item.strip():
rel = item.strip()
if os.path.isabs(rel):
logger.warning(
"credential_files: rejected absolute config path %r", rel,
)
continue
host_path = (hermes_home / rel).resolve()
try:
host_path.relative_to(hermes_home_resolved)
except ValueError:
logger.warning(
"credential_files: rejected config path traversal %r "
"(resolves to %s, outside HERMES_HOME %s)",
rel, host_path, hermes_home_resolved,
)
continue
if host_path.is_file():
container_path = f"/root/.hermes/{rel}"
result.append({
"host_path": str(host_path),
"container_path": container_path,
})
except Exception as e:
logger.debug("Could not read terminal.credential_files from config: %s", e)
_config_files = result
return _config_files
def get_credential_file_mounts() -> List[Dict[str, str]]:
"""Return all credential files that should be mounted into remote sandboxes.
Each item has ``host_path`` and ``container_path`` keys.
Combines skill-registered files and user config.
"""
mounts: Dict[str, str] = {}
# Skill-registered files
for container_path, host_path in _get_registered().items():
# Re-check existence (file may have been deleted since registration)
if Path(host_path).is_file():
mounts[container_path] = host_path
# Config-based files
for entry in _load_config_files():
cp = entry["container_path"]
if cp not in mounts and Path(entry["host_path"]).is_file():
mounts[cp] = entry["host_path"]
return [
{"host_path": hp, "container_path": cp}
for cp, hp in mounts.items()
]
def get_skills_directory_mount(
container_base: str = "/root/.hermes",
) -> list[Dict[str, str]]:
"""Return mount info for all skill directories (local + external).
Skills may include ``scripts/``, ``templates/``, and ``references/``
subdirectories that the agent needs to execute inside remote sandboxes.
**Security:** Bind mounts follow symlinks, so a malicious symlink inside
the skills tree could expose arbitrary host files to the container. When
symlinks are detected, this function creates a sanitized copy (regular
files only) in a temp directory and returns that path instead. When no
symlinks are present (the common case), the original directory is returned
directly with zero overhead.
Returns a list of dicts with ``host_path`` and ``container_path`` keys.
The local skills dir mounts at ``<container_base>/skills``, external dirs
at ``<container_base>/external_skills/<index>``.
"""
mounts = []
hermes_home = _resolve_hermes_home()
skills_dir = hermes_home / "skills"
if skills_dir.is_dir():
host_path = _safe_skills_path(skills_dir)
mounts.append({
"host_path": host_path,
"container_path": f"{container_base.rstrip('/')}/skills",
})
# Mount external skill dirs
try:
from agent.skill_utils import get_external_skills_dirs
for idx, ext_dir in enumerate(get_external_skills_dirs()):
if ext_dir.is_dir():
host_path = _safe_skills_path(ext_dir)
mounts.append({
"host_path": host_path,
"container_path": f"{container_base.rstrip('/')}/external_skills/{idx}",
})
except ImportError:
pass
return mounts
_safe_skills_tempdir: Path | None = None
def _safe_skills_path(skills_dir: Path) -> str:
"""Return *skills_dir* if symlink-free, else a sanitized temp copy."""
global _safe_skills_tempdir
symlinks = [p for p in skills_dir.rglob("*") if p.is_symlink()]
if not symlinks:
return str(skills_dir)
for link in symlinks:
logger.warning("credential_files: skipping symlink in skills dir: %s -> %s",
link, os.readlink(link))
import atexit
import shutil
import tempfile
# Reuse the same temp dir across calls to avoid accumulation.
if _safe_skills_tempdir and _safe_skills_tempdir.is_dir():
shutil.rmtree(_safe_skills_tempdir, ignore_errors=True)
safe_dir = Path(tempfile.mkdtemp(prefix="hermes-skills-safe-"))
_safe_skills_tempdir = safe_dir
for item in skills_dir.rglob("*"):
if item.is_symlink():
continue
rel = item.relative_to(skills_dir)
target = safe_dir / rel
if item.is_dir():
target.mkdir(parents=True, exist_ok=True)
elif item.is_file():
target.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(str(item), str(target))
def _cleanup():
if safe_dir.is_dir():
shutil.rmtree(safe_dir, ignore_errors=True)
atexit.register(_cleanup)
logger.info("credential_files: created symlink-safe skills copy at %s", safe_dir)
return str(safe_dir)
def iter_skills_files(
container_base: str = "/root/.hermes",
) -> List[Dict[str, str]]:
"""Yield individual (host_path, container_path) entries for skills files.
Includes both the local skills dir and any external dirs configured via
skills.external_dirs. Skips symlinks entirely. Preferred for backends
that upload files individually (Daytona, Modal) rather than mounting a
directory.
"""
result: List[Dict[str, str]] = []
hermes_home = _resolve_hermes_home()
skills_dir = hermes_home / "skills"
if skills_dir.is_dir():
container_root = f"{container_base.rstrip('/')}/skills"
for item in skills_dir.rglob("*"):
if item.is_symlink() or not item.is_file():
continue
rel = item.relative_to(skills_dir)
result.append({
"host_path": str(item),
"container_path": f"{container_root}/{rel}",
})
# Include external skill dirs
try:
from agent.skill_utils import get_external_skills_dirs
for idx, ext_dir in enumerate(get_external_skills_dirs()):
if not ext_dir.is_dir():
continue
container_root = f"{container_base.rstrip('/')}/external_skills/{idx}"
for item in ext_dir.rglob("*"):
if item.is_symlink() or not item.is_file():
continue
rel = item.relative_to(ext_dir)
result.append({
"host_path": str(item),
"container_path": f"{container_root}/{rel}",
})
except ImportError:
pass
return result
# ---------------------------------------------------------------------------
# Cache directory mounts (documents, images, audio, screenshots)
# ---------------------------------------------------------------------------
# The four cache subdirectories that should be mirrored into remote backends.
# Each tuple is (new_subpath, old_name) matching hermes_constants.get_hermes_dir().
_CACHE_DIRS: list[tuple[str, str]] = [
("cache/documents", "document_cache"),
("cache/images", "image_cache"),
("cache/audio", "audio_cache"),
("cache/screenshots", "browser_screenshots"),
]
def get_cache_directory_mounts(
container_base: str = "/root/.hermes",
) -> List[Dict[str, str]]:
"""Return mount entries for each cache directory that exists on disk.
Used by Docker to create bind mounts. Each entry has ``host_path`` and
``container_path`` keys. The host path is resolved via
``get_hermes_dir()`` for backward compatibility with old directory layouts.
"""
from hermes_constants import get_hermes_dir
mounts: List[Dict[str, str]] = []
for new_subpath, old_name in _CACHE_DIRS:
host_dir = get_hermes_dir(new_subpath, old_name)
if host_dir.is_dir():
# Always map to the *new* container layout regardless of host layout.
container_path = f"{container_base.rstrip('/')}/{new_subpath}"
mounts.append({
"host_path": str(host_dir),
"container_path": container_path,
})
return mounts
def iter_cache_files(
container_base: str = "/root/.hermes",
) -> List[Dict[str, str]]:
"""Return individual (host_path, container_path) entries for cache files.
Used by Modal to upload files individually and resync before each command.
Skips symlinks. The container paths use the new ``cache/<subdir>`` layout.
"""
from hermes_constants import get_hermes_dir
result: List[Dict[str, str]] = []
for new_subpath, old_name in _CACHE_DIRS:
host_dir = get_hermes_dir(new_subpath, old_name)
if not host_dir.is_dir():
continue
container_root = f"{container_base.rstrip('/')}/{new_subpath}"
for item in host_dir.rglob("*"):
if item.is_symlink() or not item.is_file():
continue
rel = item.relative_to(host_dir)
result.append({
"host_path": str(item),
"container_path": f"{container_root}/{rel}",
})
return result
def clear_credential_files() -> None:
"""Reset the skill-scoped registry (e.g. on session reset)."""
_get_registered().clear()
def reset_config_cache() -> None:
"""Force re-read of config on next access (for testing)."""
global _config_files
_config_files = None