diff --git a/hermes_cli/profiles.py b/hermes_cli/profiles.py index 30da7eb1a..5809186f5 100644 --- a/hermes_cli/profiles.py +++ b/hermes_cli/profiles.py @@ -27,7 +27,7 @@ import stat import subprocess import sys from dataclasses import dataclass, field -from pathlib import Path +from pathlib import Path, PurePosixPath, PureWindowsPath from typing import List, Optional _PROFILE_ID_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{0,63}$") @@ -702,6 +702,58 @@ def export_profile(name: str, output_path: str) -> Path: return Path(result) +def _normalize_profile_archive_parts(member_name: str) -> List[str]: + """Return safe path parts for a profile archive member.""" + normalized_name = member_name.replace("\\", "/") + posix_path = PurePosixPath(normalized_name) + windows_path = PureWindowsPath(member_name) + + if ( + not normalized_name + or posix_path.is_absolute() + or windows_path.is_absolute() + or windows_path.drive + ): + raise ValueError(f"Unsafe archive member path: {member_name}") + + parts = [part for part in posix_path.parts if part not in ("", ".")] + if not parts or any(part == ".." for part in parts): + raise ValueError(f"Unsafe archive member path: {member_name}") + return parts + + +def _safe_extract_profile_archive(archive: Path, destination: Path) -> None: + """Extract a profile archive without allowing path escapes or links.""" + import tarfile + + with tarfile.open(archive, "r:gz") as tf: + for member in tf.getmembers(): + parts = _normalize_profile_archive_parts(member.name) + target = destination.joinpath(*parts) + + if member.isdir(): + target.mkdir(parents=True, exist_ok=True) + continue + + if not member.isfile(): + raise ValueError( + f"Unsupported archive member type: {member.name}" + ) + + target.parent.mkdir(parents=True, exist_ok=True) + extracted = tf.extractfile(member) + if extracted is None: + raise ValueError(f"Cannot read archive member: {member.name}") + + with extracted, open(target, "wb") as dst: + shutil.copyfileobj(extracted, dst) + + try: + os.chmod(target, member.mode & 0o777) + except OSError: + pass + + def import_profile(archive_path: str, name: Optional[str] = None) -> Path: """Import a profile from a tar.gz archive. @@ -716,9 +768,18 @@ def import_profile(archive_path: str, name: Optional[str] = None) -> Path: # Peek at the archive to find the top-level directory name with tarfile.open(archive, "r:gz") as tf: - top_dirs = {m.name.split("/")[0] for m in tf.getmembers() if "/" in m.name} + top_dirs = { + parts[0] + for member in tf.getmembers() + for parts in [_normalize_profile_archive_parts(member.name)] + if len(parts) > 1 or member.isdir() + } if not top_dirs: - top_dirs = {m.name for m in tf.getmembers() if m.isdir()} + top_dirs = { + _normalize_profile_archive_parts(member.name)[0] + for member in tf.getmembers() + if member.isdir() + } inferred_name = name or (top_dirs.pop() if len(top_dirs) == 1 else None) if not inferred_name: @@ -735,7 +796,7 @@ def import_profile(archive_path: str, name: Optional[str] = None) -> Path: profiles_root = _get_profiles_root() profiles_root.mkdir(parents=True, exist_ok=True) - shutil.unpack_archive(str(archive), str(profiles_root)) + _safe_extract_profile_archive(archive, profiles_root) # If the archive extracted under a different name, rename extracted = profiles_root / (top_dirs.pop() if top_dirs else inferred_name) diff --git a/tests/hermes_cli/test_profiles.py b/tests/hermes_cli/test_profiles.py index 80152a4a0..4e59d250e 100644 --- a/tests/hermes_cli/test_profiles.py +++ b/tests/hermes_cli/test_profiles.py @@ -6,6 +6,7 @@ and shell completion generation. """ import json +import io import os import tarfile from pathlib import Path @@ -449,6 +450,40 @@ class TestExportImport: with pytest.raises(FileExistsError): import_profile(str(archive_path), name="coder") + def test_import_rejects_traversal_archive_member(self, profile_env, tmp_path): + archive_path = tmp_path / "export" / "evil.tar.gz" + archive_path.parent.mkdir(parents=True, exist_ok=True) + escape_path = tmp_path / "escape.txt" + + with tarfile.open(archive_path, "w:gz") as tf: + info = tarfile.TarInfo("../../escape.txt") + data = b"pwned" + info.size = len(data) + tf.addfile(info, io.BytesIO(data)) + + with pytest.raises(ValueError, match="Unsafe archive member path"): + import_profile(str(archive_path), name="coder") + + assert not escape_path.exists() + assert not get_profile_dir("coder").exists() + + def test_import_rejects_absolute_archive_member(self, profile_env, tmp_path): + archive_path = tmp_path / "export" / "evil-abs.tar.gz" + archive_path.parent.mkdir(parents=True, exist_ok=True) + absolute_target = tmp_path / "abs-escape.txt" + + with tarfile.open(archive_path, "w:gz") as tf: + info = tarfile.TarInfo(str(absolute_target)) + data = b"pwned" + info.size = len(data) + tf.addfile(info, io.BytesIO(data)) + + with pytest.raises(ValueError, match="Unsafe archive member path"): + import_profile(str(archive_path), name="coder") + + assert not absolute_target.exists() + assert not get_profile_dir("coder").exists() + def test_export_nonexistent_raises(self, profile_env, tmp_path): with pytest.raises(FileNotFoundError): export_profile("nonexistent", str(tmp_path / "out.tar.gz"))