fix(skills): validate hub bundle paths before install (#3986)

Co-authored-by: Gutslabs <gutslabsxyz@gmail.com>
This commit is contained in:
Teknium
2026-03-30 08:37:19 -07:00
committed by GitHub
parent e08778fa1e
commit 37825189dd
3 changed files with 178 additions and 18 deletions

View File

@@ -354,7 +354,14 @@ def do_install(identifier: str, category: str = "", force: bool = False,
extra_metadata.update(getattr(bundle, "metadata", {}) or {})
# Quarantine the bundle
q_path = quarantine_bundle(bundle)
try:
q_path = quarantine_bundle(bundle)
except ValueError as exc:
c.print(f"[bold red]Installation blocked:[/] {exc}\n")
from tools.skills_hub import append_audit_log
append_audit_log("BLOCKED", bundle.name, bundle.source,
bundle.trust_level, "invalid_path", str(exc))
return
c.print(f"[dim]Quarantined to {q_path.relative_to(q_path.parent.parent.parent)}[/]")
# Scan
@@ -414,7 +421,15 @@ def do_install(identifier: str, category: str = "", force: bool = False,
return
# Install
install_dir = install_from_quarantine(q_path, bundle.name, category, bundle, result)
try:
install_dir = install_from_quarantine(q_path, bundle.name, category, bundle, result)
except ValueError as exc:
c.print(f"[bold red]Installation blocked:[/] {exc}\n")
shutil.rmtree(q_path, ignore_errors=True)
from tools.skills_hub import append_audit_log
append_audit_log("BLOCKED", bundle.name, bundle.source,
bundle.trust_level, "invalid_path", str(exc))
return
from tools.skills_hub import SKILLS_DIR
c.print(f"[bold green]Installed:[/] {install_dir.relative_to(SKILLS_DIR)}")
c.print(f"[dim]Files: {', '.join(bundle.files.keys())}[/]\n")

View File

@@ -5,6 +5,7 @@ from pathlib import Path
from unittest.mock import patch, MagicMock
import httpx
import pytest
from tools.skills_hub import (
GitHubAuth,
@@ -648,6 +649,29 @@ class TestWellKnownSkillSource:
assert bundle.files["SKILL.md"] == "# Code Review\n"
assert bundle.files["references/checklist.md"] == "- [ ] security\n"
@patch("tools.skills_hub._write_index_cache")
@patch("tools.skills_hub._read_index_cache", return_value=None)
@patch("tools.skills_hub.httpx.get")
def test_fetch_rejects_unsafe_file_paths_from_well_known_endpoint(self, mock_get, _mock_read_cache, _mock_write_cache):
def fake_get(url, *args, **kwargs):
if url.endswith("/index.json"):
return MagicMock(status_code=200, json=lambda: {
"skills": [{
"name": "code-review",
"description": "Review code",
"files": ["SKILL.md", "../../../escape.txt"],
}]
})
if url.endswith("/code-review/SKILL.md"):
return MagicMock(status_code=200, text="# Code Review\n")
raise AssertionError(url)
mock_get.side_effect = fake_get
bundle = self._source().fetch("well-known:https://example.com/.well-known/skills/code-review")
assert bundle is None
class TestCheckForSkillUpdates:
def test_bundle_content_hash_matches_installed_content_hash(self, tmp_path):
@@ -1143,6 +1167,61 @@ class TestQuarantineBundleBinaryAssets:
assert (q_path / "SKILL.md").read_text(encoding="utf-8").startswith("---")
assert (q_path / "assets" / "neutts-cli" / "samples" / "jo.wav").read_bytes() == b"RIFF\x00\x01fakewav"
def test_quarantine_bundle_rejects_traversal_file_paths(self, tmp_path):
import tools.skills_hub as hub
hub_dir = tmp_path / "skills" / ".hub"
with patch.object(hub, "SKILLS_DIR", tmp_path / "skills"), \
patch.object(hub, "HUB_DIR", hub_dir), \
patch.object(hub, "LOCK_FILE", hub_dir / "lock.json"), \
patch.object(hub, "QUARANTINE_DIR", hub_dir / "quarantine"), \
patch.object(hub, "AUDIT_LOG", hub_dir / "audit.log"), \
patch.object(hub, "TAPS_FILE", hub_dir / "taps.json"), \
patch.object(hub, "INDEX_CACHE_DIR", hub_dir / "index-cache"):
bundle = SkillBundle(
name="demo",
files={
"SKILL.md": "---\nname: demo\n---\n",
"../../../escape.txt": "owned",
},
source="well-known",
identifier="well-known:https://example.com/.well-known/skills/demo",
trust_level="community",
)
with pytest.raises(ValueError, match="Unsafe bundle file path"):
quarantine_bundle(bundle)
assert not (tmp_path / "skills" / "escape.txt").exists()
def test_quarantine_bundle_rejects_absolute_file_paths(self, tmp_path):
import tools.skills_hub as hub
hub_dir = tmp_path / "skills" / ".hub"
absolute_target = tmp_path / "outside.txt"
with patch.object(hub, "SKILLS_DIR", tmp_path / "skills"), \
patch.object(hub, "HUB_DIR", hub_dir), \
patch.object(hub, "LOCK_FILE", hub_dir / "lock.json"), \
patch.object(hub, "QUARANTINE_DIR", hub_dir / "quarantine"), \
patch.object(hub, "AUDIT_LOG", hub_dir / "audit.log"), \
patch.object(hub, "TAPS_FILE", hub_dir / "taps.json"), \
patch.object(hub, "INDEX_CACHE_DIR", hub_dir / "index-cache"):
bundle = SkillBundle(
name="demo",
files={
"SKILL.md": "---\nname: demo\n---\n",
str(absolute_target): "owned",
},
source="well-known",
identifier="well-known:https://example.com/.well-known/skills/demo",
trust_level="community",
)
with pytest.raises(ValueError, match="Unsafe bundle file path"):
quarantine_bundle(bundle)
assert not absolute_target.exists()
# ---------------------------------------------------------------------------
# GitHubSource._download_directory — tree API + fallback (#2940)

View File

@@ -24,7 +24,7 @@ import time
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from pathlib import Path, PurePosixPath
from hermes_constants import get_hermes_home
from typing import Any, Dict, List, Optional, Tuple, Union
from urllib.parse import urlparse, urlunparse
@@ -85,6 +85,43 @@ class SkillBundle:
metadata: Dict[str, Any] = field(default_factory=dict)
def _normalize_bundle_path(path_value: str, *, field_name: str, allow_nested: bool) -> str:
"""Normalize and validate bundle-controlled paths before touching disk."""
if not isinstance(path_value, str):
raise ValueError(f"Unsafe {field_name}: expected a string")
raw = path_value.strip()
if not raw:
raise ValueError(f"Unsafe {field_name}: empty path")
normalized = raw.replace("\\", "/")
path = PurePosixPath(normalized)
parts = [part for part in path.parts if part not in ("", ".")]
if normalized.startswith("/") or path.is_absolute():
raise ValueError(f"Unsafe {field_name}: {path_value}")
if not parts or any(part == ".." for part in parts):
raise ValueError(f"Unsafe {field_name}: {path_value}")
if re.fullmatch(r"[A-Za-z]:", parts[0]):
raise ValueError(f"Unsafe {field_name}: {path_value}")
if not allow_nested and len(parts) != 1:
raise ValueError(f"Unsafe {field_name}: {path_value}")
return "/".join(parts)
def _validate_skill_name(name: str) -> str:
return _normalize_bundle_path(name, field_name="skill name", allow_nested=False)
def _validate_category_name(category: str) -> str:
return _normalize_bundle_path(category, field_name="category", allow_nested=False)
def _validate_bundle_rel_path(rel_path: str) -> str:
return _normalize_bundle_path(rel_path, field_name="bundle file path", allow_nested=True)
# ---------------------------------------------------------------------------
# GitHub Authentication
# ---------------------------------------------------------------------------
@@ -701,6 +738,12 @@ class WellKnownSkillSource(SkillSource):
if not parsed:
return None
try:
skill_name = _validate_skill_name(parsed["skill_name"])
except ValueError:
logger.warning("Well-known skill identifier contained unsafe skill name: %s", identifier)
return None
entry = self._index_entry(parsed["index_url"], parsed["skill_name"])
if not entry:
return None
@@ -713,19 +756,28 @@ class WellKnownSkillSource(SkillSource):
for rel_path in files:
if not isinstance(rel_path, str) or not rel_path:
continue
text = self._fetch_text(f"{parsed['skill_url']}/{rel_path}")
try:
safe_rel_path = _validate_bundle_rel_path(rel_path)
except ValueError:
logger.warning(
"Well-known skill %s advertised unsafe file path: %r",
identifier,
rel_path,
)
return None
text = self._fetch_text(f"{parsed['skill_url']}/{safe_rel_path}")
if text is None:
return None
downloaded[rel_path] = text
downloaded[safe_rel_path] = text
if "SKILL.md" not in downloaded:
return None
return SkillBundle(
name=parsed["skill_name"],
name=skill_name,
files=downloaded,
source="well-known",
identifier=self._wrap_identifier(parsed["base_url"], parsed["skill_name"]),
identifier=self._wrap_identifier(parsed["base_url"], skill_name),
trust_level="community",
metadata={
"index_url": parsed["index_url"],
@@ -1752,9 +1804,10 @@ class ClawHubSource(SkillSource):
for info in zf.infolist():
if info.is_dir():
continue
# Sanitize path — strip leading slashes and ..
name = info.filename.lstrip("/")
if ".." in name or name.startswith("/"):
try:
name = _validate_bundle_rel_path(info.filename)
except ValueError:
logger.debug("Skipping unsafe ZIP member path: %s", info.filename)
continue
# Only extract text-sized files (skip large binaries)
if info.file_size > 500_000:
@@ -2423,13 +2476,19 @@ def ensure_hub_dirs() -> None:
def quarantine_bundle(bundle: SkillBundle) -> Path:
"""Write a skill bundle to the quarantine directory for scanning."""
ensure_hub_dirs()
dest = QUARANTINE_DIR / bundle.name
skill_name = _validate_skill_name(bundle.name)
validated_files: List[Tuple[str, Union[str, bytes]]] = []
for rel_path, file_content in bundle.files.items():
safe_rel_path = _validate_bundle_rel_path(rel_path)
validated_files.append((safe_rel_path, file_content))
dest = QUARANTINE_DIR / skill_name
if dest.exists():
shutil.rmtree(dest)
dest.mkdir(parents=True)
for rel_path, file_content in bundle.files.items():
file_dest = dest / rel_path
for rel_path, file_content in validated_files:
file_dest = dest.joinpath(*rel_path.split("/"))
file_dest.parent.mkdir(parents=True, exist_ok=True)
if isinstance(file_content, bytes):
file_dest.write_bytes(file_content)
@@ -2447,10 +2506,17 @@ def install_from_quarantine(
scan_result: ScanResult,
) -> Path:
"""Move a scanned skill from quarantine into the skills directory."""
if category:
install_dir = SKILLS_DIR / category / skill_name
safe_skill_name = _validate_skill_name(skill_name)
safe_category = _validate_category_name(category) if category else ""
quarantine_resolved = quarantine_path.resolve()
quarantine_root = QUARANTINE_DIR.resolve()
if not quarantine_resolved.is_relative_to(quarantine_root):
raise ValueError(f"Unsafe quarantine path: {quarantine_path}")
if safe_category:
install_dir = SKILLS_DIR / safe_category / safe_skill_name
else:
install_dir = SKILLS_DIR / skill_name
install_dir = SKILLS_DIR / safe_skill_name
if install_dir.exists():
shutil.rmtree(install_dir)
@@ -2461,7 +2527,7 @@ def install_from_quarantine(
# Record in lock file
lock = HubLockFile()
lock.record_install(
name=skill_name,
name=safe_skill_name,
source=bundle.source,
identifier=bundle.identifier,
trust_level=bundle.trust_level,
@@ -2473,7 +2539,7 @@ def install_from_quarantine(
)
append_audit_log(
"INSTALL", skill_name, bundle.source,
"INSTALL", safe_skill_name, bundle.source,
bundle.trust_level, scan_result.verdict,
content_hash(install_dir),
)