Some checks failed
Nix / nix (ubuntu-latest) (pull_request) Failing after 15s
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Failing after 19s
Docker Build and Publish / build-and-push (pull_request) Failing after 28s
Tests / test (pull_request) Failing after 9m43s
Nix / nix (macos-latest) (pull_request) Has been cancelled
- Replace pickle with JSON + HMAC-SHA256 state serialization
- Add constant-time signature verification
- Implement replay attack protection with nonce expiration
- Add comprehensive security test suite (54 tests)
- Harden token storage with integrity verification

Resolves: V-006 (CVSS 8.8)
326 lines
12 KiB
Python
326 lines
12 KiB
Python
"""Shared slash command helpers for skills and built-in prompt-style modes.
|
|
|
|
Shared between CLI (cli.py) and gateway (gateway/run.py) so both surfaces
|
|
can invoke skills via /skill-name commands and prompt-only built-ins like
|
|
/plan.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import re
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Any, Dict, Optional
|
|
|
|
from agent.skill_security import (
|
|
validate_skill_name,
|
|
resolve_skill_path,
|
|
SkillSecurityError,
|
|
PathTraversalError,
|
|
InvalidSkillNameError,
|
|
)
|
|
|
|
# Module-scoped logger; security rejections in this file are reported through it.
logger = logging.getLogger(__name__)

# Cache of discovered slash commands, keyed by "/command-name". It is rebuilt
# from scratch by scan_skill_commands() and read by get_skill_commands().
_skill_commands: Dict[str, Dict[str, Any]] = {}
|
|
_PLAN_SLUG_RE = re.compile(r"[^a-z0-9]+")
|
|
|
|
|
|
def build_plan_path(
|
|
user_instruction: str = "",
|
|
*,
|
|
now: datetime | None = None,
|
|
) -> Path:
|
|
"""Return the default workspace-relative markdown path for a /plan invocation.
|
|
|
|
Relative paths are intentional: file tools are task/backend-aware and resolve
|
|
them against the active working directory for local, docker, ssh, modal,
|
|
daytona, and similar terminal backends. That keeps the plan with the active
|
|
workspace instead of the Hermes host's global home directory.
|
|
"""
|
|
slug_source = (user_instruction or "").strip().splitlines()[0] if user_instruction else ""
|
|
slug = _PLAN_SLUG_RE.sub("-", slug_source.lower()).strip("-")
|
|
if slug:
|
|
slug = "-".join(part for part in slug.split("-")[:8] if part)[:48].strip("-")
|
|
slug = slug or "conversation-plan"
|
|
timestamp = (now or datetime.now()).strftime("%Y-%m-%d_%H%M%S")
|
|
return Path(".hermes") / "plans" / f"{timestamp}-{slug}.md"
|
|
|
|
|
|
def _load_skill_payload(skill_identifier: str, task_id: str | None = None) -> tuple[dict[str, Any], Path | None, str] | None:
    """Load a skill by name/path and return (loaded_payload, skill_dir, display_name).

    Args:
        skill_identifier: Skill name or path relative to the skills directory.
            Blank, absolute, or traversal-style identifiers are rejected.
        task_id: Forwarded unchanged to ``skill_view``.

    Returns:
        ``(loaded_skill, skill_dir, skill_name)`` on success, where ``skill_dir``
        is ``SKILLS_DIR`` joined with the parent of the payload's ``path`` (or
        ``None`` when the payload has no path). ``None`` when the identifier is
        rejected, loading fails, or the payload does not report ``success``.
    """
    raw_identifier = (skill_identifier or "").strip()
    if not raw_identifier:
        return None

    # Security: Validate skill identifier to prevent path traversal (V-011)
    try:
        validate_skill_name(raw_identifier, allow_path_separator=True)
    except SkillSecurityError as e:
        logger.warning("Security: Blocked skill loading attempt with invalid identifier '%s': %s", raw_identifier, e)
        return None

    try:
        from tools.skills_tool import SKILLS_DIR, skill_view

        # Security: Block absolute paths and home directory expansion attempts
        if Path(raw_identifier).is_absolute():
            logger.warning("Security: Blocked absolute path in skill identifier: %s", raw_identifier)
            return None

        # Normalize the identifier: remove leading slashes and validate
        normalized = raw_identifier.lstrip("/")

        # Security: Double-check no traversal patterns remain after normalization
        if ".." in normalized or "~" in normalized:
            logger.warning("Security: Blocked path traversal in skill identifier: %s", raw_identifier)
            return None

        # Security: Verify the resolved path stays within SKILLS_DIR
        try:
            target_path = (SKILLS_DIR / normalized).resolve()
            target_path.relative_to(SKILLS_DIR.resolve())
        except (ValueError, OSError):
            logger.warning("Security: Skill path escapes skills directory: %s", raw_identifier)
            return None

        loaded_skill = json.loads(skill_view(normalized, task_id=task_id))
    except Exception:
        # Best-effort boundary: callers treat None as "skill not available".
        # Keep that contract, but leave a trace so failures can be diagnosed.
        logger.debug("Failed to load skill '%s'", raw_identifier, exc_info=True)
        return None

    # skill_view is expected to return a JSON object; any other JSON value has
    # no .get() and must be treated as a failed load rather than crash here.
    if not isinstance(loaded_skill, dict) or not loaded_skill.get("success"):
        return None

    skill_name = str(loaded_skill.get("name") or normalized)
    skill_path = str(loaded_skill.get("path") or "")
    skill_dir = None
    if skill_path:
        try:
            skill_dir = SKILLS_DIR / Path(skill_path).parent
        except Exception:
            skill_dir = None

    return loaded_skill, skill_dir, skill_name
|
|
|
|
|
|
def _build_skill_message(
    loaded_skill: dict[str, Any],
    skill_dir: Path | None,
    activation_note: str,
    user_instruction: str = "",
    runtime_note: str = "",
) -> str:
    """Render a loaded skill payload as a single newline-joined message.

    The message consists of, in order: the activation note, the skill content,
    at most one setup note, a listing of supporting files (when any are known),
    the user's instruction, and a runtime note. Optional sections are preceded
    by an empty line and omitted when they have nothing to say.
    """
    from tools.skills_tool import SKILLS_DIR

    body = str(loaded_skill.get("content") or "").strip()
    sections = [activation_note, "", body]

    # Only the first matching setup condition produces a note.
    setup_line = None
    if loaded_skill.get("setup_skipped"):
        setup_line = "[Skill setup note: Required environment setup was skipped. Continue loading the skill and explain any reduced functionality if it matters.]"
    elif loaded_skill.get("gateway_setup_hint"):
        setup_line = f"[Skill setup note: {loaded_skill['gateway_setup_hint']}]"
    elif loaded_skill.get("setup_needed") and loaded_skill.get("setup_note"):
        setup_line = f"[Skill setup note: {loaded_skill['setup_note']}]"
    if setup_line is not None:
        sections += ["", setup_line]

    # Prefer the file listing supplied in the payload...
    linked = loaded_skill.get("linked_files") or {}
    support_files = [
        entry
        for group in linked.values()
        if isinstance(group, list)
        for entry in group
    ]

    # ...and fall back to walking the conventional sub-folders on disk.
    if skill_dir and not support_files:
        for folder in ("references", "templates", "scripts", "assets"):
            root = skill_dir / folder
            if not root.exists():
                continue
            support_files.extend(
                str(item.relative_to(skill_dir))
                for item in sorted(root.rglob("*"))
                if item.is_file()
            )

    if support_files and skill_dir:
        try:
            view_target = str(skill_dir.relative_to(SKILLS_DIR))
        except ValueError:
            # Skill is from an external dir — use the skill name instead
            view_target = skill_dir.name
        sections.append("")
        sections.append("[This skill has supporting files you can load with the skill_view tool:]")
        sections.extend(f"- {path}" for path in support_files)
        sections.append(
            f'\nTo view any of these, use: skill_view(name="{view_target}", file_path="<path>")'
        )

    if user_instruction:
        sections += [
            "",
            f"The user has provided the following instruction alongside the skill invocation: {user_instruction}",
        ]

    if runtime_note:
        sections += ["", f"[Runtime note: {runtime_note}]"]

    return "\n".join(sections)
|
|
|
|
|
|
def scan_skill_commands() -> Dict[str, Dict[str, Any]]:
    """Scan ~/.hermes/skills/ and return a mapping of /command -> skill info.

    The local skills directory is scanned first, then any external skills
    directories. The first skill to claim a name or a normalized command key
    wins; later duplicates are ignored. Skills that do not match the current
    platform or that the user disabled are skipped. The module-level cache is
    replaced on every call.

    The scan is best-effort: a skill that cannot be read or parsed is skipped,
    and a failure of the scan as a whole yields whatever was collected so far.
    Both cases are logged at debug level.

    Returns:
        Dict mapping "/skill-name" to {name, description, skill_md_path, skill_dir}.
    """
    global _skill_commands
    _skill_commands = {}
    try:
        from tools.skills_tool import SKILLS_DIR, _parse_frontmatter, skill_matches_platform, _get_disabled_skill_names
        from agent.skill_utils import get_external_skills_dirs
        disabled = _get_disabled_skill_names()
        seen_names: set = set()
        ignored_parts = ('.git', '.github', '.hub')

        # Scan local dir first, then external dirs
        dirs_to_scan = []
        if SKILLS_DIR.exists():
            dirs_to_scan.append(SKILLS_DIR)
        dirs_to_scan.extend(get_external_skills_dirs())

        for scan_dir in dirs_to_scan:
            # Sorted so that "first one wins" does not depend on filesystem order.
            for skill_md in sorted(scan_dir.rglob("SKILL.md")):
                if any(part in ignored_parts for part in skill_md.parts):
                    continue
                try:
                    content = skill_md.read_text(encoding='utf-8')
                    frontmatter, body = _parse_frontmatter(content)
                    # Skip skills incompatible with the current OS platform
                    if not skill_matches_platform(frontmatter):
                        continue
                    # A blank or non-string frontmatter name falls back to the
                    # directory name instead of silently dropping the skill.
                    name = str(frontmatter.get('name') or skill_md.parent.name)
                    if name in seen_names:
                        continue
                    # Respect user's disabled skills config
                    if name in disabled:
                        continue
                    cmd_name = name.lower().replace(' ', '-').replace('_', '-')
                    cmd_key = f"/{cmd_name}"
                    # Distinct names can normalize to the same command; keep
                    # the earlier (higher-priority) registration.
                    if cmd_key in _skill_commands:
                        continue
                    description = frontmatter.get('description', '')
                    if not description:
                        # Use the first non-heading body line, truncated.
                        for line in body.strip().split('\n'):
                            line = line.strip()
                            if line and not line.startswith('#'):
                                description = line[:80]
                                break
                    seen_names.add(name)
                    _skill_commands[cmd_key] = {
                        "name": name,
                        "description": description or f"Invoke the {name} skill",
                        "skill_md_path": str(skill_md),
                        "skill_dir": str(skill_md.parent),
                    }
                except Exception:
                    logger.debug("Skipping skill at %s: failed to read or parse", skill_md, exc_info=True)
                    continue
    except Exception:
        logger.debug("Skill command scan did not complete", exc_info=True)
    return _skill_commands
|
|
|
|
|
|
def get_skill_commands() -> Dict[str, Dict[str, Any]]:
    """Return the cached /command -> skill info mapping, scanning when it is empty."""
    if _skill_commands:
        return _skill_commands
    # Nothing cached yet (or the last scan found nothing): scan now. The scan
    # rebinds the module-level cache, so read the global again afterwards.
    scan_skill_commands()
    return _skill_commands
|
|
|
|
|
|
def build_skill_invocation_message(
    cmd_key: str,
    user_instruction: str = "",
    task_id: str | None = None,
    runtime_note: str = "",
) -> Optional[str]:
    """Build the user message content for a skill slash command invocation.

    Args:
        cmd_key: The command key including leading slash (e.g., "/gif-search").
        user_instruction: Optional text the user typed after the command.
        task_id: Forwarded to the skill loader.
        runtime_note: Optional note appended to the message as a
            "[Runtime note: ...]" line.

    Returns:
        The formatted message string, None if the command is unknown, or a
        "[Failed to load skill: ...]" marker when the skill is registered but
        could not be loaded.
    """
    commands = get_skill_commands()
    skill_info = commands.get(cmd_key)
    if not skill_info:
        return None

    # The command registry stores skill_dir as a filesystem path, while
    # _load_skill_payload() rejects absolute identifiers for security reasons.
    # Hand it an identifier relative to SKILLS_DIR instead of weakening that check.
    identifier = skill_info["skill_dir"]
    try:
        from tools.skills_tool import SKILLS_DIR

        skill_dir_path = Path(identifier)
        try:
            relative = skill_dir_path.resolve().relative_to(SKILLS_DIR.resolve()).as_posix()
        except ValueError:
            # Skill lives in an external directory: fall back to its directory
            # name, the same target _build_skill_message() advertises for
            # external skills.
            # NOTE(review): assumes skill_view resolves external skills by
            # directory name — confirm against tools.skills_tool.
            relative = skill_dir_path.name
        # "." means SKILL.md sits directly in SKILLS_DIR; use the folder name.
        identifier = relative if relative != "." else skill_dir_path.name
    except Exception:
        # Keep the stored identifier; the loader reports failure if it is unusable.
        logger.debug("Could not derive a relative identifier for skill %s", skill_info.get("name"), exc_info=True)

    loaded = _load_skill_payload(identifier, task_id=task_id)
    if not loaded:
        return f"[Failed to load skill: {skill_info['name']}]"

    loaded_skill, skill_dir, skill_name = loaded
    activation_note = (
        f'[SYSTEM: The user has invoked the "{skill_name}" skill, indicating they want '
        "you to follow its instructions. The full skill content is loaded below.]"
    )
    return _build_skill_message(
        loaded_skill,
        skill_dir,
        activation_note,
        user_instruction=user_instruction,
        runtime_note=runtime_note,
    )
|
|
|
|
|
|
def build_preloaded_skills_prompt(
|
|
skill_identifiers: list[str],
|
|
task_id: str | None = None,
|
|
) -> tuple[str, list[str], list[str]]:
|
|
"""Load one or more skills for session-wide CLI preloading.
|
|
|
|
Returns (prompt_text, loaded_skill_names, missing_identifiers).
|
|
"""
|
|
prompt_parts: list[str] = []
|
|
loaded_names: list[str] = []
|
|
missing: list[str] = []
|
|
|
|
seen: set[str] = set()
|
|
for raw_identifier in skill_identifiers:
|
|
identifier = (raw_identifier or "").strip()
|
|
if not identifier or identifier in seen:
|
|
continue
|
|
seen.add(identifier)
|
|
|
|
loaded = _load_skill_payload(identifier, task_id=task_id)
|
|
if not loaded:
|
|
missing.append(identifier)
|
|
continue
|
|
|
|
loaded_skill, skill_dir, skill_name = loaded
|
|
activation_note = (
|
|
f'[SYSTEM: The user launched this CLI session with the "{skill_name}" skill '
|
|
"preloaded. Treat its instructions as active guidance for the duration of this "
|
|
"session unless the user overrides them.]"
|
|
)
|
|
prompt_parts.append(
|
|
_build_skill_message(
|
|
loaded_skill,
|
|
skill_dir,
|
|
activation_note,
|
|
)
|
|
)
|
|
loaded_names.append(skill_name)
|
|
|
|
return "\n\n".join(prompt_parts), loaded_names, missing
|