fix(profiles): validate tar archive member paths on import
Fixes a zip-slip path traversal vulnerability in hermes profile import. shutil.unpack_archive() on untrusted tar members allows entries like ../../escape.txt to write files outside ~/.hermes/profiles/. - Add _normalize_profile_archive_parts() to reject absolute paths (POSIX and Windows), traversal (..), empty paths, backslash tricks - Add _safe_extract_profile_archive() for manual per-member extraction that only allows regular files and directories (rejects symlinks) - Replace shutil.unpack_archive() with the safe extraction path - Add regression tests for traversal and absolute-path attacks Co-authored-by: Gutslabs <gutslabsxyz@gmail.com>
This commit is contained in:
@@ -27,7 +27,7 @@ import stat
|
||||
import subprocess
|
||||
import sys
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from pathlib import Path, PurePosixPath, PureWindowsPath
|
||||
from typing import List, Optional
|
||||
|
||||
_PROFILE_ID_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{0,63}$")
|
||||
@@ -702,6 +702,58 @@ def export_profile(name: str, output_path: str) -> Path:
|
||||
return Path(result)
|
||||
|
||||
|
||||
def _normalize_profile_archive_parts(member_name: str) -> List[str]:
|
||||
"""Return safe path parts for a profile archive member."""
|
||||
normalized_name = member_name.replace("\\", "/")
|
||||
posix_path = PurePosixPath(normalized_name)
|
||||
windows_path = PureWindowsPath(member_name)
|
||||
|
||||
if (
|
||||
not normalized_name
|
||||
or posix_path.is_absolute()
|
||||
or windows_path.is_absolute()
|
||||
or windows_path.drive
|
||||
):
|
||||
raise ValueError(f"Unsafe archive member path: {member_name}")
|
||||
|
||||
parts = [part for part in posix_path.parts if part not in ("", ".")]
|
||||
if not parts or any(part == ".." for part in parts):
|
||||
raise ValueError(f"Unsafe archive member path: {member_name}")
|
||||
return parts
|
||||
|
||||
|
||||
def _safe_extract_profile_archive(archive: Path, destination: Path) -> None:
|
||||
"""Extract a profile archive without allowing path escapes or links."""
|
||||
import tarfile
|
||||
|
||||
with tarfile.open(archive, "r:gz") as tf:
|
||||
for member in tf.getmembers():
|
||||
parts = _normalize_profile_archive_parts(member.name)
|
||||
target = destination.joinpath(*parts)
|
||||
|
||||
if member.isdir():
|
||||
target.mkdir(parents=True, exist_ok=True)
|
||||
continue
|
||||
|
||||
if not member.isfile():
|
||||
raise ValueError(
|
||||
f"Unsupported archive member type: {member.name}"
|
||||
)
|
||||
|
||||
target.parent.mkdir(parents=True, exist_ok=True)
|
||||
extracted = tf.extractfile(member)
|
||||
if extracted is None:
|
||||
raise ValueError(f"Cannot read archive member: {member.name}")
|
||||
|
||||
with extracted, open(target, "wb") as dst:
|
||||
shutil.copyfileobj(extracted, dst)
|
||||
|
||||
try:
|
||||
os.chmod(target, member.mode & 0o777)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
def import_profile(archive_path: str, name: Optional[str] = None) -> Path:
|
||||
"""Import a profile from a tar.gz archive.
|
||||
|
||||
@@ -716,9 +768,18 @@ def import_profile(archive_path: str, name: Optional[str] = None) -> Path:
|
||||
|
||||
# Peek at the archive to find the top-level directory name
|
||||
with tarfile.open(archive, "r:gz") as tf:
|
||||
top_dirs = {m.name.split("/")[0] for m in tf.getmembers() if "/" in m.name}
|
||||
top_dirs = {
|
||||
parts[0]
|
||||
for member in tf.getmembers()
|
||||
for parts in [_normalize_profile_archive_parts(member.name)]
|
||||
if len(parts) > 1 or member.isdir()
|
||||
}
|
||||
if not top_dirs:
|
||||
top_dirs = {m.name for m in tf.getmembers() if m.isdir()}
|
||||
top_dirs = {
|
||||
_normalize_profile_archive_parts(member.name)[0]
|
||||
for member in tf.getmembers()
|
||||
if member.isdir()
|
||||
}
|
||||
|
||||
inferred_name = name or (top_dirs.pop() if len(top_dirs) == 1 else None)
|
||||
if not inferred_name:
|
||||
@@ -735,7 +796,7 @@ def import_profile(archive_path: str, name: Optional[str] = None) -> Path:
|
||||
profiles_root = _get_profiles_root()
|
||||
profiles_root.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
shutil.unpack_archive(str(archive), str(profiles_root))
|
||||
_safe_extract_profile_archive(archive, profiles_root)
|
||||
|
||||
# If the archive extracted under a different name, rename
|
||||
extracted = profiles_root / (top_dirs.pop() if top_dirs else inferred_name)
|
||||
|
||||
@@ -6,6 +6,7 @@ and shell completion generation.
|
||||
"""
|
||||
|
||||
import json
|
||||
import io
|
||||
import os
|
||||
import tarfile
|
||||
from pathlib import Path
|
||||
@@ -449,6 +450,40 @@ class TestExportImport:
|
||||
with pytest.raises(FileExistsError):
|
||||
import_profile(str(archive_path), name="coder")
|
||||
|
||||
def test_import_rejects_traversal_archive_member(self, profile_env, tmp_path):
|
||||
archive_path = tmp_path / "export" / "evil.tar.gz"
|
||||
archive_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
escape_path = tmp_path / "escape.txt"
|
||||
|
||||
with tarfile.open(archive_path, "w:gz") as tf:
|
||||
info = tarfile.TarInfo("../../escape.txt")
|
||||
data = b"pwned"
|
||||
info.size = len(data)
|
||||
tf.addfile(info, io.BytesIO(data))
|
||||
|
||||
with pytest.raises(ValueError, match="Unsafe archive member path"):
|
||||
import_profile(str(archive_path), name="coder")
|
||||
|
||||
assert not escape_path.exists()
|
||||
assert not get_profile_dir("coder").exists()
|
||||
|
||||
def test_import_rejects_absolute_archive_member(self, profile_env, tmp_path):
|
||||
archive_path = tmp_path / "export" / "evil-abs.tar.gz"
|
||||
archive_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
absolute_target = tmp_path / "abs-escape.txt"
|
||||
|
||||
with tarfile.open(archive_path, "w:gz") as tf:
|
||||
info = tarfile.TarInfo(str(absolute_target))
|
||||
data = b"pwned"
|
||||
info.size = len(data)
|
||||
tf.addfile(info, io.BytesIO(data))
|
||||
|
||||
with pytest.raises(ValueError, match="Unsafe archive member path"):
|
||||
import_profile(str(archive_path), name="coder")
|
||||
|
||||
assert not absolute_target.exists()
|
||||
assert not get_profile_dir("coder").exists()
|
||||
|
||||
def test_export_nonexistent_raises(self, profile_env, tmp_path):
|
||||
with pytest.raises(FileNotFoundError):
|
||||
export_profile("nonexistent", str(tmp_path / "out.tar.gz"))
|
||||
|
||||
Reference in New Issue
Block a user