diff --git a/agent/prompt_builder.py b/agent/prompt_builder.py index 6ed6e90a7..29e2c22f9 100644 --- a/agent/prompt_builder.py +++ b/agent/prompt_builder.py @@ -4,14 +4,27 @@ All functions are stateless. AIAgent._build_system_prompt() calls these to assemble pieces, then combines them with memory and ephemeral prompts. """ +import json import logging import os import re +import threading +from collections import OrderedDict from pathlib import Path from hermes_constants import get_hermes_home from typing import Optional +from agent.skill_utils import ( + extract_skill_conditions, + extract_skill_description, + get_disabled_skill_names, + iter_skill_index_files, + parse_frontmatter, + skill_matches_platform, +) +from utils import atomic_json_write + logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- @@ -230,6 +243,111 @@ CONTEXT_TRUNCATE_HEAD_RATIO = 0.7 CONTEXT_TRUNCATE_TAIL_RATIO = 0.2 +# ========================================================================= +# Skills prompt cache +# ========================================================================= + +_SKILLS_PROMPT_CACHE_MAX = 8 +_SKILLS_PROMPT_CACHE: OrderedDict[tuple, str] = OrderedDict() +_SKILLS_PROMPT_CACHE_LOCK = threading.Lock() +_SKILLS_SNAPSHOT_VERSION = 1 + + +def _skills_prompt_snapshot_path() -> Path: + return get_hermes_home() / ".skills_prompt_snapshot.json" + + +def clear_skills_system_prompt_cache(*, clear_snapshot: bool = False) -> None: + """Drop the in-process skills prompt cache (and optionally the disk snapshot).""" + with _SKILLS_PROMPT_CACHE_LOCK: + _SKILLS_PROMPT_CACHE.clear() + if clear_snapshot: + try: + _skills_prompt_snapshot_path().unlink(missing_ok=True) + except OSError as e: + logger.debug("Could not remove skills prompt snapshot: %s", e) + + +def _build_skills_manifest(skills_dir: Path) -> dict[str, list[int]]: + """Build an mtime/size manifest of all SKILL.md and DESCRIPTION.md files.""" + manifest: dict[str, list[int]] = {} + for filename in ("SKILL.md", "DESCRIPTION.md"): + for path in iter_skill_index_files(skills_dir, filename): + try: + st = path.stat() + except OSError: + continue + manifest[str(path.relative_to(skills_dir))] = [st.st_mtime_ns, st.st_size] + return manifest + + +def _load_skills_snapshot(skills_dir: Path) -> Optional[dict]: + """Load the disk snapshot if it exists and its manifest still matches.""" + snapshot_path = _skills_prompt_snapshot_path() + if not snapshot_path.exists(): + return None + try: + snapshot = json.loads(snapshot_path.read_text(encoding="utf-8")) + except Exception: + return None + if not isinstance(snapshot, dict): + return None + if snapshot.get("version") != _SKILLS_SNAPSHOT_VERSION: + return None + if snapshot.get("manifest") != _build_skills_manifest(skills_dir): + return None + return snapshot + + +def _write_skills_snapshot( + skills_dir: Path, + manifest: dict[str, list[int]], + skill_entries: list[dict], + category_descriptions: dict[str, str], +) -> None: + """Persist skill metadata to disk for fast cold-start reuse.""" + payload = { + "version": _SKILLS_SNAPSHOT_VERSION, + "manifest": manifest, + "skills": skill_entries, + "category_descriptions": category_descriptions, + } + try: + atomic_json_write(_skills_prompt_snapshot_path(), payload) + except Exception as e: + logger.debug("Could not write skills prompt snapshot: %s", e) + + +def _build_snapshot_entry( + skill_file: Path, + skills_dir: Path, + frontmatter: dict, + description: str, +) -> dict: + """Build a serialisable metadata dict for one skill.""" + rel_path = skill_file.relative_to(skills_dir) + parts = rel_path.parts + if len(parts) >= 2: + skill_name = parts[-2] + category = "/".join(parts[:-2]) if len(parts) > 2 else parts[0] + else: + category = "general" + skill_name = skill_file.parent.name + + platforms = frontmatter.get("platforms") or [] + if isinstance(platforms, str): + platforms = [platforms] + + return { + "skill_name": skill_name, + "category": category, + "frontmatter_name": str(frontmatter.get("name", skill_name)), + "description": description, + "platforms": [str(p).strip() for p in platforms if str(p).strip()], + "conditions": extract_skill_conditions(frontmatter), + } + + # ========================================================================= # Skills index # ========================================================================= @@ -241,22 +359,13 @@ def _parse_skill_file(skill_file: Path) -> tuple[bool, dict, str]: (True, {}, "") to err on the side of showing the skill. """ try: - from tools.skills_tool import _parse_frontmatter, skill_matches_platform - raw = skill_file.read_text(encoding="utf-8")[:2000] - frontmatter, _ = _parse_frontmatter(raw) + frontmatter, _ = parse_frontmatter(raw) if not skill_matches_platform(frontmatter): - return False, {}, "" + return False, frontmatter, "" - desc = "" - raw_desc = frontmatter.get("description", "") - if raw_desc: - desc = str(raw_desc).strip().strip("'\"") - if len(desc) > 60: - desc = desc[:57] + "..." - - return True, frontmatter, desc + return True, frontmatter, extract_skill_description(frontmatter) except Exception as e: logger.debug("Failed to parse skill file %s: %s", skill_file, e) return True, {}, "" @@ -265,16 +374,9 @@ def _parse_skill_file(skill_file: Path) -> tuple[bool, dict, str]: def _read_skill_conditions(skill_file: Path) -> dict: """Extract conditional activation fields from SKILL.md frontmatter.""" try: - from tools.skills_tool import _parse_frontmatter raw = skill_file.read_text(encoding="utf-8")[:2000] - frontmatter, _ = _parse_frontmatter(raw) - hermes = frontmatter.get("metadata", {}).get("hermes", {}) - return { - "fallback_for_toolsets": hermes.get("fallback_for_toolsets", []), - "requires_toolsets": hermes.get("requires_toolsets", []), - "fallback_for_tools": hermes.get("fallback_for_tools", []), - "requires_tools": hermes.get("requires_tools", []), - } + frontmatter, _ = parse_frontmatter(raw) + return extract_skill_conditions(frontmatter) except Exception as e: logger.debug("Failed to read skill conditions from %s: %s", skill_file, e) return {} @@ -317,10 +419,12 @@ def build_skills_system_prompt( ) -> str: """Build a compact skill index for the system prompt. - Scans ~/.hermes/skills/ for SKILL.md files grouped by category. - Includes per-skill descriptions from frontmatter so the model can - match skills by meaning, not just name. - Filters out skills incompatible with the current OS platform. + Two-layer cache: + 1. In-process LRU dict keyed by (skills_dir, tools, toolsets) + 2. Disk snapshot (``.skills_prompt_snapshot.json``) validated by + mtime/size manifest — survives process restarts + + Falls back to a full filesystem scan when both layers miss. """ hermes_home = get_hermes_home() skills_dir = hermes_home / "skills" @@ -328,98 +432,140 @@ def build_skills_system_prompt( if not skills_dir.exists(): return "" - # Collect skills with descriptions, grouped by category. - # Each entry: (skill_name, description) - # Supports sub-categories: skills/mlops/training/axolotl/SKILL.md - # -> category "mlops/training", skill "axolotl" - # Load disabled skill names once for the entire scan - try: - from tools.skills_tool import _get_disabled_skill_names - disabled = _get_disabled_skill_names() - except Exception: - disabled = set() + # ── Layer 1: in-process LRU cache ───────────────────────────────── + cache_key = ( + str(skills_dir.resolve()), + tuple(sorted(str(t) for t in (available_tools or set()))), + tuple(sorted(str(ts) for ts in (available_toolsets or set()))), + ) + with _SKILLS_PROMPT_CACHE_LOCK: + cached = _SKILLS_PROMPT_CACHE.get(cache_key) + if cached is not None: + _SKILLS_PROMPT_CACHE.move_to_end(cache_key) + return cached + + disabled = get_disabled_skill_names() + + # ── Layer 2: disk snapshot ──────────────────────────────────────── + snapshot = _load_skills_snapshot(skills_dir) skills_by_category: dict[str, list[tuple[str, str]]] = {} - for skill_file in skills_dir.rglob("SKILL.md"): - is_compatible, frontmatter, desc = _parse_skill_file(skill_file) - if not is_compatible: - continue - rel_path = skill_file.relative_to(skills_dir) - parts = rel_path.parts - if len(parts) >= 2: - skill_name = parts[-2] - category = "/".join(parts[:-2]) if len(parts) > 2 else parts[0] - else: - category = "general" - skill_name = skill_file.parent.name - # Respect user's disabled skills config - fm_name = frontmatter.get("name", skill_name) - if fm_name in disabled or skill_name in disabled: - continue - # Extract conditions inline from already-parsed frontmatter - # (avoids redundant file re-read that _read_skill_conditions would do) - hermes_meta = (frontmatter.get("metadata") or {}).get("hermes") or {} - conditions = { - "fallback_for_toolsets": hermes_meta.get("fallback_for_toolsets", []), - "requires_toolsets": hermes_meta.get("requires_toolsets", []), - "fallback_for_tools": hermes_meta.get("fallback_for_tools", []), - "requires_tools": hermes_meta.get("requires_tools", []), + category_descriptions: dict[str, str] = {} + + if snapshot is not None: + # Fast path: use pre-parsed metadata from disk + for entry in snapshot.get("skills", []): + if not isinstance(entry, dict): + continue + skill_name = entry.get("skill_name") or "" + category = entry.get("category") or "general" + frontmatter_name = entry.get("frontmatter_name") or skill_name + platforms = entry.get("platforms") or [] + if not skill_matches_platform({"platforms": platforms}): + continue + if frontmatter_name in disabled or skill_name in disabled: + continue + if not _skill_should_show( + entry.get("conditions") or {}, + available_tools, + available_toolsets, + ): + continue + skills_by_category.setdefault(category, []).append( + (skill_name, entry.get("description", "")) + ) + category_descriptions = { + str(k): str(v) + for k, v in (snapshot.get("category_descriptions") or {}).items() } - if not _skill_should_show(conditions, available_tools, available_toolsets): - continue - skills_by_category.setdefault(category, []).append((skill_name, desc)) + else: + # Cold path: full filesystem scan + write snapshot for next time + skill_entries: list[dict] = [] + for skill_file in iter_skill_index_files(skills_dir, "SKILL.md"): + is_compatible, frontmatter, desc = _parse_skill_file(skill_file) + entry = _build_snapshot_entry(skill_file, skills_dir, frontmatter, desc) + skill_entries.append(entry) + if not is_compatible: + continue + skill_name = entry["skill_name"] + if entry["frontmatter_name"] in disabled or skill_name in disabled: + continue + if not _skill_should_show( + extract_skill_conditions(frontmatter), + available_tools, + available_toolsets, + ): + continue + skills_by_category.setdefault(entry["category"], []).append( + (skill_name, entry["description"]) + ) - if not skills_by_category: - return "" - - # Read category-level descriptions from DESCRIPTION.md - # Checks both the exact category path and parent directories - category_descriptions = {} - for category in skills_by_category: - cat_path = Path(category) - desc_file = skills_dir / cat_path / "DESCRIPTION.md" - if desc_file.exists(): + # Read category-level DESCRIPTION.md files + for desc_file in iter_skill_index_files(skills_dir, "DESCRIPTION.md"): try: content = desc_file.read_text(encoding="utf-8") - match = re.search(r"^---\s*\n.*?description:\s*(.+?)\s*\n.*?^---", content, re.MULTILINE | re.DOTALL) - if match: - category_descriptions[category] = match.group(1).strip() + fm, _ = parse_frontmatter(content) + cat_desc = fm.get("description") + if not cat_desc: + continue + rel = desc_file.relative_to(skills_dir) + cat = "/".join(rel.parts[:-1]) if len(rel.parts) > 1 else "general" + category_descriptions[cat] = str(cat_desc).strip().strip("'\"") except Exception as e: logger.debug("Could not read skill description %s: %s", desc_file, e) - index_lines = [] - for category in sorted(skills_by_category.keys()): - cat_desc = category_descriptions.get(category, "") - if cat_desc: - index_lines.append(f" {category}: {cat_desc}") - else: - index_lines.append(f" {category}:") - # Deduplicate and sort skills within each category - seen = set() - for name, desc in sorted(skills_by_category[category], key=lambda x: x[0]): - if name in seen: - continue - seen.add(name) - if desc: - index_lines.append(f" - {name}: {desc}") - else: - index_lines.append(f" - {name}") + _write_skills_snapshot( + skills_dir, + _build_skills_manifest(skills_dir), + skill_entries, + category_descriptions, + ) - return ( - "## Skills (mandatory)\n" - "Before replying, scan the skills below. If one clearly matches your task, " - "load it with skill_view(name) and follow its instructions. " - "If a skill has issues, fix it with skill_manage(action='patch').\n" - "After difficult/iterative tasks, offer to save as a skill. " - "If a skill you loaded was missing steps, had wrong commands, or needed " - "pitfalls you discovered, update it before finishing.\n" - "\n" - "\n" - + "\n".join(index_lines) + "\n" - "\n" - "\n" - "If none match, proceed normally without loading a skill." - ) + if not skills_by_category: + result = "" + else: + index_lines = [] + for category in sorted(skills_by_category.keys()): + cat_desc = category_descriptions.get(category, "") + if cat_desc: + index_lines.append(f" {category}: {cat_desc}") + else: + index_lines.append(f" {category}:") + # Deduplicate and sort skills within each category + seen = set() + for name, desc in sorted(skills_by_category[category], key=lambda x: x[0]): + if name in seen: + continue + seen.add(name) + if desc: + index_lines.append(f" - {name}: {desc}") + else: + index_lines.append(f" - {name}") + + result = ( + "## Skills (mandatory)\n" + "Before replying, scan the skills below. If one clearly matches your task, " + "load it with skill_view(name) and follow its instructions. " + "If a skill has issues, fix it with skill_manage(action='patch').\n" + "After difficult/iterative tasks, offer to save as a skill. " + "If a skill you loaded was missing steps, had wrong commands, or needed " + "pitfalls you discovered, update it before finishing.\n" + "\n" + "\n" + + "\n".join(index_lines) + "\n" + "\n" + "\n" + "If none match, proceed normally without loading a skill." + ) + + # ── Store in LRU cache ──────────────────────────────────────────── + with _SKILLS_PROMPT_CACHE_LOCK: + _SKILLS_PROMPT_CACHE[cache_key] = result + _SKILLS_PROMPT_CACHE.move_to_end(cache_key) + while len(_SKILLS_PROMPT_CACHE) > _SKILLS_PROMPT_CACHE_MAX: + _SKILLS_PROMPT_CACHE.popitem(last=False) + + return result # ========================================================================= diff --git a/agent/skill_utils.py b/agent/skill_utils.py new file mode 100644 index 000000000..5cb2a7105 --- /dev/null +++ b/agent/skill_utils.py @@ -0,0 +1,203 @@ +"""Lightweight skill metadata utilities shared by prompt_builder and skills_tool. + +This module intentionally avoids importing the tool registry, CLI config, or any +heavy dependency chain. It is safe to import at module level without triggering +tool registration or provider resolution. +""" + +import logging +import os +import re +import sys +from pathlib import Path +from typing import Any, Dict, List, Optional, Set, Tuple + +from hermes_constants import get_hermes_home + +logger = logging.getLogger(__name__) + +# ── Platform mapping ────────────────────────────────────────────────────── + +PLATFORM_MAP = { + "macos": "darwin", + "linux": "linux", + "windows": "win32", +} + +EXCLUDED_SKILL_DIRS = frozenset((".git", ".github", ".hub")) + +# ── Lazy YAML loader ───────────────────────────────────────────────────── + +_yaml_load_fn = None + + +def yaml_load(content: str): + """Parse YAML with lazy import and CSafeLoader preference.""" + global _yaml_load_fn + if _yaml_load_fn is None: + import yaml + + loader = getattr(yaml, "CSafeLoader", None) or yaml.SafeLoader + + def _load(value: str): + return yaml.load(value, Loader=loader) + + _yaml_load_fn = _load + return _yaml_load_fn(content) + + +# ── Frontmatter parsing ────────────────────────────────────────────────── + + +def parse_frontmatter(content: str) -> Tuple[Dict[str, Any], str]: + """Parse YAML frontmatter from a markdown string. + + Uses yaml with CSafeLoader for full YAML support (nested metadata, lists) + with a fallback to simple key:value splitting for robustness. + + Returns: + (frontmatter_dict, remaining_body) + """ + frontmatter: Dict[str, Any] = {} + body = content + + if not content.startswith("---"): + return frontmatter, body + + end_match = re.search(r"\n---\s*\n", content[3:]) + if not end_match: + return frontmatter, body + + yaml_content = content[3 : end_match.start() + 3] + body = content[end_match.end() + 3 :] + + try: + parsed = yaml_load(yaml_content) + if isinstance(parsed, dict): + frontmatter = parsed + except Exception: + # Fallback: simple key:value parsing for malformed YAML + for line in yaml_content.strip().split("\n"): + if ":" not in line: + continue + key, value = line.split(":", 1) + frontmatter[key.strip()] = value.strip() + + return frontmatter, body + + +# ── Platform matching ───────────────────────────────────────────────────── + + +def skill_matches_platform(frontmatter: Dict[str, Any]) -> bool: + """Return True when the skill is compatible with the current OS. + + Skills declare platform requirements via a top-level ``platforms`` list + in their YAML frontmatter:: + + platforms: [macos] # macOS only + platforms: [macos, linux] # macOS and Linux + + If the field is absent or empty the skill is compatible with **all** + platforms (backward-compatible default). + """ + platforms = frontmatter.get("platforms") + if not platforms: + return True + if not isinstance(platforms, list): + platforms = [platforms] + current = sys.platform + for platform in platforms: + normalized = str(platform).lower().strip() + mapped = PLATFORM_MAP.get(normalized, normalized) + if current.startswith(mapped): + return True + return False + + +# ── Disabled skills ─────────────────────────────────────────────────────── + + +def get_disabled_skill_names() -> Set[str]: + """Read disabled skill names from config.yaml. + + Resolves platform from ``HERMES_PLATFORM`` env var, falls back to + the global disabled list. Reads the config file directly (no CLI + config imports) to stay lightweight. + """ + config_path = get_hermes_home() / "config.yaml" + if not config_path.exists(): + return set() + try: + parsed = yaml_load(config_path.read_text(encoding="utf-8")) + except Exception as e: + logger.debug("Could not read skill config %s: %s", config_path, e) + return set() + if not isinstance(parsed, dict): + return set() + + skills_cfg = parsed.get("skills") + if not isinstance(skills_cfg, dict): + return set() + + resolved_platform = os.getenv("HERMES_PLATFORM") + if resolved_platform: + platform_disabled = (skills_cfg.get("platform_disabled") or {}).get( + resolved_platform + ) + if platform_disabled is not None: + return _normalize_string_set(platform_disabled) + return _normalize_string_set(skills_cfg.get("disabled")) + + +def _normalize_string_set(values) -> Set[str]: + if values is None: + return set() + if isinstance(values, str): + values = [values] + return {str(v).strip() for v in values if str(v).strip()} + + +# ── Condition extraction ────────────────────────────────────────────────── + + +def extract_skill_conditions(frontmatter: Dict[str, Any]) -> Dict[str, List]: + """Extract conditional activation fields from parsed frontmatter.""" + hermes = (frontmatter.get("metadata") or {}).get("hermes") or {} + return { + "fallback_for_toolsets": hermes.get("fallback_for_toolsets", []), + "requires_toolsets": hermes.get("requires_toolsets", []), + "fallback_for_tools": hermes.get("fallback_for_tools", []), + "requires_tools": hermes.get("requires_tools", []), + } + + +# ── Description extraction ──────────────────────────────────────────────── + + +def extract_skill_description(frontmatter: Dict[str, Any]) -> str: + """Extract a truncated description from parsed frontmatter.""" + raw_desc = frontmatter.get("description", "") + if not raw_desc: + return "" + desc = str(raw_desc).strip().strip("'\"") + if len(desc) > 60: + return desc[:57] + "..." + return desc + + +# ── File iteration ──────────────────────────────────────────────────────── + + +def iter_skill_index_files(skills_dir: Path, filename: str): + """Walk skills_dir yielding sorted paths matching *filename*. + + Excludes ``.git``, ``.github``, ``.hub`` directories. + """ + matches = [] + for root, dirs, files in os.walk(skills_dir): + dirs[:] = [d for d in dirs if d not in EXCLUDED_SKILL_DIRS] + if filename in files: + matches.append(Path(root) / filename) + for path in sorted(matches, key=lambda p: str(p.relative_to(skills_dir))): + yield path diff --git a/hermes_cli/skills_hub.py b/hermes_cli/skills_hub.py index a36ee78ce..62f91ce9a 100644 --- a/hermes_cli/skills_hub.py +++ b/hermes_cli/skills_hub.py @@ -417,6 +417,13 @@ def do_install(identifier: str, category: str = "", force: bool = False, c.print(f"[bold green]Installed:[/] {install_dir.relative_to(SKILLS_DIR)}") c.print(f"[dim]Files: {', '.join(bundle.files.keys())}[/]\n") + # Invalidate the skills prompt cache so the new skill appears immediately + try: + from agent.prompt_builder import clear_skills_system_prompt_cache + clear_skills_system_prompt_cache(clear_snapshot=True) + except Exception: + pass + def do_inspect(identifier: str, console: Optional[Console] = None) -> None: """Preview a skill's SKILL.md content without installing.""" @@ -623,6 +630,11 @@ def do_uninstall(name: str, console: Optional[Console] = None, success, msg = uninstall_skill(name) if success: c.print(f"[bold green]{msg}[/]\n") + try: + from agent.prompt_builder import clear_skills_system_prompt_cache + clear_skills_system_prompt_cache(clear_snapshot=True) + except Exception: + pass else: c.print(f"[bold red]Error:[/] {msg}\n") diff --git a/tests/agent/test_prompt_builder.py b/tests/agent/test_prompt_builder.py index e2ad5a780..ecbc0892b 100644 --- a/tests/agent/test_prompt_builder.py +++ b/tests/agent/test_prompt_builder.py @@ -232,7 +232,18 @@ class TestPromptBuilderImports: # ========================================================================= +import pytest + + class TestBuildSkillsSystemPrompt: + @pytest.fixture(autouse=True) + def _clear_skills_cache(self): + """Ensure the in-process skills prompt cache doesn't leak between tests.""" + from agent.prompt_builder import clear_skills_system_prompt_cache + clear_skills_system_prompt_cache(clear_snapshot=True) + yield + clear_skills_system_prompt_cache(clear_snapshot=True) + def test_empty_when_no_skills_dir(self, monkeypatch, tmp_path): monkeypatch.setenv("HERMES_HOME", str(tmp_path)) result = build_skills_system_prompt() @@ -302,7 +313,7 @@ class TestBuildSkillsSystemPrompt: from unittest.mock import patch - with patch("tools.skills_tool.sys") as mock_sys: + with patch("agent.skill_utils.sys") as mock_sys: mock_sys.platform = "darwin" result = build_skills_system_prompt() @@ -330,7 +341,7 @@ class TestBuildSkillsSystemPrompt: from unittest.mock import patch with patch( - "tools.skills_tool._get_disabled_skill_names", + "agent.prompt_builder.get_disabled_skill_names", return_value={"old-tool"}, ): result = build_skills_system_prompt() @@ -804,6 +815,13 @@ class TestSkillShouldShow: class TestBuildSkillsSystemPromptConditional: + @pytest.fixture(autouse=True) + def _clear_skills_cache(self): + from agent.prompt_builder import clear_skills_system_prompt_cache + clear_skills_system_prompt_cache(clear_snapshot=True) + yield + clear_skills_system_prompt_cache(clear_snapshot=True) + def test_fallback_skill_hidden_when_primary_available(self, monkeypatch, tmp_path): monkeypatch.setenv("HERMES_HOME", str(tmp_path)) skill_dir = tmp_path / "skills" / "search" / "duckduckgo" diff --git a/tests/agent/test_skill_commands.py b/tests/agent/test_skill_commands.py index f6a114db6..1f18d9bf4 100644 --- a/tests/agent/test_skill_commands.py +++ b/tests/agent/test_skill_commands.py @@ -54,7 +54,7 @@ class TestScanSkillCommands: """macOS-only skills should not register slash commands on Linux.""" with ( patch("tools.skills_tool.SKILLS_DIR", tmp_path), - patch("tools.skills_tool.sys") as mock_sys, + patch("agent.skill_utils.sys") as mock_sys, ): mock_sys.platform = "linux" _make_skill(tmp_path, "imessage", frontmatter_extra="platforms: [macos]\n") @@ -67,7 +67,7 @@ class TestScanSkillCommands: """macOS-only skills should register slash commands on macOS.""" with ( patch("tools.skills_tool.SKILLS_DIR", tmp_path), - patch("tools.skills_tool.sys") as mock_sys, + patch("agent.skill_utils.sys") as mock_sys, ): mock_sys.platform = "darwin" _make_skill(tmp_path, "imessage", frontmatter_extra="platforms: [macos]\n") @@ -78,7 +78,7 @@ class TestScanSkillCommands: """Skills without platforms field should register on any platform.""" with ( patch("tools.skills_tool.SKILLS_DIR", tmp_path), - patch("tools.skills_tool.sys") as mock_sys, + patch("agent.skill_utils.sys") as mock_sys, ): mock_sys.platform = "win32" _make_skill(tmp_path, "generic-tool") diff --git a/tests/tools/test_skills_tool.py b/tests/tools/test_skills_tool.py index 6af2c83cb..8f054e7e2 100644 --- a/tests/tools/test_skills_tool.py +++ b/tests/tools/test_skills_tool.py @@ -589,38 +589,38 @@ class TestSkillMatchesPlatform: assert skill_matches_platform({"platforms": None}) is True def test_macos_on_darwin(self): - with patch("tools.skills_tool.sys") as mock_sys: + with patch("agent.skill_utils.sys") as mock_sys: mock_sys.platform = "darwin" assert skill_matches_platform({"platforms": ["macos"]}) is True def test_macos_on_linux(self): - with patch("tools.skills_tool.sys") as mock_sys: + with patch("agent.skill_utils.sys") as mock_sys: mock_sys.platform = "linux" assert skill_matches_platform({"platforms": ["macos"]}) is False def test_linux_on_linux(self): - with patch("tools.skills_tool.sys") as mock_sys: + with patch("agent.skill_utils.sys") as mock_sys: mock_sys.platform = "linux" assert skill_matches_platform({"platforms": ["linux"]}) is True def test_linux_on_darwin(self): - with patch("tools.skills_tool.sys") as mock_sys: + with patch("agent.skill_utils.sys") as mock_sys: mock_sys.platform = "darwin" assert skill_matches_platform({"platforms": ["linux"]}) is False def test_windows_on_win32(self): - with patch("tools.skills_tool.sys") as mock_sys: + with patch("agent.skill_utils.sys") as mock_sys: mock_sys.platform = "win32" assert skill_matches_platform({"platforms": ["windows"]}) is True def test_windows_on_linux(self): - with patch("tools.skills_tool.sys") as mock_sys: + with patch("agent.skill_utils.sys") as mock_sys: mock_sys.platform = "linux" assert skill_matches_platform({"platforms": ["windows"]}) is False def test_multi_platform_match(self): """Skills listing multiple platforms should match any of them.""" - with patch("tools.skills_tool.sys") as mock_sys: + with patch("agent.skill_utils.sys") as mock_sys: mock_sys.platform = "darwin" assert skill_matches_platform({"platforms": ["macos", "linux"]}) is True mock_sys.platform = "linux" @@ -630,20 +630,20 @@ class TestSkillMatchesPlatform: def test_string_instead_of_list(self): """A single string value should be treated as a one-element list.""" - with patch("tools.skills_tool.sys") as mock_sys: + with patch("agent.skill_utils.sys") as mock_sys: mock_sys.platform = "darwin" assert skill_matches_platform({"platforms": "macos"}) is True mock_sys.platform = "linux" assert skill_matches_platform({"platforms": "macos"}) is False def test_case_insensitive(self): - with patch("tools.skills_tool.sys") as mock_sys: + with patch("agent.skill_utils.sys") as mock_sys: mock_sys.platform = "darwin" assert skill_matches_platform({"platforms": ["MacOS"]}) is True assert skill_matches_platform({"platforms": ["MACOS"]}) is True def test_unknown_platform_no_match(self): - with patch("tools.skills_tool.sys") as mock_sys: + with patch("agent.skill_utils.sys") as mock_sys: mock_sys.platform = "linux" assert skill_matches_platform({"platforms": ["freebsd"]}) is False @@ -659,7 +659,7 @@ class TestFindAllSkillsPlatformFiltering: def test_excludes_incompatible_platform(self, tmp_path): with ( patch("tools.skills_tool.SKILLS_DIR", tmp_path), - patch("tools.skills_tool.sys") as mock_sys, + patch("agent.skill_utils.sys") as mock_sys, ): mock_sys.platform = "linux" _make_skill(tmp_path, "universal-skill") @@ -672,7 +672,7 @@ class TestFindAllSkillsPlatformFiltering: def test_includes_matching_platform(self, tmp_path): with ( patch("tools.skills_tool.SKILLS_DIR", tmp_path), - patch("tools.skills_tool.sys") as mock_sys, + patch("agent.skill_utils.sys") as mock_sys, ): mock_sys.platform = "darwin" _make_skill(tmp_path, "mac-only", frontmatter_extra="platforms: [macos]\n") @@ -684,7 +684,7 @@ class TestFindAllSkillsPlatformFiltering: """Skills without platforms field should appear on any platform.""" with ( patch("tools.skills_tool.SKILLS_DIR", tmp_path), - patch("tools.skills_tool.sys") as mock_sys, + patch("agent.skill_utils.sys") as mock_sys, ): mock_sys.platform = "win32" _make_skill(tmp_path, "generic-skill") @@ -695,7 +695,7 @@ class TestFindAllSkillsPlatformFiltering: def test_multi_platform_skill(self, tmp_path): with ( patch("tools.skills_tool.SKILLS_DIR", tmp_path), - patch("tools.skills_tool.sys") as mock_sys, + patch("agent.skill_utils.sys") as mock_sys, ): _make_skill( tmp_path, "cross-plat", frontmatter_extra="platforms: [macos, linux]\n" diff --git a/tools/skill_manager_tool.py b/tools/skill_manager_tool.py index 045e13500..3c8619496 100644 --- a/tools/skill_manager_tool.py +++ b/tools/skill_manager_tool.py @@ -547,6 +547,13 @@ def skill_manage( else: result = {"success": False, "error": f"Unknown action '{action}'. Use: create, edit, patch, delete, write_file, remove_file"} + if result.get("success"): + try: + from agent.prompt_builder import clear_skills_system_prompt_cache + clear_skills_system_prompt_cache(clear_snapshot=True) + except Exception: + pass + return json.dumps(result, ensure_ascii=False) diff --git a/tools/skills_tool.py b/tools/skills_tool.py index fef89f198..cdee7a6f0 100644 --- a/tools/skills_tool.py +++ b/tools/skills_tool.py @@ -120,28 +120,11 @@ def set_secret_capture_callback(callback) -> None: def skill_matches_platform(frontmatter: Dict[str, Any]) -> bool: """Check if a skill is compatible with the current OS platform. - Skills declare platform requirements via a top-level ``platforms`` list - in their YAML frontmatter:: - - platforms: [macos] # macOS only - platforms: [macos, linux] # macOS and Linux - - Valid values: ``macos``, ``linux``, ``windows``. - - If the field is absent or empty the skill is compatible with **all** - platforms (backward-compatible default). + Delegates to ``agent.skill_utils.skill_matches_platform`` — kept here + as a public re-export so existing callers don't need updating. """ - platforms = frontmatter.get("platforms") - if not platforms: - return True # No restriction → loads everywhere - if not isinstance(platforms, list): - platforms = [platforms] - current = sys.platform - for p in platforms: - mapped = _PLATFORM_MAP.get(str(p).lower().strip(), str(p).lower().strip()) - if current.startswith(mapped): - return True - return False + from agent.skill_utils import skill_matches_platform as _impl + return _impl(frontmatter) def _normalize_prerequisite_values(value: Any) -> List[str]: @@ -419,40 +402,13 @@ def check_skills_requirements() -> bool: def _parse_frontmatter(content: str) -> Tuple[Dict[str, Any], str]: + """Parse YAML frontmatter from markdown content. + + Delegates to ``agent.skill_utils.parse_frontmatter`` — kept here + as a public re-export so existing callers don't need updating. """ - Parse YAML frontmatter from markdown content. - - Uses yaml.safe_load for full YAML support (nested metadata, lists, etc.) - with a fallback to simple key:value splitting for robustness. - - Args: - content: Full markdown file content - - Returns: - Tuple of (frontmatter dict, remaining content) - """ - frontmatter = {} - body = content - - if content.startswith("---"): - end_match = re.search(r"\n---\s*\n", content[3:]) - if end_match: - yaml_content = content[3 : end_match.start() + 3] - body = content[end_match.end() + 3 :] - - try: - parsed = yaml.safe_load(yaml_content) - if isinstance(parsed, dict): - frontmatter = parsed - # yaml.safe_load returns None for empty frontmatter - except yaml.YAMLError: - # Fallback: simple key:value parsing for malformed YAML - for line in yaml_content.strip().split("\n"): - if ":" in line: - key, value = line.split(":", 1) - frontmatter[key.strip()] = value.strip() - - return frontmatter, body + from agent.skill_utils import parse_frontmatter + return parse_frontmatter(content) def _get_category_from_path(skill_path: Path) -> Optional[str]: @@ -516,24 +472,13 @@ def _parse_tags(tags_value) -> List[str]: def _get_disabled_skill_names() -> Set[str]: - """Load disabled skill names from config (once per call). + """Load disabled skill names from config. - Resolves platform from ``HERMES_PLATFORM`` env var, falls back to - the global disabled list. + Delegates to ``agent.skill_utils.get_disabled_skill_names`` — kept here + as a public re-export so existing callers don't need updating. """ - import os - try: - from hermes_cli.config import load_config - config = load_config() - skills_cfg = config.get("skills", {}) - resolved_platform = os.getenv("HERMES_PLATFORM") - if resolved_platform: - platform_disabled = skills_cfg.get("platform_disabled", {}).get(resolved_platform) - if platform_disabled is not None: - return set(platform_disabled) - return set(skills_cfg.get("disabled", [])) - except Exception: - return set() + from agent.skill_utils import get_disabled_skill_names + return get_disabled_skill_names() def _is_skill_disabled(name: str, platform: str = None) -> bool: