From 37825189dddcff5686ff5f3dab4025c7313e72a0 Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Mon, 30 Mar 2026 08:37:19 -0700 Subject: [PATCH] fix(skills): validate hub bundle paths before install (#3986) Co-authored-by: Gutslabs --- hermes_cli/skills_hub.py | 19 ++++++- tests/tools/test_skills_hub.py | 79 +++++++++++++++++++++++++++ tools/skills_hub.py | 98 ++++++++++++++++++++++++++++------ 3 files changed, 178 insertions(+), 18 deletions(-) diff --git a/hermes_cli/skills_hub.py b/hermes_cli/skills_hub.py index 359e8b912..370b69ab0 100644 --- a/hermes_cli/skills_hub.py +++ b/hermes_cli/skills_hub.py @@ -354,7 +354,14 @@ def do_install(identifier: str, category: str = "", force: bool = False, extra_metadata.update(getattr(bundle, "metadata", {}) or {}) # Quarantine the bundle - q_path = quarantine_bundle(bundle) + try: + q_path = quarantine_bundle(bundle) + except ValueError as exc: + c.print(f"[bold red]Installation blocked:[/] {exc}\n") + from tools.skills_hub import append_audit_log + append_audit_log("BLOCKED", bundle.name, bundle.source, + bundle.trust_level, "invalid_path", str(exc)) + return c.print(f"[dim]Quarantined to {q_path.relative_to(q_path.parent.parent.parent)}[/]") # Scan @@ -414,7 +421,15 @@ def do_install(identifier: str, category: str = "", force: bool = False, return # Install - install_dir = install_from_quarantine(q_path, bundle.name, category, bundle, result) + try: + install_dir = install_from_quarantine(q_path, bundle.name, category, bundle, result) + except ValueError as exc: + c.print(f"[bold red]Installation blocked:[/] {exc}\n") + shutil.rmtree(q_path, ignore_errors=True) + from tools.skills_hub import append_audit_log + append_audit_log("BLOCKED", bundle.name, bundle.source, + bundle.trust_level, "invalid_path", str(exc)) + return from tools.skills_hub import SKILLS_DIR c.print(f"[bold green]Installed:[/] {install_dir.relative_to(SKILLS_DIR)}") c.print(f"[dim]Files: {', '.join(bundle.files.keys())}[/]\n") diff --git a/tests/tools/test_skills_hub.py b/tests/tools/test_skills_hub.py index a55a91e00..58e035469 100644 --- a/tests/tools/test_skills_hub.py +++ b/tests/tools/test_skills_hub.py @@ -5,6 +5,7 @@ from pathlib import Path from unittest.mock import patch, MagicMock import httpx +import pytest from tools.skills_hub import ( GitHubAuth, @@ -648,6 +649,29 @@ class TestWellKnownSkillSource: assert bundle.files["SKILL.md"] == "# Code Review\n" assert bundle.files["references/checklist.md"] == "- [ ] security\n" + @patch("tools.skills_hub._write_index_cache") + @patch("tools.skills_hub._read_index_cache", return_value=None) + @patch("tools.skills_hub.httpx.get") + def test_fetch_rejects_unsafe_file_paths_from_well_known_endpoint(self, mock_get, _mock_read_cache, _mock_write_cache): + def fake_get(url, *args, **kwargs): + if url.endswith("/index.json"): + return MagicMock(status_code=200, json=lambda: { + "skills": [{ + "name": "code-review", + "description": "Review code", + "files": ["SKILL.md", "../../../escape.txt"], + }] + }) + if url.endswith("/code-review/SKILL.md"): + return MagicMock(status_code=200, text="# Code Review\n") + raise AssertionError(url) + + mock_get.side_effect = fake_get + + bundle = self._source().fetch("well-known:https://example.com/.well-known/skills/code-review") + + assert bundle is None + class TestCheckForSkillUpdates: def test_bundle_content_hash_matches_installed_content_hash(self, tmp_path): @@ -1143,6 +1167,61 @@ class TestQuarantineBundleBinaryAssets: assert (q_path / "SKILL.md").read_text(encoding="utf-8").startswith("---") assert (q_path / "assets" / "neutts-cli" / "samples" / "jo.wav").read_bytes() == b"RIFF\x00\x01fakewav" + def test_quarantine_bundle_rejects_traversal_file_paths(self, tmp_path): + import tools.skills_hub as hub + + hub_dir = tmp_path / "skills" / ".hub" + with patch.object(hub, "SKILLS_DIR", tmp_path / "skills"), \ + patch.object(hub, "HUB_DIR", hub_dir), \ + patch.object(hub, "LOCK_FILE", hub_dir / "lock.json"), \ + patch.object(hub, "QUARANTINE_DIR", hub_dir / "quarantine"), \ + patch.object(hub, "AUDIT_LOG", hub_dir / "audit.log"), \ + patch.object(hub, "TAPS_FILE", hub_dir / "taps.json"), \ + patch.object(hub, "INDEX_CACHE_DIR", hub_dir / "index-cache"): + bundle = SkillBundle( + name="demo", + files={ + "SKILL.md": "---\nname: demo\n---\n", + "../../../escape.txt": "owned", + }, + source="well-known", + identifier="well-known:https://example.com/.well-known/skills/demo", + trust_level="community", + ) + + with pytest.raises(ValueError, match="Unsafe bundle file path"): + quarantine_bundle(bundle) + + assert not (tmp_path / "skills" / "escape.txt").exists() + + def test_quarantine_bundle_rejects_absolute_file_paths(self, tmp_path): + import tools.skills_hub as hub + + hub_dir = tmp_path / "skills" / ".hub" + absolute_target = tmp_path / "outside.txt" + with patch.object(hub, "SKILLS_DIR", tmp_path / "skills"), \ + patch.object(hub, "HUB_DIR", hub_dir), \ + patch.object(hub, "LOCK_FILE", hub_dir / "lock.json"), \ + patch.object(hub, "QUARANTINE_DIR", hub_dir / "quarantine"), \ + patch.object(hub, "AUDIT_LOG", hub_dir / "audit.log"), \ + patch.object(hub, "TAPS_FILE", hub_dir / "taps.json"), \ + patch.object(hub, "INDEX_CACHE_DIR", hub_dir / "index-cache"): + bundle = SkillBundle( + name="demo", + files={ + "SKILL.md": "---\nname: demo\n---\n", + str(absolute_target): "owned", + }, + source="well-known", + identifier="well-known:https://example.com/.well-known/skills/demo", + trust_level="community", + ) + + with pytest.raises(ValueError, match="Unsafe bundle file path"): + quarantine_bundle(bundle) + + assert not absolute_target.exists() + # --------------------------------------------------------------------------- # GitHubSource._download_directory — tree API + fallback (#2940) diff --git a/tools/skills_hub.py b/tools/skills_hub.py index 86f8e47d1..a824c3e3b 100644 --- a/tools/skills_hub.py +++ b/tools/skills_hub.py @@ -24,7 +24,7 @@ import time from abc import ABC, abstractmethod from dataclasses import dataclass, field from datetime import datetime, timezone -from pathlib import Path +from pathlib import Path, PurePosixPath from hermes_constants import get_hermes_home from typing import Any, Dict, List, Optional, Tuple, Union from urllib.parse import urlparse, urlunparse @@ -85,6 +85,43 @@ class SkillBundle: metadata: Dict[str, Any] = field(default_factory=dict) +def _normalize_bundle_path(path_value: str, *, field_name: str, allow_nested: bool) -> str: + """Normalize and validate bundle-controlled paths before touching disk.""" + if not isinstance(path_value, str): + raise ValueError(f"Unsafe {field_name}: expected a string") + + raw = path_value.strip() + if not raw: + raise ValueError(f"Unsafe {field_name}: empty path") + + normalized = raw.replace("\\", "/") + path = PurePosixPath(normalized) + parts = [part for part in path.parts if part not in ("", ".")] + + if normalized.startswith("/") or path.is_absolute(): + raise ValueError(f"Unsafe {field_name}: {path_value}") + if not parts or any(part == ".." for part in parts): + raise ValueError(f"Unsafe {field_name}: {path_value}") + if re.fullmatch(r"[A-Za-z]:", parts[0]): + raise ValueError(f"Unsafe {field_name}: {path_value}") + if not allow_nested and len(parts) != 1: + raise ValueError(f"Unsafe {field_name}: {path_value}") + + return "/".join(parts) + + +def _validate_skill_name(name: str) -> str: + return _normalize_bundle_path(name, field_name="skill name", allow_nested=False) + + +def _validate_category_name(category: str) -> str: + return _normalize_bundle_path(category, field_name="category", allow_nested=False) + + +def _validate_bundle_rel_path(rel_path: str) -> str: + return _normalize_bundle_path(rel_path, field_name="bundle file path", allow_nested=True) + + # --------------------------------------------------------------------------- # GitHub Authentication # --------------------------------------------------------------------------- @@ -701,6 +738,12 @@ class WellKnownSkillSource(SkillSource): if not parsed: return None + try: + skill_name = _validate_skill_name(parsed["skill_name"]) + except ValueError: + logger.warning("Well-known skill identifier contained unsafe skill name: %s", identifier) + return None + entry = self._index_entry(parsed["index_url"], parsed["skill_name"]) if not entry: return None @@ -713,19 +756,28 @@ class WellKnownSkillSource(SkillSource): for rel_path in files: if not isinstance(rel_path, str) or not rel_path: continue - text = self._fetch_text(f"{parsed['skill_url']}/{rel_path}") + try: + safe_rel_path = _validate_bundle_rel_path(rel_path) + except ValueError: + logger.warning( + "Well-known skill %s advertised unsafe file path: %r", + identifier, + rel_path, + ) + return None + text = self._fetch_text(f"{parsed['skill_url']}/{safe_rel_path}") if text is None: return None - downloaded[rel_path] = text + downloaded[safe_rel_path] = text if "SKILL.md" not in downloaded: return None return SkillBundle( - name=parsed["skill_name"], + name=skill_name, files=downloaded, source="well-known", - identifier=self._wrap_identifier(parsed["base_url"], parsed["skill_name"]), + identifier=self._wrap_identifier(parsed["base_url"], skill_name), trust_level="community", metadata={ "index_url": parsed["index_url"], @@ -1752,9 +1804,10 @@ class ClawHubSource(SkillSource): for info in zf.infolist(): if info.is_dir(): continue - # Sanitize path — strip leading slashes and .. - name = info.filename.lstrip("/") - if ".." in name or name.startswith("/"): + try: + name = _validate_bundle_rel_path(info.filename) + except ValueError: + logger.debug("Skipping unsafe ZIP member path: %s", info.filename) continue # Only extract text-sized files (skip large binaries) if info.file_size > 500_000: @@ -2423,13 +2476,19 @@ def ensure_hub_dirs() -> None: def quarantine_bundle(bundle: SkillBundle) -> Path: """Write a skill bundle to the quarantine directory for scanning.""" ensure_hub_dirs() - dest = QUARANTINE_DIR / bundle.name + skill_name = _validate_skill_name(bundle.name) + validated_files: List[Tuple[str, Union[str, bytes]]] = [] + for rel_path, file_content in bundle.files.items(): + safe_rel_path = _validate_bundle_rel_path(rel_path) + validated_files.append((safe_rel_path, file_content)) + + dest = QUARANTINE_DIR / skill_name if dest.exists(): shutil.rmtree(dest) dest.mkdir(parents=True) - for rel_path, file_content in bundle.files.items(): - file_dest = dest / rel_path + for rel_path, file_content in validated_files: + file_dest = dest.joinpath(*rel_path.split("/")) file_dest.parent.mkdir(parents=True, exist_ok=True) if isinstance(file_content, bytes): file_dest.write_bytes(file_content) @@ -2447,10 +2506,17 @@ def install_from_quarantine( scan_result: ScanResult, ) -> Path: """Move a scanned skill from quarantine into the skills directory.""" - if category: - install_dir = SKILLS_DIR / category / skill_name + safe_skill_name = _validate_skill_name(skill_name) + safe_category = _validate_category_name(category) if category else "" + quarantine_resolved = quarantine_path.resolve() + quarantine_root = QUARANTINE_DIR.resolve() + if not quarantine_resolved.is_relative_to(quarantine_root): + raise ValueError(f"Unsafe quarantine path: {quarantine_path}") + + if safe_category: + install_dir = SKILLS_DIR / safe_category / safe_skill_name else: - install_dir = SKILLS_DIR / skill_name + install_dir = SKILLS_DIR / safe_skill_name if install_dir.exists(): shutil.rmtree(install_dir) @@ -2461,7 +2527,7 @@ def install_from_quarantine( # Record in lock file lock = HubLockFile() lock.record_install( - name=skill_name, + name=safe_skill_name, source=bundle.source, identifier=bundle.identifier, trust_level=bundle.trust_level, @@ -2473,7 +2539,7 @@ def install_from_quarantine( ) append_audit_log( - "INSTALL", skill_name, bundle.source, + "INSTALL", safe_skill_name, bundle.source, bundle.trust_level, scan_result.verdict, content_hash(install_dir), )