When PyYAML is unavailable or YAML frontmatter is malformed, the fallback
parser may return metadata as a string instead of a dict. This causes
AttributeError when calling .get("hermes") on the string.
Added explicit type checks to handle cases where metadata or hermes fields
are not dicts, preventing the crash.
Co-authored-by: factory-droid[bot] <138933559+factory-droid[bot]@users.noreply.github.com>
277 lines
9.4 KiB
Python
277 lines
9.4 KiB
Python
"""Lightweight skill metadata utilities shared by prompt_builder and skills_tool.
|
|
|
|
This module intentionally avoids importing the tool registry, CLI config, or any
|
|
heavy dependency chain. It is safe to import at module level without triggering
|
|
tool registration or provider resolution.
|
|
"""
|
|
|
|
import logging
|
|
import os
|
|
import re
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Set, Tuple
|
|
|
|
from hermes_constants import get_hermes_home
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# ── Platform mapping ──────────────────────────────────────────────────────
|
|
|
|
PLATFORM_MAP = {
|
|
"macos": "darwin",
|
|
"linux": "linux",
|
|
"windows": "win32",
|
|
}
|
|
|
|
EXCLUDED_SKILL_DIRS = frozenset((".git", ".github", ".hub"))
|
|
|
|
# ── Lazy YAML loader ─────────────────────────────────────────────────────
|
|
|
|
_yaml_load_fn = None
|
|
|
|
|
|
def yaml_load(content: str):
|
|
"""Parse YAML with lazy import and CSafeLoader preference."""
|
|
global _yaml_load_fn
|
|
if _yaml_load_fn is None:
|
|
import yaml
|
|
|
|
loader = getattr(yaml, "CSafeLoader", None) or yaml.SafeLoader
|
|
|
|
def _load(value: str):
|
|
return yaml.load(value, Loader=loader)
|
|
|
|
_yaml_load_fn = _load
|
|
return _yaml_load_fn(content)
|
|
|
|
|
|
# ── Frontmatter parsing ──────────────────────────────────────────────────
|
|
|
|
|
|
def parse_frontmatter(content: str) -> Tuple[Dict[str, Any], str]:
|
|
"""Parse YAML frontmatter from a markdown string.
|
|
|
|
Uses yaml with CSafeLoader for full YAML support (nested metadata, lists)
|
|
with a fallback to simple key:value splitting for robustness.
|
|
|
|
Returns:
|
|
(frontmatter_dict, remaining_body)
|
|
"""
|
|
frontmatter: Dict[str, Any] = {}
|
|
body = content
|
|
|
|
if not content.startswith("---"):
|
|
return frontmatter, body
|
|
|
|
end_match = re.search(r"\n---\s*\n", content[3:])
|
|
if not end_match:
|
|
return frontmatter, body
|
|
|
|
yaml_content = content[3 : end_match.start() + 3]
|
|
body = content[end_match.end() + 3 :]
|
|
|
|
try:
|
|
parsed = yaml_load(yaml_content)
|
|
if isinstance(parsed, dict):
|
|
frontmatter = parsed
|
|
except Exception:
|
|
# Fallback: simple key:value parsing for malformed YAML
|
|
for line in yaml_content.strip().split("\n"):
|
|
if ":" not in line:
|
|
continue
|
|
key, value = line.split(":", 1)
|
|
frontmatter[key.strip()] = value.strip()
|
|
|
|
return frontmatter, body
|
|
|
|
|
|
# ── Platform matching ─────────────────────────────────────────────────────
|
|
|
|
|
|
def skill_matches_platform(frontmatter: Dict[str, Any]) -> bool:
|
|
"""Return True when the skill is compatible with the current OS.
|
|
|
|
Skills declare platform requirements via a top-level ``platforms`` list
|
|
in their YAML frontmatter::
|
|
|
|
platforms: [macos] # macOS only
|
|
platforms: [macos, linux] # macOS and Linux
|
|
|
|
If the field is absent or empty the skill is compatible with **all**
|
|
platforms (backward-compatible default).
|
|
"""
|
|
platforms = frontmatter.get("platforms")
|
|
if not platforms:
|
|
return True
|
|
if not isinstance(platforms, list):
|
|
platforms = [platforms]
|
|
current = sys.platform
|
|
for platform in platforms:
|
|
normalized = str(platform).lower().strip()
|
|
mapped = PLATFORM_MAP.get(normalized, normalized)
|
|
if current.startswith(mapped):
|
|
return True
|
|
return False
|
|
|
|
|
|
# ── Disabled skills ───────────────────────────────────────────────────────
|
|
|
|
|
|
def get_disabled_skill_names() -> Set[str]:
|
|
"""Read disabled skill names from config.yaml.
|
|
|
|
Resolves platform from ``HERMES_PLATFORM`` env var, falls back to
|
|
the global disabled list. Reads the config file directly (no CLI
|
|
config imports) to stay lightweight.
|
|
"""
|
|
config_path = get_hermes_home() / "config.yaml"
|
|
if not config_path.exists():
|
|
return set()
|
|
try:
|
|
parsed = yaml_load(config_path.read_text(encoding="utf-8"))
|
|
except Exception as e:
|
|
logger.debug("Could not read skill config %s: %s", config_path, e)
|
|
return set()
|
|
if not isinstance(parsed, dict):
|
|
return set()
|
|
|
|
skills_cfg = parsed.get("skills")
|
|
if not isinstance(skills_cfg, dict):
|
|
return set()
|
|
|
|
resolved_platform = os.getenv("HERMES_PLATFORM")
|
|
if resolved_platform:
|
|
platform_disabled = (skills_cfg.get("platform_disabled") or {}).get(
|
|
resolved_platform
|
|
)
|
|
if platform_disabled is not None:
|
|
return _normalize_string_set(platform_disabled)
|
|
return _normalize_string_set(skills_cfg.get("disabled"))
|
|
|
|
|
|
def _normalize_string_set(values) -> Set[str]:
|
|
if values is None:
|
|
return set()
|
|
if isinstance(values, str):
|
|
values = [values]
|
|
return {str(v).strip() for v in values if str(v).strip()}
|
|
|
|
|
|
# ── External skills directories ──────────────────────────────────────────
|
|
|
|
|
|
def get_external_skills_dirs() -> List[Path]:
|
|
"""Read ``skills.external_dirs`` from config.yaml and return validated paths.
|
|
|
|
Each entry is expanded (``~`` and ``${VAR}``) and resolved to an absolute
|
|
path. Only directories that actually exist are returned. Duplicates and
|
|
paths that resolve to the local ``~/.hermes/skills/`` are silently skipped.
|
|
"""
|
|
config_path = get_hermes_home() / "config.yaml"
|
|
if not config_path.exists():
|
|
return []
|
|
try:
|
|
parsed = yaml_load(config_path.read_text(encoding="utf-8"))
|
|
except Exception:
|
|
return []
|
|
if not isinstance(parsed, dict):
|
|
return []
|
|
|
|
skills_cfg = parsed.get("skills")
|
|
if not isinstance(skills_cfg, dict):
|
|
return []
|
|
|
|
raw_dirs = skills_cfg.get("external_dirs")
|
|
if not raw_dirs:
|
|
return []
|
|
if isinstance(raw_dirs, str):
|
|
raw_dirs = [raw_dirs]
|
|
if not isinstance(raw_dirs, list):
|
|
return []
|
|
|
|
local_skills = (get_hermes_home() / "skills").resolve()
|
|
seen: Set[Path] = set()
|
|
result: List[Path] = []
|
|
|
|
for entry in raw_dirs:
|
|
entry = str(entry).strip()
|
|
if not entry:
|
|
continue
|
|
# Expand ~ and environment variables
|
|
expanded = os.path.expanduser(os.path.expandvars(entry))
|
|
p = Path(expanded).resolve()
|
|
if p == local_skills:
|
|
continue
|
|
if p in seen:
|
|
continue
|
|
if p.is_dir():
|
|
seen.add(p)
|
|
result.append(p)
|
|
else:
|
|
logger.debug("External skills dir does not exist, skipping: %s", p)
|
|
|
|
return result
|
|
|
|
|
|
def get_all_skills_dirs() -> List[Path]:
|
|
"""Return all skill directories: local ``~/.hermes/skills/`` first, then external.
|
|
|
|
The local dir is always first (and always included even if it doesn't exist
|
|
yet — callers handle that). External dirs follow in config order.
|
|
"""
|
|
dirs = [get_hermes_home() / "skills"]
|
|
dirs.extend(get_external_skills_dirs())
|
|
return dirs
|
|
|
|
|
|
# ── Condition extraction ──────────────────────────────────────────────────
|
|
|
|
|
|
def extract_skill_conditions(frontmatter: Dict[str, Any]) -> Dict[str, List]:
|
|
"""Extract conditional activation fields from parsed frontmatter."""
|
|
metadata = frontmatter.get("metadata")
|
|
# Handle cases where metadata is not a dict (e.g., a string from malformed YAML)
|
|
if not isinstance(metadata, dict):
|
|
metadata = {}
|
|
hermes = metadata.get("hermes") or {}
|
|
if not isinstance(hermes, dict):
|
|
hermes = {}
|
|
return {
|
|
"fallback_for_toolsets": hermes.get("fallback_for_toolsets", []),
|
|
"requires_toolsets": hermes.get("requires_toolsets", []),
|
|
"fallback_for_tools": hermes.get("fallback_for_tools", []),
|
|
"requires_tools": hermes.get("requires_tools", []),
|
|
}
|
|
|
|
|
|
# ── Description extraction ────────────────────────────────────────────────
|
|
|
|
|
|
def extract_skill_description(frontmatter: Dict[str, Any]) -> str:
|
|
"""Extract a truncated description from parsed frontmatter."""
|
|
raw_desc = frontmatter.get("description", "")
|
|
if not raw_desc:
|
|
return ""
|
|
desc = str(raw_desc).strip().strip("'\"")
|
|
if len(desc) > 60:
|
|
return desc[:57] + "..."
|
|
return desc
|
|
|
|
|
|
# ── File iteration ────────────────────────────────────────────────────────
|
|
|
|
|
|
def iter_skill_index_files(skills_dir: Path, filename: str):
|
|
"""Walk skills_dir yielding sorted paths matching *filename*.
|
|
|
|
Excludes ``.git``, ``.github``, ``.hub`` directories.
|
|
"""
|
|
matches = []
|
|
for root, dirs, files in os.walk(skills_dir):
|
|
dirs[:] = [d for d in dirs if d not in EXCLUDED_SKILL_DIRS]
|
|
if filename in files:
|
|
matches.append(Path(root) / filename)
|
|
for path in sorted(matches, key=lambda p: str(p.relative_to(skills_dir))):
|
|
yield path
|