fix: harden salvaged worktree include checks
Use Path.relative_to-based containment checks for the salvaged .worktreeinclude guard, remove the replayed test logic from the cherry-picked PR, and add real integration regressions for file, directory, and symlink escapes.
This commit is contained in:
20
cli.py
20
cli.py
@@ -518,6 +518,15 @@ def _git_repo_root() -> Optional[str]:
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _path_is_within_root(path: Path, root: Path) -> bool:
|
||||||
|
"""Return True when a resolved path stays within the expected root."""
|
||||||
|
try:
|
||||||
|
path.relative_to(root)
|
||||||
|
return True
|
||||||
|
except ValueError:
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
def _setup_worktree(repo_root: str = None) -> Optional[Dict[str, str]]:
|
def _setup_worktree(repo_root: str = None) -> Optional[Dict[str, str]]:
|
||||||
"""Create an isolated git worktree for this CLI session.
|
"""Create an isolated git worktree for this CLI session.
|
||||||
|
|
||||||
@@ -579,18 +588,19 @@ def _setup_worktree(repo_root: str = None) -> Optional[Dict[str, str]]:
|
|||||||
continue
|
continue
|
||||||
src = Path(repo_root) / entry
|
src = Path(repo_root) / entry
|
||||||
dst = wt_path / entry
|
dst = wt_path / entry
|
||||||
# Prevent path traversal: ensure src stays within repo_root
|
# Prevent path traversal and symlink escapes: both the resolved
|
||||||
# and dst stays within the worktree directory
|
# source and the resolved destination must stay inside their
|
||||||
|
# expected roots before any file or symlink operation happens.
|
||||||
try:
|
try:
|
||||||
src_resolved = src.resolve()
|
src_resolved = src.resolve(strict=False)
|
||||||
dst_resolved = dst.resolve(strict=False)
|
dst_resolved = dst.resolve(strict=False)
|
||||||
except (OSError, ValueError):
|
except (OSError, ValueError):
|
||||||
logger.debug("Skipping invalid .worktreeinclude entry: %s", entry)
|
logger.debug("Skipping invalid .worktreeinclude entry: %s", entry)
|
||||||
continue
|
continue
|
||||||
if not str(src_resolved).startswith(str(repo_root_resolved) + os.sep) and src_resolved != repo_root_resolved:
|
if not _path_is_within_root(src_resolved, repo_root_resolved):
|
||||||
logger.warning("Skipping .worktreeinclude entry outside repo root: %s", entry)
|
logger.warning("Skipping .worktreeinclude entry outside repo root: %s", entry)
|
||||||
continue
|
continue
|
||||||
if not str(dst_resolved).startswith(str(wt_path_resolved) + os.sep) and dst_resolved != wt_path_resolved:
|
if not _path_is_within_root(dst_resolved, wt_path_resolved):
|
||||||
logger.warning("Skipping .worktreeinclude entry that escapes worktree: %s", entry)
|
logger.warning("Skipping .worktreeinclude entry that escapes worktree: %s", entry)
|
||||||
continue
|
continue
|
||||||
if src.is_file():
|
if src.is_file():
|
||||||
|
|||||||
@@ -633,75 +633,3 @@ class TestSystemPromptInjection:
|
|||||||
assert info["repo_root"] in wt_note
|
assert info["repo_root"] in wt_note
|
||||||
assert "isolated git worktree" in wt_note
|
assert "isolated git worktree" in wt_note
|
||||||
assert "commit and push" in wt_note
|
assert "commit and push" in wt_note
|
||||||
|
|
||||||
|
|
||||||
class TestWorktreeIncludePathTraversal:
|
|
||||||
"""Test that .worktreeinclude entries with path traversal are rejected."""
|
|
||||||
|
|
||||||
def test_rejects_parent_directory_traversal(self, git_repo):
|
|
||||||
"""Entries like '../../etc/passwd' must not escape the repo root."""
|
|
||||||
import shutil as _shutil
|
|
||||||
|
|
||||||
# Create a sensitive file outside the repo to simulate the attack
|
|
||||||
outside_file = git_repo.parent / "sensitive.txt"
|
|
||||||
outside_file.write_text("SENSITIVE DATA")
|
|
||||||
|
|
||||||
# Create a .worktreeinclude with a traversal entry
|
|
||||||
(git_repo / ".worktreeinclude").write_text("../sensitive.txt\n")
|
|
||||||
|
|
||||||
info = _setup_worktree(str(git_repo))
|
|
||||||
assert info is not None
|
|
||||||
|
|
||||||
wt_path = Path(info["path"])
|
|
||||||
|
|
||||||
# Replay the fixed logic from cli.py
|
|
||||||
repo_root_resolved = Path(str(git_repo)).resolve()
|
|
||||||
wt_path_resolved = wt_path.resolve()
|
|
||||||
include_file = git_repo / ".worktreeinclude"
|
|
||||||
|
|
||||||
copied_entries = []
|
|
||||||
for line in include_file.read_text().splitlines():
|
|
||||||
entry = line.strip()
|
|
||||||
if not entry or entry.startswith("#"):
|
|
||||||
continue
|
|
||||||
src = Path(str(git_repo)) / entry
|
|
||||||
dst = wt_path / entry
|
|
||||||
try:
|
|
||||||
src_resolved = src.resolve()
|
|
||||||
dst_resolved = dst.resolve(strict=False)
|
|
||||||
except (OSError, ValueError):
|
|
||||||
continue
|
|
||||||
if not str(src_resolved).startswith(str(repo_root_resolved) + os.sep) and src_resolved != repo_root_resolved:
|
|
||||||
continue
|
|
||||||
if not str(dst_resolved).startswith(str(wt_path_resolved) + os.sep) and dst_resolved != wt_path_resolved:
|
|
||||||
continue
|
|
||||||
copied_entries.append(entry)
|
|
||||||
|
|
||||||
# The traversal entry must have been skipped
|
|
||||||
assert len(copied_entries) == 0
|
|
||||||
# The sensitive file must NOT be in the worktree
|
|
||||||
assert not (wt_path / "../sensitive.txt").resolve().is_relative_to(wt_path_resolved)
|
|
||||||
|
|
||||||
def test_allows_valid_entries(self, git_repo):
|
|
||||||
"""Normal entries within the repo should still be processed."""
|
|
||||||
(git_repo / ".env").write_text("KEY=val")
|
|
||||||
(git_repo / ".worktreeinclude").write_text(".env\n")
|
|
||||||
|
|
||||||
info = _setup_worktree(str(git_repo))
|
|
||||||
assert info is not None
|
|
||||||
|
|
||||||
repo_root_resolved = Path(str(git_repo)).resolve()
|
|
||||||
include_file = git_repo / ".worktreeinclude"
|
|
||||||
|
|
||||||
accepted = []
|
|
||||||
for line in include_file.read_text().splitlines():
|
|
||||||
entry = line.strip()
|
|
||||||
if not entry or entry.startswith("#"):
|
|
||||||
continue
|
|
||||||
src = Path(str(git_repo)) / entry
|
|
||||||
src_resolved = src.resolve()
|
|
||||||
if not str(src_resolved).startswith(str(repo_root_resolved) + os.sep) and src_resolved != repo_root_resolved:
|
|
||||||
continue
|
|
||||||
accepted.append(entry)
|
|
||||||
|
|
||||||
assert ".env" in accepted
|
|
||||||
|
|||||||
130
tests/test_worktree_security.py
Normal file
130
tests/test_worktree_security.py
Normal file
@@ -0,0 +1,130 @@
|
|||||||
|
"""Security-focused integration tests for CLI worktree setup."""
|
||||||
|
|
||||||
|
import subprocess
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def git_repo(tmp_path):
|
||||||
|
"""Create a temporary git repo for testing real cli._setup_worktree behavior."""
|
||||||
|
repo = tmp_path / "test-repo"
|
||||||
|
repo.mkdir()
|
||||||
|
subprocess.run(["git", "init"], cwd=repo, check=True, capture_output=True)
|
||||||
|
subprocess.run(["git", "config", "user.email", "test@test.com"], cwd=repo, check=True, capture_output=True)
|
||||||
|
subprocess.run(["git", "config", "user.name", "Test"], cwd=repo, check=True, capture_output=True)
|
||||||
|
(repo / "README.md").write_text("# Test Repo\n")
|
||||||
|
subprocess.run(["git", "add", "."], cwd=repo, check=True, capture_output=True)
|
||||||
|
subprocess.run(["git", "commit", "-m", "Initial commit"], cwd=repo, check=True, capture_output=True)
|
||||||
|
return repo
|
||||||
|
|
||||||
|
|
||||||
|
def _force_remove_worktree(info: dict | None) -> None:
|
||||||
|
if not info:
|
||||||
|
return
|
||||||
|
subprocess.run(
|
||||||
|
["git", "worktree", "remove", info["path"], "--force"],
|
||||||
|
cwd=info["repo_root"],
|
||||||
|
capture_output=True,
|
||||||
|
check=False,
|
||||||
|
)
|
||||||
|
subprocess.run(
|
||||||
|
["git", "branch", "-D", info["branch"]],
|
||||||
|
cwd=info["repo_root"],
|
||||||
|
capture_output=True,
|
||||||
|
check=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TestWorktreeIncludeSecurity:
|
||||||
|
def test_rejects_parent_directory_file_traversal(self, git_repo):
|
||||||
|
import cli as cli_mod
|
||||||
|
|
||||||
|
outside_file = git_repo.parent / "sensitive.txt"
|
||||||
|
outside_file.write_text("SENSITIVE DATA")
|
||||||
|
(git_repo / ".worktreeinclude").write_text("../sensitive.txt\n")
|
||||||
|
|
||||||
|
info = None
|
||||||
|
try:
|
||||||
|
info = cli_mod._setup_worktree(str(git_repo))
|
||||||
|
assert info is not None
|
||||||
|
|
||||||
|
wt_path = Path(info["path"])
|
||||||
|
assert not (wt_path.parent / "sensitive.txt").exists()
|
||||||
|
assert not (wt_path / "../sensitive.txt").resolve().exists()
|
||||||
|
finally:
|
||||||
|
_force_remove_worktree(info)
|
||||||
|
|
||||||
|
def test_rejects_parent_directory_directory_traversal(self, git_repo):
|
||||||
|
import cli as cli_mod
|
||||||
|
|
||||||
|
outside_dir = git_repo.parent / "outside-dir"
|
||||||
|
outside_dir.mkdir()
|
||||||
|
(outside_dir / "secret.txt").write_text("SENSITIVE DIR DATA")
|
||||||
|
(git_repo / ".worktreeinclude").write_text("../outside-dir\n")
|
||||||
|
|
||||||
|
info = None
|
||||||
|
try:
|
||||||
|
info = cli_mod._setup_worktree(str(git_repo))
|
||||||
|
assert info is not None
|
||||||
|
|
||||||
|
wt_path = Path(info["path"])
|
||||||
|
escaped_dir = wt_path.parent / "outside-dir"
|
||||||
|
assert not escaped_dir.exists()
|
||||||
|
assert not escaped_dir.is_symlink()
|
||||||
|
finally:
|
||||||
|
_force_remove_worktree(info)
|
||||||
|
|
||||||
|
def test_rejects_symlink_that_resolves_outside_repo(self, git_repo):
|
||||||
|
import cli as cli_mod
|
||||||
|
|
||||||
|
outside_file = git_repo.parent / "linked-secret.txt"
|
||||||
|
outside_file.write_text("LINKED SECRET")
|
||||||
|
(git_repo / "leak.txt").symlink_to(outside_file)
|
||||||
|
(git_repo / ".worktreeinclude").write_text("leak.txt\n")
|
||||||
|
|
||||||
|
info = None
|
||||||
|
try:
|
||||||
|
info = cli_mod._setup_worktree(str(git_repo))
|
||||||
|
assert info is not None
|
||||||
|
|
||||||
|
assert not (Path(info["path"]) / "leak.txt").exists()
|
||||||
|
finally:
|
||||||
|
_force_remove_worktree(info)
|
||||||
|
|
||||||
|
def test_allows_valid_file_include(self, git_repo):
|
||||||
|
import cli as cli_mod
|
||||||
|
|
||||||
|
(git_repo / ".env").write_text("SECRET=***\n")
|
||||||
|
(git_repo / ".worktreeinclude").write_text(".env\n")
|
||||||
|
|
||||||
|
info = None
|
||||||
|
try:
|
||||||
|
info = cli_mod._setup_worktree(str(git_repo))
|
||||||
|
assert info is not None
|
||||||
|
|
||||||
|
copied = Path(info["path"]) / ".env"
|
||||||
|
assert copied.exists()
|
||||||
|
assert copied.read_text() == "SECRET=***\n"
|
||||||
|
finally:
|
||||||
|
_force_remove_worktree(info)
|
||||||
|
|
||||||
|
def test_allows_valid_directory_include(self, git_repo):
|
||||||
|
import cli as cli_mod
|
||||||
|
|
||||||
|
assets_dir = git_repo / ".venv" / "lib"
|
||||||
|
assets_dir.mkdir(parents=True)
|
||||||
|
(assets_dir / "marker.txt").write_text("venv marker")
|
||||||
|
(git_repo / ".worktreeinclude").write_text(".venv\n")
|
||||||
|
|
||||||
|
info = None
|
||||||
|
try:
|
||||||
|
info = cli_mod._setup_worktree(str(git_repo))
|
||||||
|
assert info is not None
|
||||||
|
|
||||||
|
linked_dir = Path(info["path"]) / ".venv"
|
||||||
|
assert linked_dir.is_symlink()
|
||||||
|
assert (linked_dir / "lib" / "marker.txt").read_text() == "venv marker"
|
||||||
|
finally:
|
||||||
|
_force_remove_worktree(info)
|
||||||
Reference in New Issue
Block a user