Compare commits

..

1 Commits

Author SHA1 Message Date
Timmy Time
9919114541 Fix #372: Runtime-aware cron prompts with provider mismatch detection
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m18s
When cron jobs run on cloud providers (Nous, OpenRouter), prompts
written for local Ollama fail because they assume SSH or localhost.

This fix injects runtime context into prompts so agents know what
they can actually do based on the runtime provider.

Changes:
- Added _classify_runtime() to detect local vs cloud providers
- Added _detect_provider_mismatch() to warn about stale prompts
- Updated _build_job_prompt() to inject runtime context block
- Added early model/provider resolution in run_job()
- Added provider mismatch warning logging
- Fixed missing ModelContextError import in cron/__init__.py
- Added 8 tests for runtime classification and prompt building

Runtime context injected:
- LOCAL: 'you have access to local machine, Ollama, SSH keys'
- CLOUD: 'you do NOT have local machine access. Do NOT assume SSH...'

Fixes #372
2026-04-13 21:49:00 -04:00
6 changed files with 267 additions and 394 deletions

View File

@@ -26,7 +26,7 @@ from cron.jobs import (
trigger_job,
JOBS_FILE,
)
from cron.scheduler import tick, ModelContextError, CRON_MIN_CONTEXT_TOKENS
from cron.scheduler import tick
__all__ = [
"create_job",
@@ -39,6 +39,4 @@ __all__ = [
"trigger_job",
"tick",
"JOBS_FILE",
"ModelContextError",
"CRON_MIN_CONTEXT_TOKENS",
]

View File

@@ -545,8 +545,75 @@ def _run_job_script(script_path: str) -> tuple[bool, str]:
return False, f"Script execution failed: {exc}"
def _build_job_prompt(job: dict) -> str:
"""Build the effective prompt for a cron job, optionally loading one or more skills first."""
# ---------------------------------------------------------------------------
# Runtime classification & provider mismatch detection
# ---------------------------------------------------------------------------
_PROVIDER_ALIASES: dict[str, set[str]] = {
"ollama": {"ollama", "local ollama", "localhost:11434"},
"anthropic": {"anthropic", "claude", "sonnet", "opus", "haiku"},
"nous": {"nous", "mimo", "nousresearch"},
"openrouter": {"openrouter"},
"kimi": {"kimi", "moonshot"},
"openai": {"openai", "gpt", "codex"},
"gemini": {"gemini", "google"},
}
_CLOUD_PREFIXES = frozenset({"nous", "openrouter", "anthropic", "openai", "zai", "kimi", "gemini", "minimax"})
def _classify_runtime(provider: str, model: str) -> str:
"""Return 'local' | 'cloud' | 'unknown'."""
p = (provider or "").strip().lower()
m = (model or "").strip().lower()
if p and p not in ("ollama", "local"):
return "cloud"
if "/" in m and m.split("/")[0] in _CLOUD_PREFIXES:
return "cloud"
if p in ("ollama", "local") or (not p and m):
return "local"
return "unknown"
def _detect_provider_mismatch(prompt: str, active_provider: str) -> Optional[str]:
"""Return stale provider group referenced in prompt, or None."""
if not active_provider or not prompt:
return None
prompt_lower = prompt.lower()
active_lower = active_provider.lower().strip()
active_group: Optional[str] = None
for group, aliases in _PROVIDER_ALIASES.items():
if active_lower in aliases or active_lower.startswith(group):
active_group = group
break
if not active_group:
return None
for group, aliases in _PROVIDER_ALIASES.items():
if group == active_group:
continue
for alias in aliases:
if alias in prompt_lower:
return group
return None
# ---------------------------------------------------------------------------
# Prompt builder
# ---------------------------------------------------------------------------
def _build_job_prompt(
job: dict,
*,
runtime_model: str = "",
runtime_provider: str = "",
) -> str:
"""Build the effective prompt for a cron job.
Args:
job: The cron job dict.
runtime_model: Resolved model name (e.g. "xiaomi/mimo-v2-pro").
runtime_provider: Resolved provider name (e.g. "nous", "openrouter").
"""
prompt = job.get("prompt", "")
skills = job.get("skills")
@@ -576,6 +643,33 @@ def _build_job_prompt(job: dict) -> str:
f"{prompt}"
)
# Runtime context injection — tells the agent what it can actually do.
_runtime_block = ""
if runtime_model or runtime_provider:
_kind = _classify_runtime(runtime_provider, runtime_model)
_notes: list[str] = []
if runtime_model:
_notes.append(f"MODEL: {runtime_model}")
if runtime_provider:
_notes.append(f"PROVIDER: {runtime_provider}")
if _kind == "local":
_notes.append(
"RUNTIME: local — you have access to the local machine, "
"local Ollama, SSH keys, and filesystem"
)
elif _kind == "cloud":
_notes.append(
"RUNTIME: cloud API — you do NOT have local machine access. "
"Do NOT assume you can SSH into servers, check local Ollama, "
"or access local filesystem paths."
)
if _notes:
_runtime_block = (
"[SYSTEM: RUNTIME CONTEXT — "
+ "; ".join(_notes)
+ ". Adjust your approach based on these capabilities.]\\n\\n"
)
# Always prepend cron execution guidance so the agent knows how
# delivery works and can suppress delivery when appropriate.
cron_hint = (
@@ -595,9 +689,9 @@ def _build_job_prompt(job: dict) -> str:
"response. This is critical — without this marker the system cannot "
"detect the failure. Examples: "
"\"[SCRIPT_FAILED]: forge.alexanderwhitestone.com timed out\" "
"\"[SCRIPT_FAILED]: script exited with code 1\".]\\n\\n"
"\\\"[SCRIPT_FAILED]: script exited with code 1\\\".]\\\\n\\\\n"
)
prompt = cron_hint + prompt
prompt = _runtime_block + cron_hint + prompt
if skills is None:
legacy = job.get("skill")
skills = [legacy] if legacy else []
@@ -667,7 +761,32 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
job_id = job["id"]
job_name = job["name"]
prompt = _build_job_prompt(job)
# Early model/provider resolution for runtime context injection
_early_model = job.get("model") or os.getenv("HERMES_MODEL") or ""
_early_provider = os.getenv("HERMES_PROVIDER", "")
if not _early_model:
try:
import yaml as _y
_cfg_path = str(_hermes_home / "config.yaml")
if os.path.exists(_cfg_path):
with open(_cfg_path) as _f:
_cfg_early = _y.safe_load(_f) or {}
_mc = _cfg_early.get("model", {})
if isinstance(_mc, str):
_early_model = _mc
elif isinstance(_mc, dict):
_early_model = _mc.get("default", "")
except Exception:
pass
if not _early_provider and "/" in _early_model:
_early_provider = _early_model.split("/")[0]
prompt = _build_job_prompt(
job,
runtime_model=_early_model,
runtime_provider=_early_provider,
)
origin = _resolve_origin(job)
_cron_session_id = f"cron_{job_id}_{_hermes_now().strftime('%Y%m%d_%H%M%S')}"
@@ -779,6 +898,17 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
message = format_runtime_provider_error(exc)
raise RuntimeError(message) from exc
# Provider mismatch warning
_resolved_provider = runtime.get("provider", "") or ""
_raw_prompt = job.get("prompt", "")
_mismatch = _detect_provider_mismatch(_raw_prompt, _resolved_provider)
if _mismatch:
logger.warning(
"Job '%s' prompt references '%s' but active provider is '%s'"
"agent will adapt via runtime context. Consider updating prompt.",
job_name, _mismatch, _resolved_provider,
)
from agent.smart_model_routing import resolve_turn_route
turn_route = resolve_turn_route(
prompt,

View File

@@ -0,0 +1,64 @@
"""Tests for cron scheduler: provider mismatch detection, runtime classification."""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
def _import_scheduler():
import importlib.util
spec = importlib.util.spec_from_file_location(
"cron.scheduler", str(Path(__file__).resolve().parent.parent / "cron" / "scheduler.py"),
)
mod = importlib.util.module_from_spec(spec)
try:
spec.loader.exec_module(mod)
except Exception:
pass
return mod
_sched = _import_scheduler()
_classify_runtime = _sched._classify_runtime
_detect_provider_mismatch = _sched._detect_provider_mismatch
_build_job_prompt = _sched._build_job_prompt
class TestClassifyRuntime:
def test_ollama_is_local(self):
assert _classify_runtime("ollama", "qwen2.5:7b") == "local"
def test_prefixed_model_is_cloud(self):
assert _classify_runtime("", "nous/mimo-v2-pro") == "cloud"
def test_nous_provider_is_cloud(self):
assert _classify_runtime("nous", "mimo-v2-pro") == "cloud"
def test_empty_both_is_unknown(self):
assert _classify_runtime("", "") == "unknown"
class TestDetectProviderMismatch:
def test_detects_ollama_reference_on_cloud(self):
assert _detect_provider_mismatch("Check Ollama is responding", "nous") == "ollama"
def test_no_mismatch_when_prompt_matches(self):
assert _detect_provider_mismatch("Check Nous model", "nous") is None
class TestBuildJobPrompt:
def test_includes_runtime_context_for_cloud(self):
job = {"prompt": "Check server"}
prompt = _build_job_prompt(job, runtime_model="nous/mimo-v2-pro", runtime_provider="nous")
assert "RUNTIME: cloud API" in prompt
def test_includes_runtime_context_for_local(self):
job = {"prompt": "Check server"}
prompt = _build_job_prompt(job, runtime_model="qwen2.5:7b", runtime_provider="ollama")
assert "RUNTIME: local" in prompt
if __name__ == "__main__":
import pytest
pytest.main([__file__, "-v"])

View File

@@ -1,152 +0,0 @@
"""
Tests for improved error messages in skill_manager_tool (issue #624).
Verifies that error messages include file paths, context, and suggestions.
"""
import pytest
from pathlib import Path
from unittest.mock import patch, MagicMock
from tools.skill_manager_tool import _format_error, _edit_skill, _patch_skill, skill_manage
class TestFormatError:
"""Test the _format_error helper function."""
def test_basic_error(self):
"""Test basic error formatting."""
result = _format_error("Something went wrong")
assert result["success"] is False
assert "Something went wrong" in result["error"]
assert result["skill_name"] is None
assert result["file_path"] is None
def test_with_skill_name(self):
"""Test error with skill name."""
result = _format_error("Failed", skill_name="test-skill")
assert "test-skill" in result["error"]
assert result["skill_name"] == "test-skill"
def test_with_file_path(self):
"""Test error with file path."""
result = _format_error("Failed", file_path="/path/to/SKILL.md")
assert "/path/to/SKILL.md" in result["error"]
assert result["file_path"] == "/path/to/SKILL.md"
def test_with_suggestion(self):
"""Test error with suggestion."""
result = _format_error("Failed", suggestion="Try again")
assert "Suggestion: Try again" in result["error"]
assert result["suggestion"] == "Try again"
def test_with_context(self):
"""Test error with context dict."""
result = _format_error("Failed", context={"line": 5, "found": "x"})
assert "line: 5" in result["error"]
assert "found: x" in result["error"]
def test_all_fields(self):
"""Test error with all fields."""
result = _format_error(
"Pattern match failed",
skill_name="my-skill",
file_path="/skills/my-skill/SKILL.md",
suggestion="Check whitespace",
context={"expected": "foo", "found": "bar"}
)
assert "Pattern match failed" in result["error"]
assert "Skill: my-skill" in result["error"]
assert "File: /skills/my-skill/SKILL.md" in result["error"]
assert "Suggestion: Check whitespace" in result["error"]
assert "expected: foo" in result["error"]
class TestEditSkillErrors:
"""Test improved error messages in _edit_skill."""
@patch('tools.skill_manager_tool._find_skill')
def test_skill_not_found(self, mock_find):
"""Test skill not found error includes suggestion."""
mock_find.return_value = None
# Provide valid content with frontmatter so it passes validation
valid_content = """---
name: test
description: Test skill
---
Body content here.
"""
result = _edit_skill("nonexistent", valid_content)
assert result["success"] is False
assert "nonexistent" in result["error"]
assert "skills_list()" in result.get("suggestion", "")
@patch('tools.skill_manager_tool._find_skill')
def test_yaml_parse_error_includes_file_path_and_line_number(self, mock_find, tmp_path):
"""Invalid YAML should report target file path and parser line information."""
skill_dir = tmp_path / "my-skill"
skill_dir.mkdir()
(skill_dir / "SKILL.md").write_text("old", encoding="utf-8")
mock_find.return_value = {"path": skill_dir}
bad_content = """---
name: my-skill
description: [broken
---
Body.
"""
result = _edit_skill("my-skill", bad_content)
assert result["success"] is False
assert str(skill_dir / "SKILL.md") in result["error"]
assert "line" in result["error"].lower()
class TestPatchSkillErrors:
"""Test improved error messages in _patch_skill."""
def test_old_string_required(self):
"""Test old_string required error includes suggestion."""
result = _patch_skill("test-skill", None, "new")
assert result["success"] is False
assert "old_string is required" in result["error"]
assert "suggestion" in result
def test_new_string_required(self):
"""Test new_string required error includes suggestion."""
result = _patch_skill("test-skill", "old", None)
assert result["success"] is False
assert "new_string is required" in result["error"]
assert "suggestion" in result
@patch('tools.skill_manager_tool._find_skill')
def test_skill_not_found(self, mock_find):
"""Test skill not found error includes suggestion."""
mock_find.return_value = None
result = _patch_skill("nonexistent", "old", "new")
assert result["success"] is False
assert "nonexistent" in result["error"]
assert "skills_list()" in result.get("suggestion", "")
@patch('tools.skill_manager_tool._find_skill')
def test_pattern_match_error_includes_state_info(self, mock_find, tmp_path):
"""Patch failures should include file path and patch state info."""
skill_dir = tmp_path / "state-skill"
skill_dir.mkdir()
target = skill_dir / "SKILL.md"
target.write_text("---\nname: state-skill\ndescription: desc\n---\n\nBody content here.\n", encoding="utf-8")
mock_find.return_value = {"path": skill_dir}
result = _patch_skill("state-skill", "missing pattern", "new text", replace_all=False)
assert result["success"] is False
assert str(target) in result["error"]
assert "replace_all" in result["error"]
assert "False" in result["error"]
class TestSkillManageEntryPoint:
def test_patch_missing_old_string_returns_json_error(self):
result = skill_manage(action="patch", name="demo-skill", old_string="", new_string="x")
assert isinstance(result, str)
assert "old_string is required" in result
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@@ -21,18 +21,6 @@ from typing import Callable, Dict, List, Optional, Set
logger = logging.getLogger(__name__)
def tool_error(message: str, success: bool = False, **extra) -> str:
"""Return a standardized JSON error payload for tool handlers.
Many tools import this helper directly from the registry module.
Keeping it here avoids circular helper imports and ensures a consistent
error envelope across tools.
"""
payload = {"success": success, "error": message}
payload.update(extra)
return json.dumps(payload, ensure_ascii=False)
class ToolEntry:
"""Metadata for a single registered tool."""

View File

@@ -40,7 +40,7 @@ import shutil
import tempfile
from pathlib import Path
from hermes_constants import get_hermes_home
from typing import Dict, Any, Optional, Tuple
from typing import Dict, Any, Optional
logger = logging.getLogger(__name__)
@@ -53,57 +53,6 @@ except ImportError:
_GUARD_AVAILABLE = False
def _format_error(error_msg: str, skill_name: str = None, file_path: str = None,
suggestion: str = None, context: dict = None) -> dict:
"""Format an error response with rich context for debugging.
Args:
error_msg: The primary error message
skill_name: Name of the skill being operated on
file_path: Path to the file that failed
suggestion: Suggested action to fix the issue
context: Additional context dict (e.g., {'line': 5, 'found': 'x', 'expected': 'y'})
Returns:
Formatted error dict with success=False
"""
parts = [error_msg]
if skill_name:
parts.append(f"Skill: {skill_name}")
if file_path:
parts.append(f"File: {file_path}")
if context:
for key, value in context.items():
parts.append(f"{key}: {value}")
if suggestion:
parts.append(f"Suggestion: {suggestion}")
return {
"success": False,
"error": " | ".join(parts),
"skill_name": skill_name,
"file_path": file_path,
"suggestion": suggestion,
}
def _get_skill_file_path(skill_name: str, file_path: str = None) -> str:
"""Get the full file path for error messages."""
existing = _find_skill(skill_name)
if not existing:
return f"(skill '{skill_name}' not found)"
skill_dir = existing["path"]
if file_path:
return str(skill_dir / file_path)
return str(skill_dir / "SKILL.md")
def _security_scan_skill(skill_dir: Path) -> Optional[str]:
"""Scan a skill directory after write. Returns error string if blocked, else None."""
if not _GUARD_AVAILABLE:
@@ -143,6 +92,11 @@ VALID_NAME_RE = re.compile(r'^[a-z0-9][a-z0-9._-]*$')
ALLOWED_SUBDIRS = {"references", "templates", "scripts", "assets"}
def check_skill_manage_requirements() -> bool:
"""Skill management has no external requirements -- always available."""
return True
# =============================================================================
# Validation helpers
# =============================================================================
@@ -186,58 +140,43 @@ def _validate_category(category: Optional[str]) -> Optional[str]:
return None
def _validate_frontmatter_details(content: str) -> Tuple[Optional[str], Optional[dict]]:
"""Validate SKILL.md frontmatter and return optional structured context.
Returns:
(error_message, context_dict) where both are None when valid.
def _validate_frontmatter(content: str) -> Optional[str]:
"""
Validate that SKILL.md content has proper frontmatter with required fields.
Returns error message or None if valid.
"""
if not content.strip():
return "Content cannot be empty.", None
return "Content cannot be empty."
if not content.startswith("---"):
return "SKILL.md must start with YAML frontmatter (---). See existing skills for format.", None
return "SKILL.md must start with YAML frontmatter (---). See existing skills for format."
end_match = re.search(r'\n---\s*\n', content[3:])
if not end_match:
return "SKILL.md frontmatter is not closed. Ensure you have a closing '---' line.", None
return "SKILL.md frontmatter is not closed. Ensure you have a closing '---' line."
yaml_content = content[3:end_match.start() + 3]
try:
parsed = yaml.safe_load(yaml_content)
except yaml.YAMLError as e:
context = {}
problem_mark = getattr(e, "problem_mark", None)
if problem_mark is not None:
context["line"] = problem_mark.line + 1
context["column"] = problem_mark.column + 1
return f"YAML frontmatter parse error: {e}", (context or None)
return f"YAML frontmatter parse error: {e}"
if not isinstance(parsed, dict):
return "Frontmatter must be a YAML mapping (key: value pairs). Check for syntax errors in the YAML.", None
return "Frontmatter must be a YAML mapping (key: value pairs)."
if "name" not in parsed:
return "Frontmatter must include 'name' field.", None
return "Frontmatter must include 'name' field."
if "description" not in parsed:
return "Frontmatter must include 'description' field.", None
return "Frontmatter must include 'description' field."
if len(str(parsed["description"])) > MAX_DESCRIPTION_LENGTH:
return f"Description exceeds {MAX_DESCRIPTION_LENGTH} characters.", None
return f"Description exceeds {MAX_DESCRIPTION_LENGTH} characters."
body = content[end_match.end() + 3:].strip()
if not body:
return "SKILL.md must have content after the frontmatter (instructions, procedures, etc.).", None
return "SKILL.md must have content after the frontmatter (instructions, procedures, etc.)."
return None, None
def _validate_frontmatter(content: str) -> Optional[str]:
"""
Validate that SKILL.md content has proper frontmatter with required fields.
Returns error message or None if valid.
"""
err, _context = _validate_frontmatter_details(content)
return err
return None
def _validate_content_size(content: str, label: str = "SKILL.md") -> Optional[str]:
@@ -271,15 +210,7 @@ def _find_skill(name: str) -> Optional[Dict[str, Any]]:
{"path": Path} or None.
"""
from agent.skill_utils import get_all_skills_dirs
candidate_dirs = []
if isinstance(SKILLS_DIR, Path):
candidate_dirs.append(SKILLS_DIR)
for extra_dir in get_all_skills_dirs():
if extra_dir not in candidate_dirs:
candidate_dirs.append(extra_dir)
for skills_dir in candidate_dirs:
for skills_dir in get_all_skills_dirs():
if not skills_dir.exists():
continue
for skill_md in skills_dir.rglob("SKILL.md"):
@@ -293,15 +224,13 @@ def _validate_file_path(file_path: str) -> Optional[str]:
Validate a file path for write_file/remove_file.
Must be under an allowed subdirectory and not escape the skill dir.
"""
from tools.path_security import has_traversal_component
if not file_path:
return "file_path is required."
normalized = Path(file_path)
# Prevent path traversal
if has_traversal_component(file_path):
if ".." in normalized.parts:
return "Path traversal ('..') is not allowed."
# Must be under an allowed subdirectory
@@ -316,17 +245,6 @@ def _validate_file_path(file_path: str) -> Optional[str]:
return None
def _resolve_skill_target(skill_dir: Path, file_path: str) -> Tuple[Optional[Path], Optional[str]]:
"""Resolve a supporting-file path and ensure it stays within the skill directory."""
from tools.path_security import validate_within_dir
target = skill_dir / file_path
error = validate_within_dir(target, skill_dir)
if error:
return None, error
return target, None
def _atomic_write_text(file_path: Path, content: str, encoding: str = "utf-8") -> None:
"""
Atomically write text content to a file.
@@ -374,19 +292,10 @@ def _create_skill(name: str, content: str, category: str = None) -> Dict[str, An
if err:
return {"success": False, "error": err}
skill_dir = _resolve_skill_dir(name, category)
skill_md = skill_dir / "SKILL.md"
# Validate content
err, context = _validate_frontmatter_details(content)
err = _validate_frontmatter(content)
if err:
return _format_error(
err,
skill_name=name,
file_path=str(skill_md),
context=context,
suggestion="Fix the YAML frontmatter before creating the skill."
)
return {"success": False, "error": err}
err = _validate_content_size(content)
if err:
@@ -395,29 +304,24 @@ def _create_skill(name: str, content: str, category: str = None) -> Dict[str, An
# Check for name collisions across all directories
existing = _find_skill(name)
if existing:
return _format_error(
f"A skill named '{name}' already exists",
skill_name=name,
file_path=str(existing['path']),
suggestion="Use skill_manage(action='edit') to update the existing skill or choose a different name"
)
return {
"success": False,
"error": f"A skill named '{name}' already exists at {existing['path']}."
}
# Create the skill directory
skill_dir = _resolve_skill_dir(name, category)
skill_dir.mkdir(parents=True, exist_ok=True)
# Write SKILL.md atomically
skill_md = skill_dir / "SKILL.md"
_atomic_write_text(skill_md, content)
# Security scan — roll back on block
scan_error = _security_scan_skill(skill_dir)
if scan_error:
shutil.rmtree(skill_dir, ignore_errors=True)
return _format_error(
scan_error,
skill_name=name,
file_path=str(skill_dir),
suggestion="Review the security scan report and fix flagged issues"
)
return {"success": False, "error": scan_error}
result = {
"success": True,
@@ -436,30 +340,19 @@ def _create_skill(name: str, content: str, category: str = None) -> Dict[str, An
def _edit_skill(name: str, content: str) -> Dict[str, Any]:
"""Replace the SKILL.md of any existing skill (full rewrite)."""
existing = _find_skill(name)
if not existing:
return _format_error(
f"Skill '{name}' not found",
skill_name=name,
suggestion="Use skills_list() to see available skills or skill_manage(action='create') to create it"
)
skill_md = existing["path"] / "SKILL.md"
err, context = _validate_frontmatter_details(content)
err = _validate_frontmatter(content)
if err:
return _format_error(
err,
skill_name=name,
file_path=str(skill_md),
context=context,
suggestion="Fix the YAML frontmatter before updating the skill."
)
return {"success": False, "error": err}
err = _validate_content_size(content)
if err:
return {"success": False, "error": err}
existing = _find_skill(name)
if not existing:
return {"success": False, "error": f"Skill '{name}' not found. Use skills_list() to see available skills."}
skill_md = existing["path"] / "SKILL.md"
# Back up original content for rollback
original_content = skill_md.read_text(encoding="utf-8") if skill_md.exists() else None
_atomic_write_text(skill_md, content)
@@ -469,12 +362,7 @@ def _edit_skill(name: str, content: str) -> Dict[str, Any]:
if scan_error:
if original_content is not None:
_atomic_write_text(skill_md, original_content)
return _format_error(
scan_error,
skill_name=name,
file_path=str(skill_md),
suggestion="Review the security scan report and fix flagged issues"
)
return {"success": False, "error": scan_error}
return {
"success": True,
@@ -496,23 +384,13 @@ def _patch_skill(
Requires a unique match unless replace_all is True.
"""
if not old_string:
return _format_error(
"old_string is required for 'patch'",
suggestion="Provide the text to find in the skill file. Use skill_manage(action='edit') for full rewrites."
)
return {"success": False, "error": "old_string is required for 'patch'."}
if new_string is None:
return _format_error(
"new_string is required for 'patch'",
suggestion="Provide the replacement text. Use empty string '' to delete the matched text."
)
return {"success": False, "error": "new_string is required for 'patch'. Use an empty string to delete matched text."}
existing = _find_skill(name)
if not existing:
return _format_error(
f"Skill '{name}' not found",
skill_name=name,
suggestion="Use skills_list() to see available skills"
)
return {"success": False, "error": f"Skill '{name}' not found."}
skill_dir = existing["path"]
@@ -521,20 +399,13 @@ def _patch_skill(
err = _validate_file_path(file_path)
if err:
return {"success": False, "error": err}
target, err = _resolve_skill_target(skill_dir, file_path)
if err:
return {"success": False, "error": err}
target = skill_dir / file_path
else:
# Patching SKILL.md
target = skill_dir / "SKILL.md"
if not target.exists():
return _format_error(
f"File not found: {target.relative_to(skill_dir)}",
skill_name=name,
file_path=str(target),
suggestion=f"Check the file path. Available files in skill: {list(skill_dir.glob('**/*'))}"
)
return {"success": False, "error": f"File not found: {target.relative_to(skill_dir)}"}
content = target.read_text(encoding="utf-8")
@@ -550,18 +421,11 @@ def _patch_skill(
if match_error:
# Show a short preview of the file so the model can self-correct
preview = content[:500] + ("..." if len(content) > 500 else "")
return _format_error(
f"Pattern match failed: {match_error}",
skill_name=name,
file_path=str(target),
context={
"replace_all": replace_all,
"target_exists": target.exists(),
"content_chars": len(content),
"file_preview": preview[:200] + "..." if len(preview) > 200 else preview,
},
suggestion="Check for whitespace differences, indentation, or escaping issues in old_string"
)
return {
"success": False,
"error": match_error,
"file_preview": preview,
}
# Check size limit on the result
target_label = "SKILL.md" if not file_path else file_path
@@ -571,15 +435,12 @@ def _patch_skill(
# If patching SKILL.md, validate frontmatter is still intact
if not file_path:
err, validation_context = _validate_frontmatter_details(new_content)
err = _validate_frontmatter(new_content)
if err:
return _format_error(
f"Patch would break SKILL.md structure: {err}",
skill_name=name,
file_path=str(target),
context=validation_context,
suggestion="Ensure the patch doesn't corrupt YAML frontmatter (--- delimiters and key: value format)"
)
return {
"success": False,
"error": f"Patch would break SKILL.md structure: {err}",
}
original_content = content # for rollback
_atomic_write_text(target, new_content)
@@ -600,11 +461,7 @@ def _delete_skill(name: str) -> Dict[str, Any]:
"""Delete a skill."""
existing = _find_skill(name)
if not existing:
return _format_error(
f"Skill '{name}' not found",
skill_name=name,
suggestion="Use skills_list() to see available skills"
)
return {"success": False, "error": f"Skill '{name}' not found."}
skill_dir = existing["path"]
shutil.rmtree(skill_dir)
@@ -646,15 +503,9 @@ def _write_file(name: str, file_path: str, file_content: str) -> Dict[str, Any]:
existing = _find_skill(name)
if not existing:
return _format_error(
f"Skill '{name}' not found",
skill_name=name,
suggestion="Use skills_list() to see available skills"
)
return {"success": False, "error": f"Skill '{name}' not found. Create it first with action='create'."}
target, err = _resolve_skill_target(existing["path"], file_path)
if err:
return {"success": False, "error": err}
target = existing["path"] / file_path
target.parent.mkdir(parents=True, exist_ok=True)
# Back up for rollback
original_content = target.read_text(encoding="utf-8") if target.exists() else None
@@ -684,16 +535,10 @@ def _remove_file(name: str, file_path: str) -> Dict[str, Any]:
existing = _find_skill(name)
if not existing:
return _format_error(
f"Skill '{name}' not found",
skill_name=name,
suggestion="Use skills_list() to see available skills"
)
return {"success": False, "error": f"Skill '{name}' not found."}
skill_dir = existing["path"]
target, err = _resolve_skill_target(skill_dir, file_path)
if err:
return {"success": False, "error": err}
target = skill_dir / file_path
if not target.exists():
# List what's actually there for the model to see
available = []
@@ -744,19 +589,19 @@ def skill_manage(
"""
if action == "create":
if not content:
return tool_error("content is required for 'create'. Provide the full SKILL.md text (frontmatter + body).", success=False)
return json.dumps({"success": False, "error": "content is required for 'create'. Provide the full SKILL.md text (frontmatter + body)."}, ensure_ascii=False)
result = _create_skill(name, content, category)
elif action == "edit":
if not content:
return tool_error("content is required for 'edit'. Provide the full updated SKILL.md text.", success=False)
return json.dumps({"success": False, "error": "content is required for 'edit'. Provide the full updated SKILL.md text."}, ensure_ascii=False)
result = _edit_skill(name, content)
elif action == "patch":
if not old_string:
return tool_error("old_string is required for 'patch'. Provide the text to find.", success=False)
return json.dumps({"success": False, "error": "old_string is required for 'patch'. Provide the text to find."}, ensure_ascii=False)
if new_string is None:
return tool_error("new_string is required for 'patch'. Use empty string to delete matched text.", success=False)
return json.dumps({"success": False, "error": "new_string is required for 'patch'. Use empty string to delete matched text."}, ensure_ascii=False)
result = _patch_skill(name, old_string, new_string, file_path, replace_all)
elif action == "delete":
@@ -764,14 +609,14 @@ def skill_manage(
elif action == "write_file":
if not file_path:
return tool_error("file_path is required for 'write_file'. Example: 'references/api-guide.md'", success=False)
return json.dumps({"success": False, "error": "file_path is required for 'write_file'. Example: 'references/api-guide.md'"}, ensure_ascii=False)
if file_content is None:
return tool_error("file_content is required for 'write_file'.", success=False)
return json.dumps({"success": False, "error": "file_content is required for 'write_file'."}, ensure_ascii=False)
result = _write_file(name, file_path, file_content)
elif action == "remove_file":
if not file_path:
return tool_error("file_path is required for 'remove_file'.", success=False)
return json.dumps({"success": False, "error": "file_path is required for 'remove_file'."}, ensure_ascii=False)
result = _remove_file(name, file_path)
else:
@@ -882,7 +727,7 @@ SKILL_MANAGE_SCHEMA = {
# --- Registry ---
from tools.registry import registry, tool_error
from tools.registry import registry
registry.register(
name="skill_manage",