OpenAI's newer models (GPT-5, Codex) give stronger instruction-following
weight to the 'developer' role than to 'system'. Swap the role at the API
boundary in _build_api_kwargs() for the chat_completions path so the internal
message representation stays consistent ('system' everywhere).
This applies regardless of provider — OpenRouter, Nous portal, direct, etc.
The codex_responses path (direct OpenAI) uses 'instructions' instead of
message roles, so it is unaffected.
The DEVELOPER_ROLE_MODELS constant in prompt_builder.py defines the matching
model-name substrings: ('gpt-5', 'codex').
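
Sketch of the swap described above (illustrative only; _build_api_kwargs()
lives outside prompt_builder.py, so this helper is an assumption about its
shape, not the actual implementation):

    def _apply_developer_role(messages: list[dict], model: str) -> list[dict]:
        # Hypothetical helper; only DEVELOPER_ROLE_MODELS below is real.
        if not any(sub in model.lower() for sub in DEVELOPER_ROLE_MODELS):
            return messages
        # Internal history keeps 'system'; only the outgoing payload changes.
        return [
            {**m, "role": "developer"} if m.get("role") == "system" else m
            for m in messages
        ]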
"""System prompt assembly -- identity, platform hints, skills index, context files.
|
|
|
|
All functions are stateless. AIAgent._build_system_prompt() calls these to
|
|
assemble pieces, then combines them with memory and ephemeral prompts.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import threading
|
|
from collections import OrderedDict
|
|
from pathlib import Path
|
|
|
|
from hermes_constants import get_hermes_home
|
|
from typing import Optional
|
|
|
|
from agent.skill_utils import (
|
|
extract_skill_conditions,
|
|
extract_skill_description,
|
|
get_all_skills_dirs,
|
|
get_disabled_skill_names,
|
|
iter_skill_index_files,
|
|
parse_frontmatter,
|
|
skill_matches_platform,
|
|
)
|
|
from utils import atomic_json_write
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Context file scanning — detect prompt injection in AGENTS.md, .cursorrules,
|
|
# SOUL.md before they get injected into the system prompt.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_CONTEXT_THREAT_PATTERNS = [
|
|
(r'ignore\s+(previous|all|above|prior)\s+instructions', "prompt_injection"),
|
|
(r'do\s+not\s+tell\s+the\s+user', "deception_hide"),
|
|
(r'system\s+prompt\s+override', "sys_prompt_override"),
|
|
(r'disregard\s+(your|all|any)\s+(instructions|rules|guidelines)', "disregard_rules"),
|
|
(r'act\s+as\s+(if|though)\s+you\s+(have\s+no|don\'t\s+have)\s+(restrictions|limits|rules)', "bypass_restrictions"),
|
|
(r'<!--[^>]*(?:ignore|override|system|secret|hidden)[^>]*-->', "html_comment_injection"),
|
|
(r'<\s*div\s+style\s*=\s*["\'].*display\s*:\s*none', "hidden_div"),
|
|
(r'translate\s+.*\s+into\s+.*\s+and\s+(execute|run|eval)', "translate_execute"),
|
|
(r'curl\s+[^\n]*\$\{?\w*(KEY|TOKEN|SECRET|PASSWORD|CREDENTIAL|API)', "exfil_curl"),
|
|
(r'cat\s+[^\n]*(\.env|credentials|\.netrc|\.pgpass)', "read_secrets"),
|
|
]
|
|
|
|
_CONTEXT_INVISIBLE_CHARS = {
|
|
'\u200b', '\u200c', '\u200d', '\u2060', '\ufeff',
|
|
'\u202a', '\u202b', '\u202c', '\u202d', '\u202e',
|
|
}
|
|
|
|
|
|
def _scan_context_content(content: str, filename: str) -> str:
    """Scan context file content for injection. Returns sanitized content."""
    findings = []

    # Check invisible unicode
    for char in _CONTEXT_INVISIBLE_CHARS:
        if char in content:
            findings.append(f"invisible unicode U+{ord(char):04X}")

    # Check threat patterns
    for pattern, pid in _CONTEXT_THREAT_PATTERNS:
        if re.search(pattern, content, re.IGNORECASE):
            findings.append(pid)

    if findings:
        logger.warning("Context file %s blocked: %s", filename, ", ".join(findings))
        return f"[BLOCKED: {filename} contained potential prompt injection ({', '.join(findings)}). Content not loaded.]"

    return content


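# Illustrative example of _scan_context_content (not part of the original
# module): a context file containing "Ignore previous instructions" trips the
# prompt_injection pattern, so the whole file is replaced with
# "[BLOCKED: AGENTS.md contained potential prompt injection (prompt_injection).
# Content not loaded.]" instead of being injected verbatim.

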
def _find_git_root(start: Path) -> Optional[Path]:
    """Walk *start* and its parents looking for a ``.git`` directory.

    Returns the directory containing ``.git``, or ``None`` if we hit the
    filesystem root without finding one.
    """
    current = start.resolve()
    for parent in [current, *current.parents]:
        if (parent / ".git").exists():
            return parent
    return None


_HERMES_MD_NAMES = (".hermes.md", "HERMES.md")


def _find_hermes_md(cwd: Path) -> Optional[Path]:
    """Discover the nearest ``.hermes.md`` or ``HERMES.md``.

    Search order: *cwd* first, then each parent directory up to (and
    including) the git repository root. Returns the first match, or
    ``None`` if nothing is found.
    """
    stop_at = _find_git_root(cwd)
    current = cwd.resolve()

    for directory in [current, *current.parents]:
        for name in _HERMES_MD_NAMES:
            candidate = directory / name
            if candidate.is_file():
                return candidate
        # Stop walking at the git root (or filesystem root).
        if stop_at and directory == stop_at:
            break
    return None


def _strip_yaml_frontmatter(content: str) -> str:
    """Remove optional YAML frontmatter (``---`` delimited) from *content*.

    The frontmatter may contain structured config (model overrides, tool
    settings) that will be handled separately in a future PR. For now we
    strip it so only the human-readable markdown body is injected into the
    system prompt.
    """
    if content.startswith("---"):
        end = content.find("\n---", 3)
        if end != -1:
            # Skip past the closing --- and any trailing newline
            body = content[end + 4:].lstrip("\n")
            return body if body else content
    return content


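# Illustrative (assumed input, not from the module):
# _strip_yaml_frontmatter("---\nmodel: gpt-5\n---\nUse tabs.") -> "Use tabs."
# Content without a leading "---" delimiter is returned unchanged.

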
# =========================================================================
# Constants
# =========================================================================

DEFAULT_AGENT_IDENTITY = (
    "You are Hermes Agent, an intelligent AI assistant created by Nous Research. "
    "You are helpful, knowledgeable, and direct. You assist users with a wide "
    "range of tasks including answering questions, writing and editing code, "
    "analyzing information, creative work, and executing actions via your tools. "
    "You communicate clearly, admit uncertainty when appropriate, and prioritize "
    "being genuinely useful over being verbose unless otherwise directed below. "
    "Be targeted and efficient in your exploration and investigations."
)

MEMORY_GUIDANCE = (
    "You have persistent memory across sessions. Save durable facts using the memory "
    "tool: user preferences, environment details, tool quirks, and stable conventions. "
    "Memory is injected into every turn, so keep it compact and focused on facts that "
    "will still matter later.\n"
    "Prioritize what reduces future user steering — the most valuable memory is one "
    "that prevents the user from having to correct or remind you again. "
    "User preferences and recurring corrections matter more than procedural task details.\n"
    "Do NOT save task progress, session outcomes, completed-work logs, or temporary TODO "
    "state to memory; use session_search to recall those from past transcripts. "
    "If you've discovered a new way to do something or solved a problem whose solution "
    "could be needed later, save it as a skill with the skill tool."
)

SESSION_SEARCH_GUIDANCE = (
    "When the user references something from a past conversation or you suspect "
    "relevant cross-session context exists, use session_search to recall it before "
    "asking them to repeat themselves."
)

SKILLS_GUIDANCE = (
    "After completing a complex task (5+ tool calls), fixing a tricky error, "
    "or discovering a non-trivial workflow, save the approach as a "
    "skill with skill_manage so you can reuse it next time.\n"
    "When using a skill and finding it outdated, incomplete, or wrong, "
    "patch it immediately with skill_manage(action='patch') — don't wait to be asked. "
    "Skills that aren't maintained become liabilities."
)

TOOL_USE_ENFORCEMENT_GUIDANCE = (
    "# Tool-use enforcement\n"
    "You MUST use your tools to take action — do not describe what you would do "
    "or plan to do without actually doing it. When you say you will perform an "
    "action (e.g. 'I will run the tests', 'Let me check the file', 'I will create "
    "the project'), you MUST immediately make the corresponding tool call in the same "
    "response. Never end your turn with a promise of future action — execute it now.\n"
    "Keep working until the task is actually complete. Do not stop with a summary of "
    "what you plan to do next time. If you have tools available that can accomplish "
    "the task, use them instead of telling the user what you would do.\n"
    "Every response should either (a) contain tool calls that make progress, or "
    "(b) deliver a final result to the user. Responses that only describe intentions "
    "without acting are not acceptable."
)

# Model name substrings that trigger tool-use enforcement guidance.
# Add new patterns here when a model family needs explicit steering.
TOOL_USE_ENFORCEMENT_MODELS = ("gpt", "codex")

# Model name substrings that should use the 'developer' role instead of
# 'system' for the system prompt. OpenAI's newer models (GPT-5, Codex)
# give stronger instruction-following weight to the 'developer' role.
# The swap happens at the API boundary in _build_api_kwargs() so internal
# message representation stays consistent ("system" everywhere).
DEVELOPER_ROLE_MODELS = ("gpt-5", "codex")

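# Note on DEVELOPER_ROLE_MODELS: matching is by substring, so model names such
# as "gpt-5-mini" or "gpt-5-codex" (illustrative examples, not an exhaustive
# list) also opt in to the 'developer' role.
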
PLATFORM_HINTS = {
    "whatsapp": (
        "You are on a text messaging communication platform, WhatsApp. "
        "Please do not use markdown as it does not render. "
        "You can send media files natively: to deliver a file to the user, "
        "include MEDIA:/absolute/path/to/file in your response. The file "
        "will be sent as a native WhatsApp attachment — images (.jpg, .png, "
        ".webp) appear as photos, videos (.mp4, .mov) play inline, and other "
        "files arrive as downloadable documents. You can also include image "
        "URLs in markdown format and they will be sent as photos."
    ),
    "telegram": (
        "You are on a text messaging communication platform, Telegram. "
        "Please do not use markdown as it does not render. "
        "You can send media files natively: to deliver a file to the user, "
        "include MEDIA:/absolute/path/to/file in your response. Images "
        "(.png, .jpg, .webp) appear as photos, audio (.ogg) sends as voice "
        "bubbles, and videos (.mp4) play inline. You can also include image "
        "URLs in markdown format and they will be sent as native photos."
    ),
    "discord": (
        "You are in a Discord server or group chat communicating with your user. "
        "You can send media files natively: include MEDIA:/absolute/path/to/file "
        "in your response. Images (.png, .jpg, .webp) are sent as photo "
        "attachments, audio as file attachments. You can also include image URLs "
        "in markdown format and they will be sent as attachments."
    ),
    "slack": (
        "You are in a Slack workspace communicating with your user. "
        "You can send media files natively: include MEDIA:/absolute/path/to/file "
        "in your response. Images (.png, .jpg, .webp) are uploaded as photo "
        "attachments, audio as file attachments. You can also include image URLs "
        "in markdown format and they will be uploaded as attachments."
    ),
    "signal": (
        "You are on a text messaging communication platform, Signal. "
        "Please do not use markdown as it does not render. "
        "You can send media files natively: to deliver a file to the user, "
        "include MEDIA:/absolute/path/to/file in your response. Images "
        "(.png, .jpg, .webp) appear as photos, audio as attachments, and other "
        "files arrive as downloadable documents. You can also include image "
        "URLs in markdown format and they will be sent as photos."
    ),
    "email": (
        "You are communicating via email. Write clear, well-structured responses "
        "suitable for email. Use plain text formatting (no markdown). "
        "Keep responses concise but complete. You can send file attachments — "
        "include MEDIA:/absolute/path/to/file in your response. The subject line "
        "is preserved for threading. Do not include greetings or sign-offs unless "
        "contextually appropriate."
    ),
    "cron": (
        "You are running as a scheduled cron job. There is no user present — you "
        "cannot ask questions, request clarification, or wait for follow-up. Execute "
        "the task fully and autonomously, making reasonable decisions where needed. "
        "Your final response is automatically delivered to the job's configured "
        "destination — put the primary content directly in your response."
    ),
    "cli": (
        "You are a CLI AI Agent. Avoid markdown; prefer simple text that "
        "renders well inside a terminal."
    ),
    "sms": (
        "You are communicating via SMS. Keep responses concise and use plain text "
        "only — no markdown, no formatting. SMS messages are limited to ~1600 "
        "characters, so be brief and direct."
    ),
}

CONTEXT_FILE_MAX_CHARS = 20_000
CONTEXT_TRUNCATE_HEAD_RATIO = 0.7
CONTEXT_TRUNCATE_TAIL_RATIO = 0.2


# =========================================================================
# Skills prompt cache
# =========================================================================

_SKILLS_PROMPT_CACHE_MAX = 8
_SKILLS_PROMPT_CACHE: OrderedDict[tuple, str] = OrderedDict()
_SKILLS_PROMPT_CACHE_LOCK = threading.Lock()
_SKILLS_SNAPSHOT_VERSION = 1


def _skills_prompt_snapshot_path() -> Path:
    return get_hermes_home() / ".skills_prompt_snapshot.json"


def clear_skills_system_prompt_cache(*, clear_snapshot: bool = False) -> None:
    """Drop the in-process skills prompt cache (and optionally the disk snapshot)."""
    with _SKILLS_PROMPT_CACHE_LOCK:
        _SKILLS_PROMPT_CACHE.clear()
    if clear_snapshot:
        try:
            _skills_prompt_snapshot_path().unlink(missing_ok=True)
        except OSError as e:
            logger.debug("Could not remove skills prompt snapshot: %s", e)


def _build_skills_manifest(skills_dir: Path) -> dict[str, list[int]]:
    """Build an mtime/size manifest of all SKILL.md and DESCRIPTION.md files."""
    manifest: dict[str, list[int]] = {}
    for filename in ("SKILL.md", "DESCRIPTION.md"):
        for path in iter_skill_index_files(skills_dir, filename):
            try:
                st = path.stat()
            except OSError:
                continue
            manifest[str(path.relative_to(skills_dir))] = [st.st_mtime_ns, st.st_size]
    return manifest


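# Illustrative manifest shape (hypothetical path and values):
# {"coding/git-bisect/SKILL.md": [1712345678901234567, 842]}
# i.e. a file's relative path mapped to its [st_mtime_ns, st_size] pair.

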
def _load_skills_snapshot(skills_dir: Path) -> Optional[dict]:
    """Load the disk snapshot if it exists and its manifest still matches."""
    snapshot_path = _skills_prompt_snapshot_path()
    if not snapshot_path.exists():
        return None
    try:
        snapshot = json.loads(snapshot_path.read_text(encoding="utf-8"))
    except Exception:
        return None
    if not isinstance(snapshot, dict):
        return None
    if snapshot.get("version") != _SKILLS_SNAPSHOT_VERSION:
        return None
    if snapshot.get("manifest") != _build_skills_manifest(skills_dir):
        return None
    return snapshot


def _write_skills_snapshot(
    skills_dir: Path,
    manifest: dict[str, list[int]],
    skill_entries: list[dict],
    category_descriptions: dict[str, str],
) -> None:
    """Persist skill metadata to disk for fast cold-start reuse."""
    payload = {
        "version": _SKILLS_SNAPSHOT_VERSION,
        "manifest": manifest,
        "skills": skill_entries,
        "category_descriptions": category_descriptions,
    }
    try:
        atomic_json_write(_skills_prompt_snapshot_path(), payload)
    except Exception as e:
        logger.debug("Could not write skills prompt snapshot: %s", e)


def _build_snapshot_entry(
    skill_file: Path,
    skills_dir: Path,
    frontmatter: dict,
    description: str,
) -> dict:
    """Build a serialisable metadata dict for one skill."""
    rel_path = skill_file.relative_to(skills_dir)
    parts = rel_path.parts
    if len(parts) >= 2:
        skill_name = parts[-2]
        category = "/".join(parts[:-2]) if len(parts) > 2 else parts[0]
    else:
        category = "general"
        skill_name = skill_file.parent.name

    platforms = frontmatter.get("platforms") or []
    if isinstance(platforms, str):
        platforms = [platforms]

    return {
        "skill_name": skill_name,
        "category": category,
        "frontmatter_name": str(frontmatter.get("name", skill_name)),
        "description": description,
        "platforms": [str(p).strip() for p in platforms if str(p).strip()],
        "conditions": extract_skill_conditions(frontmatter),
    }


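# Illustrative (hypothetical layout): for skills/coding/git-bisect/SKILL.md the
# entry gets skill_name="git-bisect" and category="coding"; deeper nesting
# joins the intermediate directories into the category ("a/b" for a/b/skill/...).

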
# =========================================================================
# Skills index
# =========================================================================

def _parse_skill_file(skill_file: Path) -> tuple[bool, dict, str]:
    """Read a SKILL.md once and return platform compatibility, frontmatter, and description.

    Returns (is_compatible, frontmatter, description). On any error, returns
    (True, {}, "") to err on the side of showing the skill.
    """
    try:
        raw = skill_file.read_text(encoding="utf-8")[:2000]
        frontmatter, _ = parse_frontmatter(raw)

        if not skill_matches_platform(frontmatter):
            return False, frontmatter, ""

        return True, frontmatter, extract_skill_description(frontmatter)
    except Exception as e:
        logger.debug("Failed to parse skill file %s: %s", skill_file, e)
        return True, {}, ""


def _read_skill_conditions(skill_file: Path) -> dict:
    """Extract conditional activation fields from SKILL.md frontmatter."""
    try:
        raw = skill_file.read_text(encoding="utf-8")[:2000]
        frontmatter, _ = parse_frontmatter(raw)
        return extract_skill_conditions(frontmatter)
    except Exception as e:
        logger.debug("Failed to read skill conditions from %s: %s", skill_file, e)
        return {}


def _skill_should_show(
    conditions: dict,
    available_tools: "set[str] | None",
    available_toolsets: "set[str] | None",
) -> bool:
    """Return False if the skill's conditional activation rules exclude it."""
    if available_tools is None and available_toolsets is None:
        return True  # No filtering info — show everything (backward compat)

    at = available_tools or set()
    ats = available_toolsets or set()

    # fallback_for: hide when the primary tool/toolset IS available
    for ts in conditions.get("fallback_for_toolsets", []):
        if ts in ats:
            return False
    for t in conditions.get("fallback_for_tools", []):
        if t in at:
            return False

    # requires: hide when a required tool/toolset is NOT available
    for ts in conditions.get("requires_toolsets", []):
        if ts not in ats:
            return False
    for t in conditions.get("requires_tools", []):
        if t not in at:
            return False

    return True


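# Illustrative semantics ("browser" is a hypothetical toolset name): a skill
# declaring fallback_for_toolsets: [browser] is indexed only when the browser
# toolset is absent, while one declaring requires_toolsets: [browser] is hidden
# unless that toolset is present.

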
def build_skills_system_prompt(
    available_tools: "set[str] | None" = None,
    available_toolsets: "set[str] | None" = None,
) -> str:
    """Build a compact skill index for the system prompt.

    Two-layer cache:
    1. In-process LRU dict keyed by (skills_dir, tools, toolsets)
    2. Disk snapshot (``.skills_prompt_snapshot.json``) validated by
       mtime/size manifest — survives process restarts

    Falls back to a full filesystem scan when both layers miss.

    External skill directories (``skills.external_dirs`` in config.yaml) are
    scanned alongside the local ``~/.hermes/skills/`` directory. External dirs
    are read-only — they appear in the index but new skills are always created
    in the local dir. Local skills take precedence when names collide.
    """
    hermes_home = get_hermes_home()
    skills_dir = hermes_home / "skills"
    external_dirs = get_all_skills_dirs()[1:]  # skip local (index 0)

    if not skills_dir.exists() and not external_dirs:
        return ""

    # ── Layer 1: in-process LRU cache ─────────────────────────────────
    cache_key = (
        str(skills_dir.resolve()),
        tuple(str(d) for d in external_dirs),
        tuple(sorted(str(t) for t in (available_tools or set()))),
        tuple(sorted(str(ts) for ts in (available_toolsets or set()))),
    )
    with _SKILLS_PROMPT_CACHE_LOCK:
        cached = _SKILLS_PROMPT_CACHE.get(cache_key)
        if cached is not None:
            _SKILLS_PROMPT_CACHE.move_to_end(cache_key)
            return cached

    disabled = get_disabled_skill_names()

    # ── Layer 2: disk snapshot ────────────────────────────────────────
    snapshot = _load_skills_snapshot(skills_dir)

    skills_by_category: dict[str, list[tuple[str, str]]] = {}
    category_descriptions: dict[str, str] = {}

    if snapshot is not None:
        # Fast path: use pre-parsed metadata from disk
        for entry in snapshot.get("skills", []):
            if not isinstance(entry, dict):
                continue
            skill_name = entry.get("skill_name") or ""
            category = entry.get("category") or "general"
            frontmatter_name = entry.get("frontmatter_name") or skill_name
            platforms = entry.get("platforms") or []
            if not skill_matches_platform({"platforms": platforms}):
                continue
            if frontmatter_name in disabled or skill_name in disabled:
                continue
            if not _skill_should_show(
                entry.get("conditions") or {},
                available_tools,
                available_toolsets,
            ):
                continue
            skills_by_category.setdefault(category, []).append(
                (skill_name, entry.get("description", ""))
            )
        category_descriptions = {
            str(k): str(v)
            for k, v in (snapshot.get("category_descriptions") or {}).items()
        }
    else:
        # Cold path: full filesystem scan + write snapshot for next time
        skill_entries: list[dict] = []
        for skill_file in iter_skill_index_files(skills_dir, "SKILL.md"):
            is_compatible, frontmatter, desc = _parse_skill_file(skill_file)
            entry = _build_snapshot_entry(skill_file, skills_dir, frontmatter, desc)
            skill_entries.append(entry)
            if not is_compatible:
                continue
            skill_name = entry["skill_name"]
            if entry["frontmatter_name"] in disabled or skill_name in disabled:
                continue
            if not _skill_should_show(
                extract_skill_conditions(frontmatter),
                available_tools,
                available_toolsets,
            ):
                continue
            skills_by_category.setdefault(entry["category"], []).append(
                (skill_name, entry["description"])
            )

        # Read category-level DESCRIPTION.md files
        for desc_file in iter_skill_index_files(skills_dir, "DESCRIPTION.md"):
            try:
                content = desc_file.read_text(encoding="utf-8")
                fm, _ = parse_frontmatter(content)
                cat_desc = fm.get("description")
                if not cat_desc:
                    continue
                rel = desc_file.relative_to(skills_dir)
                cat = "/".join(rel.parts[:-1]) if len(rel.parts) > 1 else "general"
                category_descriptions[cat] = str(cat_desc).strip().strip("'\"")
            except Exception as e:
                logger.debug("Could not read skill description %s: %s", desc_file, e)

        _write_skills_snapshot(
            skills_dir,
            _build_skills_manifest(skills_dir),
            skill_entries,
            category_descriptions,
        )

    # ── External skill directories ─────────────────────────────────────
    # Scan external dirs directly (no snapshot caching — they're read-only
    # and typically small). Local skills already in skills_by_category take
    # precedence: we track seen names and skip duplicates from external dirs.
    seen_skill_names: set[str] = set()
    for cat_skills in skills_by_category.values():
        for name, _desc in cat_skills:
            seen_skill_names.add(name)

    for ext_dir in external_dirs:
        if not ext_dir.exists():
            continue
        for skill_file in iter_skill_index_files(ext_dir, "SKILL.md"):
            try:
                is_compatible, frontmatter, desc = _parse_skill_file(skill_file)
                if not is_compatible:
                    continue
                entry = _build_snapshot_entry(skill_file, ext_dir, frontmatter, desc)
                skill_name = entry["skill_name"]
                if skill_name in seen_skill_names:
                    continue
                if entry["frontmatter_name"] in disabled or skill_name in disabled:
                    continue
                if not _skill_should_show(
                    extract_skill_conditions(frontmatter),
                    available_tools,
                    available_toolsets,
                ):
                    continue
                seen_skill_names.add(skill_name)
                skills_by_category.setdefault(entry["category"], []).append(
                    (skill_name, entry["description"])
                )
            except Exception as e:
                logger.debug("Error reading external skill %s: %s", skill_file, e)

        # External category descriptions
        for desc_file in iter_skill_index_files(ext_dir, "DESCRIPTION.md"):
            try:
                content = desc_file.read_text(encoding="utf-8")
                fm, _ = parse_frontmatter(content)
                cat_desc = fm.get("description")
                if not cat_desc:
                    continue
                rel = desc_file.relative_to(ext_dir)
                cat = "/".join(rel.parts[:-1]) if len(rel.parts) > 1 else "general"
                category_descriptions.setdefault(cat, str(cat_desc).strip().strip("'\""))
            except Exception as e:
                logger.debug("Could not read external skill description %s: %s", desc_file, e)

    if not skills_by_category:
        result = ""
    else:
        index_lines = []
        for category in sorted(skills_by_category.keys()):
            cat_desc = category_descriptions.get(category, "")
            if cat_desc:
                index_lines.append(f" {category}: {cat_desc}")
            else:
                index_lines.append(f" {category}:")
            # Deduplicate and sort skills within each category
            seen = set()
            for name, desc in sorted(skills_by_category[category], key=lambda x: x[0]):
                if name in seen:
                    continue
                seen.add(name)
                if desc:
                    index_lines.append(f" - {name}: {desc}")
                else:
                    index_lines.append(f" - {name}")

        result = (
            "## Skills (mandatory)\n"
            "Before replying, scan the skills below. If one clearly matches your task, "
            "load it with skill_view(name) and follow its instructions. "
            "If a skill has issues, fix it with skill_manage(action='patch').\n"
            "After difficult/iterative tasks, offer to save the approach as a skill. "
            "If a skill you loaded was missing steps, had wrong commands, or lacked "
            "pitfalls you discovered, update it before finishing.\n"
            "\n"
            "<available_skills>\n"
            + "\n".join(index_lines) + "\n"
            "</available_skills>\n"
            "\n"
            "If none match, proceed normally without loading a skill."
        )

    # ── Store in LRU cache ────────────────────────────────────────────
    with _SKILLS_PROMPT_CACHE_LOCK:
        _SKILLS_PROMPT_CACHE[cache_key] = result
        _SKILLS_PROMPT_CACHE.move_to_end(cache_key)
        while len(_SKILLS_PROMPT_CACHE) > _SKILLS_PROMPT_CACHE_MAX:
            _SKILLS_PROMPT_CACHE.popitem(last=False)

    return result


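# Illustrative call (tool/toolset names are hypothetical):
#   build_skills_system_prompt(available_tools={"skill_view"},
#                              available_toolsets={"files"})
# returns the "## Skills (mandatory)" block, or "" when nothing matches.

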
# =========================================================================
# Context files (SOUL.md, AGENTS.md, .cursorrules)
# =========================================================================

def _truncate_content(content: str, filename: str, max_chars: int = CONTEXT_FILE_MAX_CHARS) -> str:
    """Head/tail truncation with a marker in the middle."""
    if len(content) <= max_chars:
        return content
    head_chars = int(max_chars * CONTEXT_TRUNCATE_HEAD_RATIO)
    tail_chars = int(max_chars * CONTEXT_TRUNCATE_TAIL_RATIO)
    head = content[:head_chars]
    tail = content[-tail_chars:]
    marker = f"\n\n[...truncated {filename}: kept {head_chars}+{tail_chars} of {len(content)} chars. Use file tools to read the full file.]\n\n"
    return head + marker + tail


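# With the defaults (CONTEXT_FILE_MAX_CHARS = 20_000, ratios 0.7/0.2), an
# oversized file keeps its first 14,000 and last 4,000 characters around the
# truncation marker.

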
def load_soul_md() -> Optional[str]:
    """Load SOUL.md from HERMES_HOME and return its content, or None.

    Used as the agent identity (slot #1 in the system prompt). When this
    returns content, ``build_context_files_prompt`` should be called with
    ``skip_soul=True`` so SOUL.md isn't injected twice.
    """
    try:
        from hermes_cli.config import ensure_hermes_home
        ensure_hermes_home()
    except Exception as e:
        logger.debug("Could not ensure HERMES_HOME before loading SOUL.md: %s", e)

    soul_path = get_hermes_home() / "SOUL.md"
    if not soul_path.exists():
        return None
    try:
        content = soul_path.read_text(encoding="utf-8").strip()
        if not content:
            return None
        content = _scan_context_content(content, "SOUL.md")
        content = _truncate_content(content, "SOUL.md")
        return content
    except Exception as e:
        logger.debug("Could not read SOUL.md from %s: %s", soul_path, e)
        return None


def _load_hermes_md(cwd_path: Path) -> str:
    """.hermes.md / HERMES.md — walk to git root."""
    hermes_md_path = _find_hermes_md(cwd_path)
    if not hermes_md_path:
        return ""
    try:
        content = hermes_md_path.read_text(encoding="utf-8").strip()
        if not content:
            return ""
        content = _strip_yaml_frontmatter(content)
        rel = hermes_md_path.name
        try:
            rel = str(hermes_md_path.relative_to(cwd_path))
        except ValueError:
            pass
        content = _scan_context_content(content, rel)
        result = f"## {rel}\n\n{content}"
        return _truncate_content(result, ".hermes.md")
    except Exception as e:
        logger.debug("Could not read %s: %s", hermes_md_path, e)
        return ""


def _load_agents_md(cwd_path: Path) -> str:
    """AGENTS.md — top-level only (no recursive walk)."""
    for name in ["AGENTS.md", "agents.md"]:
        candidate = cwd_path / name
        if candidate.exists():
            try:
                content = candidate.read_text(encoding="utf-8").strip()
                if content:
                    content = _scan_context_content(content, name)
                    result = f"## {name}\n\n{content}"
                    return _truncate_content(result, "AGENTS.md")
            except Exception as e:
                logger.debug("Could not read %s: %s", candidate, e)
    return ""


def _load_claude_md(cwd_path: Path) -> str:
    """CLAUDE.md / claude.md — cwd only."""
    for name in ["CLAUDE.md", "claude.md"]:
        candidate = cwd_path / name
        if candidate.exists():
            try:
                content = candidate.read_text(encoding="utf-8").strip()
                if content:
                    content = _scan_context_content(content, name)
                    result = f"## {name}\n\n{content}"
                    return _truncate_content(result, "CLAUDE.md")
            except Exception as e:
                logger.debug("Could not read %s: %s", candidate, e)
    return ""


def _load_cursorrules(cwd_path: Path) -> str:
    """.cursorrules + .cursor/rules/*.mdc — cwd only."""
    cursorrules_content = ""
    cursorrules_file = cwd_path / ".cursorrules"
    if cursorrules_file.exists():
        try:
            content = cursorrules_file.read_text(encoding="utf-8").strip()
            if content:
                content = _scan_context_content(content, ".cursorrules")
                cursorrules_content += f"## .cursorrules\n\n{content}\n\n"
        except Exception as e:
            logger.debug("Could not read .cursorrules: %s", e)

    cursor_rules_dir = cwd_path / ".cursor" / "rules"
    if cursor_rules_dir.exists() and cursor_rules_dir.is_dir():
        mdc_files = sorted(cursor_rules_dir.glob("*.mdc"))
        for mdc_file in mdc_files:
            try:
                content = mdc_file.read_text(encoding="utf-8").strip()
                if content:
                    content = _scan_context_content(content, f".cursor/rules/{mdc_file.name}")
                    cursorrules_content += f"## .cursor/rules/{mdc_file.name}\n\n{content}\n\n"
            except Exception as e:
                logger.debug("Could not read %s: %s", mdc_file, e)

    if not cursorrules_content:
        return ""
    return _truncate_content(cursorrules_content, ".cursorrules")


def build_context_files_prompt(cwd: Optional[str] = None, skip_soul: bool = False) -> str:
    """Discover and load context files for the system prompt.

    Priority (first found wins — only ONE project context type is loaded):
    1. .hermes.md / HERMES.md (walk to git root)
    2. AGENTS.md / agents.md (cwd only)
    3. CLAUDE.md / claude.md (cwd only)
    4. .cursorrules / .cursor/rules/*.mdc (cwd only)

    SOUL.md from HERMES_HOME is independent and always included when present.
    Each context source is capped at 20,000 chars.

    When *skip_soul* is True, SOUL.md is not included here (it was already
    loaded via ``load_soul_md()`` for the identity slot).
    """
    if cwd is None:
        cwd = os.getcwd()

    cwd_path = Path(cwd).resolve()
    sections = []

    # Priority-based project context: first match wins
    project_context = (
        _load_hermes_md(cwd_path)
        or _load_agents_md(cwd_path)
        or _load_claude_md(cwd_path)
        or _load_cursorrules(cwd_path)
    )
    if project_context:
        sections.append(project_context)

    # SOUL.md from HERMES_HOME only — skip when already loaded as identity
    if not skip_soul:
        soul_content = load_soul_md()
        if soul_content:
            sections.append(soul_content)

    if not sections:
        return ""
    return "# Project Context\n\nThe following project context files have been loaded and should be followed:\n\n" + "\n".join(sections)