feat(skills): add update checks and well-known support

Round out the skills hub integration with:
- richer skills.sh metadata and security surfacing during inspect/install
- generic check/update flows for hub-installed skills
- support for well-known Agent Skills endpoints via /.well-known/skills/index.json

Also persist upstream bundle metadata in the lock file and add
regression coverage plus live-compatible path handling for both
skills.sh aliases and well-known endpoints.
This commit is contained in:
teknium1
2026-03-14 08:21:16 -07:00
parent 02c307b004
commit 43d25af964
5 changed files with 729 additions and 27 deletions

View File

@@ -2682,7 +2682,7 @@ For more help on a command:
skills_parser = subparsers.add_parser(
"skills",
help="Search, install, configure, and manage skills",
description="Search, install, inspect, audit, configure, and manage skills from skills.sh, GitHub, ClawHub, and other registries."
description="Search, install, inspect, audit, configure, and manage skills from skills.sh, well-known agent skill endpoints, GitHub, ClawHub, and other registries."
)
skills_subparsers = skills_parser.add_subparsers(dest="skills_action")
@@ -2690,12 +2690,12 @@ For more help on a command:
skills_browse.add_argument("--page", type=int, default=1, help="Page number (default: 1)")
skills_browse.add_argument("--size", type=int, default=20, help="Results per page (default: 20)")
skills_browse.add_argument("--source", default="all",
choices=["all", "official", "skills-sh", "github", "clawhub", "lobehub"],
choices=["all", "official", "skills-sh", "well-known", "github", "clawhub", "lobehub"],
help="Filter by source (default: all)")
skills_search = skills_subparsers.add_parser("search", help="Search skill registries")
skills_search.add_argument("query", help="Search query")
skills_search.add_argument("--source", default="all", choices=["all", "official", "skills-sh", "github", "clawhub", "lobehub"])
skills_search.add_argument("--source", default="all", choices=["all", "official", "skills-sh", "well-known", "github", "clawhub", "lobehub"])
skills_search.add_argument("--limit", type=int, default=10, help="Max results")
skills_install = skills_subparsers.add_parser("install", help="Install a skill")
@@ -2709,6 +2709,12 @@ For more help on a command:
skills_list = skills_subparsers.add_parser("list", help="List installed skills")
skills_list.add_argument("--source", default="all", choices=["all", "hub", "builtin", "local"])
skills_check = skills_subparsers.add_parser("check", help="Check installed hub skills for updates")
skills_check.add_argument("name", nargs="?", help="Specific skill to check (default: all)")
skills_update = skills_subparsers.add_parser("update", help="Update installed hub skills")
skills_update.add_argument("name", nargs="?", help="Specific skill to update (default: all outdated skills)")
skills_audit = skills_subparsers.add_parser("audit", help="Re-scan installed hub skills")
skills_audit.add_argument("name", nargs="?", help="Specific skill to audit (default: all)")

View File

@@ -13,7 +13,7 @@ handler are thin wrappers that parse args and delegate.
import json
import shutil
from pathlib import Path
from typing import Optional
from typing import Any, Dict, Optional
from rich.console import Console
from rich.panel import Panel
@@ -76,6 +76,70 @@ def _resolve_short_name(name: str, sources, console: Console) -> str:
return ""
def _format_extra_metadata_lines(extra: Dict[str, Any]) -> list[str]:
lines: list[str] = []
if not extra:
return lines
if extra.get("repo_url"):
lines.append(f"[bold]Repo:[/] {extra['repo_url']}")
if extra.get("detail_url"):
lines.append(f"[bold]Detail Page:[/] {extra['detail_url']}")
if extra.get("index_url"):
lines.append(f"[bold]Index:[/] {extra['index_url']}")
if extra.get("endpoint"):
lines.append(f"[bold]Endpoint:[/] {extra['endpoint']}")
if extra.get("install_command"):
lines.append(f"[bold]Install Command:[/] {extra['install_command']}")
if extra.get("installs") is not None:
lines.append(f"[bold]Installs:[/] {extra['installs']}")
if extra.get("weekly_installs"):
lines.append(f"[bold]Weekly Installs:[/] {extra['weekly_installs']}")
security = extra.get("security_audits")
if isinstance(security, dict) and security:
ordered = ", ".join(f"{name}={status}" for name, status in sorted(security.items()))
lines.append(f"[bold]Security:[/] {ordered}")
return lines
def _resolve_source_meta_and_bundle(identifier: str, sources):
"""Resolve metadata and bundle for a specific identifier."""
meta = None
bundle = None
matched_source = None
for src in sources:
if meta is None:
try:
meta = src.inspect(identifier)
if meta:
matched_source = src
except Exception:
meta = None
try:
bundle = src.fetch(identifier)
except Exception:
bundle = None
if bundle:
matched_source = src
if meta is None:
try:
meta = src.inspect(identifier)
except Exception:
meta = None
break
return meta, bundle, matched_source
def _derive_category_from_install_path(install_path: str) -> str:
path = Path(install_path)
parent = str(path.parent)
return "" if parent == "." else parent
def do_search(query: str, source: str = "all", limit: int = 10,
console: Optional[Console] = None) -> None:
"""Search registries and display results as a Rich table."""
@@ -136,7 +200,7 @@ def do_browse(page: int = 1, page_size: int = 20, source: str = "all",
# Collect results from all (or filtered) sources
# Use empty query to get everything; per-source limits prevent overload
_TRUST_RANK = {"builtin": 3, "trusted": 2, "community": 1}
_PER_SOURCE_LIMIT = {"official": 100, "skills-sh": 100, "github": 100, "clawhub": 50,
_PER_SOURCE_LIMIT = {"official": 100, "skills-sh": 100, "well-known": 25, "github": 100, "clawhub": 50,
"claude-marketplace": 50, "lobehub": 50}
all_results: list = []
@@ -263,11 +327,7 @@ def do_install(identifier: str, category: str = "", force: bool = False,
c.print(f"\n[bold]Fetching:[/] {identifier}")
bundle = None
for src in sources:
bundle = src.fetch(identifier)
if bundle:
break
meta, bundle, _matched_source = _resolve_source_meta_and_bundle(identifier, sources)
if not bundle:
c.print(f"[bold red]Error:[/] Could not fetch '{identifier}' from any source.\n")
@@ -288,6 +348,9 @@ def do_install(identifier: str, category: str = "", force: bool = False,
c.print("Use --force to reinstall.\n")
return
extra_metadata = dict(getattr(meta, "extra", {}) or {})
extra_metadata.update(getattr(bundle, "metadata", {}) or {})
# Quarantine the bundle
q_path = quarantine_bundle(bundle)
c.print(f"[dim]Quarantined to {q_path.relative_to(q_path.parent.parent.parent)}[/]")
@@ -309,6 +372,11 @@ def do_install(identifier: str, category: str = "", force: bool = False,
f"{len(result.findings)}_findings")
return
if extra_metadata:
metadata_lines = _format_extra_metadata_lines(extra_metadata)
if metadata_lines:
c.print(Panel("\n".join(metadata_lines), title="Upstream Metadata", border_style="blue"))
# Confirm with user — show appropriate warning based on source
if not force:
c.print()
@@ -361,23 +429,12 @@ def do_inspect(identifier: str, console: Optional[Console] = None) -> None:
if not identifier:
return
meta = None
for src in sources:
meta = src.inspect(identifier)
if meta:
break
meta, bundle, _matched_source = _resolve_source_meta_and_bundle(identifier, sources)
if not meta:
c.print(f"[bold red]Error:[/] Could not find '{identifier}' in any source.\n")
return
# Also fetch full content for preview
bundle = None
for src in sources:
bundle = src.fetch(identifier)
if bundle:
break
c.print()
trust_style = {"builtin": "bright_cyan", "trusted": "green", "community": "yellow"}.get(meta.trust_level, "dim")
trust_label = "official" if meta.source == "official" else meta.trust_level
@@ -391,6 +448,7 @@ def do_inspect(identifier: str, console: Optional[Console] = None) -> None:
]
if meta.tags:
info_lines.append(f"[bold]Tags:[/] {', '.join(meta.tags)}")
info_lines.extend(_format_extra_metadata_lines(meta.extra))
c.print(Panel("\n".join(info_lines), title=f"Skill: {meta.name}"))
@@ -464,6 +522,49 @@ def do_list(source_filter: str = "all", console: Optional[Console] = None) -> No
)
def do_check(name: Optional[str] = None, console: Optional[Console] = None) -> None:
"""Check hub-installed skills for upstream updates."""
from tools.skills_hub import check_for_skill_updates
c = console or _console
results = check_for_skill_updates(name=name)
if not results:
c.print("[dim]No hub-installed skills to check.[/]\n")
return
table = Table(title="Skill Updates")
table.add_column("Name", style="bold cyan")
table.add_column("Source", style="dim")
table.add_column("Status", style="dim")
for entry in results:
table.add_row(entry.get("name", ""), entry.get("source", ""), entry.get("status", ""))
c.print(table)
update_count = sum(1 for entry in results if entry.get("status") == "update_available")
c.print(f"[dim]{update_count} update(s) available across {len(results)} checked skill(s)[/]\n")
def do_update(name: Optional[str] = None, console: Optional[Console] = None) -> None:
"""Update hub-installed skills with upstream changes."""
from tools.skills_hub import HubLockFile, check_for_skill_updates
c = console or _console
lock = HubLockFile()
updates = [entry for entry in check_for_skill_updates(name=name) if entry.get("status") == "update_available"]
if not updates:
c.print("[dim]No updates available.[/]\n")
return
for entry in updates:
installed = lock.get_installed(entry["name"])
category = _derive_category_from_install_path(installed.get("install_path", "")) if installed else ""
c.print(f"[bold]Updating:[/] {entry['name']}")
do_install(entry["identifier"], category=category, force=True, console=c)
c.print(f"[bold green]Updated {len(updates)} skill(s).[/]\n")
def do_audit(name: Optional[str] = None, console: Optional[Console] = None) -> None:
"""Re-run security scan on installed hub skills."""
from tools.skills_hub import HubLockFile, SKILLS_DIR
@@ -827,6 +928,10 @@ def skills_command(args) -> None:
do_inspect(args.identifier)
elif action == "list":
do_list(source_filter=args.source)
elif action == "check":
do_check(name=getattr(args, "name", None))
elif action == "update":
do_update(name=getattr(args, "name", None))
elif action == "audit":
do_audit(name=getattr(args, "name", None))
elif action == "uninstall":
@@ -853,7 +958,7 @@ def skills_command(args) -> None:
return
do_tap(tap_action, repo=repo)
else:
_console.print("Usage: hermes skills [browse|search|install|inspect|list|audit|uninstall|publish|snapshot|tap]\n")
_console.print("Usage: hermes skills [browse|search|install|inspect|list|check|update|audit|uninstall|publish|snapshot|tap]\n")
_console.print("Run 'hermes skills <command> --help' for details.\n")
@@ -872,6 +977,8 @@ def handle_skills_slash(cmd: str, console: Optional[Console] = None) -> None:
/skills inspect openai/skills/skill-creator
/skills list
/skills list --source hub
/skills check
/skills update
/skills audit
/skills audit my-skill
/skills uninstall my-skill
@@ -920,7 +1027,7 @@ def handle_skills_slash(cmd: str, console: Optional[Console] = None) -> None:
elif action == "search":
if not args:
c.print("[bold red]Usage:[/] /skills search <query> [--source skills-sh|github|official] [--limit N]\n")
c.print("[bold red]Usage:[/] /skills search <query> [--source skills-sh|well-known|github|official] [--limit N]\n")
return
source = "all"
limit = 10
@@ -967,6 +1074,14 @@ def handle_skills_slash(cmd: str, console: Optional[Console] = None) -> None:
source_filter = args[idx + 1]
do_list(source_filter=source_filter, console=c)
elif action == "check":
name = args[0] if args else None
do_check(name=name, console=c)
elif action == "update":
name = args[0] if args else None
do_update(name=name, console=c)
elif action == "audit":
name = args[0] if args else None
do_audit(name=name, console=c)
@@ -1029,6 +1144,8 @@ def _print_skills_help(console: Console) -> None:
" [cyan]install[/] <identifier> Install a skill (with security scan)\n"
" [cyan]inspect[/] <identifier> Preview a skill without installing\n"
" [cyan]list[/] [--source hub|builtin|local] List installed skills\n"
" [cyan]check[/] [name] Check hub skills for upstream updates\n"
" [cyan]update[/] [name] Update hub skills with upstream changes\n"
" [cyan]audit[/] [name] Re-scan hub skills for security\n"
" [cyan]uninstall[/] <name> Remove a hub-installed skill\n"
" [cyan]publish[/] <path> --repo <r> Publish a skill to GitHub via PR\n"

View File

@@ -3,7 +3,7 @@ from io import StringIO
import pytest
from rich.console import Console
from hermes_cli.skills_hub import do_list
from hermes_cli.skills_hub import do_check, do_list, do_update
class _DummyLockFile:
@@ -68,6 +68,34 @@ def _capture(source_filter: str = "all") -> str:
return sink.getvalue()
def _capture_check(monkeypatch, results, name=None) -> str:
import tools.skills_hub as hub
sink = StringIO()
console = Console(file=sink, force_terminal=False, color_system=None)
monkeypatch.setattr(hub, "check_for_skill_updates", lambda **_kwargs: results)
do_check(name=name, console=console)
return sink.getvalue()
def _capture_update(monkeypatch, results) -> tuple[str, list[tuple[str, str, bool]]]:
import tools.skills_hub as hub
import hermes_cli.skills_hub as cli_hub
sink = StringIO()
console = Console(file=sink, force_terminal=False, color_system=None)
installs = []
monkeypatch.setattr(hub, "check_for_skill_updates", lambda **_kwargs: results)
monkeypatch.setattr(hub, "HubLockFile", lambda: type("L", (), {
"get_installed": lambda self, name: {"install_path": "category/" + name}
})())
monkeypatch.setattr(cli_hub, "do_install", lambda identifier, category="", force=False, console=None: installs.append((identifier, category, force)))
do_update(console=console)
return sink.getvalue(), installs
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
@@ -122,3 +150,30 @@ def test_do_list_filter_builtin(three_source_env):
assert "builtin-skill" in output
assert "hub-skill" not in output
assert "local-skill" not in output
def test_do_check_reports_available_updates(monkeypatch):
output = _capture_check(monkeypatch, [
{"name": "hub-skill", "source": "skills.sh", "status": "update_available"},
{"name": "other-skill", "source": "github", "status": "up_to_date"},
])
assert "hub-skill" in output
assert "update_available" in output
assert "up_to_date" in output
def test_do_check_handles_no_installed_updates(monkeypatch):
output = _capture_check(monkeypatch, [])
assert "No hub-installed skills to check" in output
def test_do_update_reinstalls_outdated_skills(monkeypatch):
output, installs = _capture_update(monkeypatch, [
{"name": "hub-skill", "identifier": "skills-sh/example/repo/hub-skill", "status": "update_available"},
{"name": "other-skill", "identifier": "github/example/other-skill", "status": "up_to_date"},
])
assert installs == [("skills-sh/example/repo/hub-skill", "category", True)]
assert "Updated 1 skill" in output

View File

@@ -9,10 +9,13 @@ from tools.skills_hub import (
GitHubSource,
LobeHubSource,
SkillsShSource,
WellKnownSkillSource,
SkillMeta,
SkillBundle,
HubLockFile,
TapsManager,
bundle_content_hash,
check_for_skill_updates,
create_source_router,
unified_search,
append_audit_log,
@@ -132,6 +135,7 @@ class TestSkillsShSource:
assert "skills.sh" in results[0].description
assert results[0].repo == "vercel-labs/agent-skills"
assert results[0].path == "vercel-react-best-practices"
assert results[0].extra["installs"] == 207679
@patch("tools.skills_hub._write_index_cache")
@patch("tools.skills_hub._read_index_cache", return_value=None)
@@ -171,8 +175,11 @@ class TestSkillsShSource:
assert bundle.identifier == "skills-sh/vercel-labs/agent-skills/vercel-react-best-practices"
mock_fetch.assert_called_once_with("vercel-labs/agent-skills/vercel-react-best-practices")
@patch("tools.skills_hub._write_index_cache")
@patch("tools.skills_hub._read_index_cache", return_value=None)
@patch("tools.skills_hub.httpx.get")
@patch.object(GitHubSource, "inspect")
def test_inspect_delegates_to_github_source_and_relabels_meta(self, mock_inspect):
def test_inspect_delegates_to_github_source_and_relabels_meta(self, mock_inspect, mock_get, _mock_read_cache, _mock_write_cache):
mock_inspect.return_value = SkillMeta(
name="vercel-react-best-practices",
description="React rules",
@@ -182,12 +189,24 @@ class TestSkillsShSource:
repo="vercel-labs/agent-skills",
path="vercel-react-best-practices",
)
mock_get.return_value = MagicMock(
status_code=200,
text='''
<h1>vercel-react-best-practices</h1>
<code>$ npx skills add https://github.com/vercel-labs/agent-skills --skill vercel-react-best-practices</code>
<div class="prose"><h1>Vercel React Best Practices</h1><p>React rules.</p></div>
<a href="/vercel-labs/agent-skills/vercel-react-best-practices/security/socket">Socket</a> Pass
<a href="/vercel-labs/agent-skills/vercel-react-best-practices/security/snyk">Snyk</a> Pass
''',
)
meta = self._source().inspect("skills-sh/vercel-labs/agent-skills/vercel-react-best-practices")
assert meta is not None
assert meta.source == "skills.sh"
assert meta.identifier == "skills-sh/vercel-labs/agent-skills/vercel-react-best-practices"
assert meta.extra["install_command"].endswith("--skill vercel-react-best-practices")
assert meta.extra["security_audits"]["socket"] == "Pass"
mock_inspect.assert_called_once_with("vercel-labs/agent-skills/vercel-react-best-practices")
@patch.object(GitHubSource, "_list_skills_in_repo")
@@ -285,11 +304,176 @@ class TestSkillsShSource:
assert mock_get.called
class TestWellKnownSkillSource:
def _source(self):
return WellKnownSkillSource()
@patch("tools.skills_hub._write_index_cache")
@patch("tools.skills_hub._read_index_cache", return_value=None)
@patch("tools.skills_hub.httpx.get")
def test_search_reads_index_from_well_known_url(self, mock_get, _mock_read_cache, _mock_write_cache):
mock_get.return_value = MagicMock(
status_code=200,
json=lambda: {
"skills": [
{"name": "git-workflow", "description": "Git rules", "files": ["SKILL.md"]},
{"name": "code-review", "description": "Review code", "files": ["SKILL.md", "references/checklist.md"]},
]
},
)
results = self._source().search("https://example.com/.well-known/skills/index.json", limit=10)
assert [r.identifier for r in results] == [
"well-known:https://example.com/.well-known/skills/git-workflow",
"well-known:https://example.com/.well-known/skills/code-review",
]
assert all(r.source == "well-known" for r in results)
@patch("tools.skills_hub._write_index_cache")
@patch("tools.skills_hub._read_index_cache", return_value=None)
@patch("tools.skills_hub.httpx.get")
def test_search_accepts_domain_root_and_resolves_index(self, mock_get, _mock_read_cache, _mock_write_cache):
mock_get.return_value = MagicMock(
status_code=200,
json=lambda: {"skills": [{"name": "git-workflow", "description": "Git rules", "files": ["SKILL.md"]}]},
)
results = self._source().search("https://example.com", limit=10)
assert len(results) == 1
called_url = mock_get.call_args.args[0]
assert called_url == "https://example.com/.well-known/skills/index.json"
@patch("tools.skills_hub.httpx.get")
def test_inspect_fetches_skill_md_from_well_known_endpoint(self, mock_get):
def fake_get(url, *args, **kwargs):
if url.endswith("/index.json"):
return MagicMock(status_code=200, json=lambda: {
"skills": [{"name": "git-workflow", "description": "Git rules", "files": ["SKILL.md"]}]
})
if url.endswith("/git-workflow/SKILL.md"):
return MagicMock(status_code=200, text="---\nname: git-workflow\ndescription: Git rules\n---\n\n# Git Workflow\n")
raise AssertionError(url)
mock_get.side_effect = fake_get
meta = self._source().inspect("well-known:https://example.com/.well-known/skills/git-workflow")
assert meta is not None
assert meta.name == "git-workflow"
assert meta.source == "well-known"
assert meta.extra["base_url"] == "https://example.com/.well-known/skills"
@patch("tools.skills_hub.httpx.get")
def test_fetch_downloads_skill_files_from_well_known_endpoint(self, mock_get):
def fake_get(url, *args, **kwargs):
if url.endswith("/index.json"):
return MagicMock(status_code=200, json=lambda: {
"skills": [{
"name": "code-review",
"description": "Review code",
"files": ["SKILL.md", "references/checklist.md"],
}]
})
if url.endswith("/code-review/SKILL.md"):
return MagicMock(status_code=200, text="# Code Review\n")
if url.endswith("/code-review/references/checklist.md"):
return MagicMock(status_code=200, text="- [ ] security\n")
raise AssertionError(url)
mock_get.side_effect = fake_get
bundle = self._source().fetch("well-known:https://example.com/.well-known/skills/code-review")
assert bundle is not None
assert bundle.source == "well-known"
assert bundle.files["SKILL.md"] == "# Code Review\n"
assert bundle.files["references/checklist.md"] == "- [ ] security\n"
class TestCheckForSkillUpdates:
def test_bundle_content_hash_matches_installed_content_hash(self, tmp_path):
from tools.skills_guard import content_hash
bundle = SkillBundle(
name="demo-skill",
files={
"SKILL.md": "same content",
"references/checklist.md": "- [ ] security\n",
},
source="github",
identifier="owner/repo/demo-skill",
trust_level="community",
)
skill_dir = tmp_path / "demo-skill"
skill_dir.mkdir()
(skill_dir / "SKILL.md").write_text("same content")
(skill_dir / "references").mkdir()
(skill_dir / "references" / "checklist.md").write_text("- [ ] security\n")
assert bundle_content_hash(bundle) == content_hash(skill_dir)
def test_reports_update_when_remote_hash_differs(self):
lock = MagicMock()
lock.list_installed.return_value = [{
"name": "demo-skill",
"source": "github",
"identifier": "owner/repo/demo-skill",
"content_hash": "oldhash",
"install_path": "demo-skill",
}]
source = MagicMock()
source.source_id.return_value = "github"
source.fetch.return_value = SkillBundle(
name="demo-skill",
files={"SKILL.md": "new content"},
source="github",
identifier="owner/repo/demo-skill",
trust_level="community",
)
results = check_for_skill_updates(lock=lock, sources=[source])
assert len(results) == 1
assert results[0]["name"] == "demo-skill"
assert results[0]["status"] == "update_available"
def test_reports_up_to_date_when_hash_matches(self):
bundle = SkillBundle(
name="demo-skill",
files={"SKILL.md": "same content"},
source="github",
identifier="owner/repo/demo-skill",
trust_level="community",
)
lock = MagicMock()
lock.list_installed.return_value = [{
"name": "demo-skill",
"source": "github",
"identifier": "owner/repo/demo-skill",
"content_hash": bundle_content_hash(bundle),
"install_path": "demo-skill",
}]
source = MagicMock()
source.source_id.return_value = "github"
source.fetch.return_value = bundle
results = check_for_skill_updates(lock=lock, sources=[source])
assert results[0]["status"] == "up_to_date"
class TestCreateSourceRouter:
def test_includes_skills_sh_source(self):
sources = create_source_router(auth=MagicMock(spec=GitHubAuth))
assert any(isinstance(src, SkillsShSource) for src in sources)
def test_includes_well_known_source(self):
sources = create_source_router(auth=MagicMock(spec=GitHubAuth))
assert any(isinstance(src, WellKnownSkillSource) for src in sources)
# ---------------------------------------------------------------------------
# HubLockFile

View File

@@ -26,6 +26,7 @@ from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urlparse, urlunparse
import httpx
import yaml
@@ -69,6 +70,7 @@ class SkillMeta:
repo: Optional[str] = None
path: Optional[str] = None
tags: List[str] = field(default_factory=list)
extra: Dict[str, Any] = field(default_factory=dict)
@dataclass
@@ -79,6 +81,7 @@ class SkillBundle:
source: str
identifier: str
trust_level: str
metadata: Dict[str, Any] = field(default_factory=dict)
# ---------------------------------------------------------------------------
@@ -497,6 +500,221 @@ class GitHubSource(SkillSource):
return {}
# ---------------------------------------------------------------------------
# Well-known Agent Skills endpoint source adapter
# ---------------------------------------------------------------------------
class WellKnownSkillSource(SkillSource):
"""Read skills from a domain exposing /.well-known/skills/index.json."""
BASE_PATH = "/.well-known/skills"
def source_id(self) -> str:
return "well-known"
def trust_level_for(self, identifier: str) -> str:
return "community"
def search(self, query: str, limit: int = 10) -> List[SkillMeta]:
index_url = self._query_to_index_url(query)
if not index_url:
return []
parsed = self._parse_index(index_url)
if not parsed:
return []
results: List[SkillMeta] = []
for entry in parsed["skills"][:limit]:
name = entry.get("name")
if not isinstance(name, str) or not name:
continue
description = entry.get("description", "")
files = entry.get("files", ["SKILL.md"])
results.append(SkillMeta(
name=name,
description=str(description),
source="well-known",
identifier=self._wrap_identifier(parsed["base_url"], name),
trust_level="community",
path=name,
extra={
"index_url": parsed["index_url"],
"base_url": parsed["base_url"],
"files": files if isinstance(files, list) else ["SKILL.md"],
},
))
return results
def inspect(self, identifier: str) -> Optional[SkillMeta]:
parsed = self._parse_identifier(identifier)
if not parsed:
return None
entry = self._index_entry(parsed["index_url"], parsed["skill_name"])
if not entry:
return None
skill_md = self._fetch_text(f"{parsed['skill_url']}/SKILL.md")
if skill_md is None:
return None
fm = GitHubSource._parse_frontmatter_quick(skill_md)
description = str(fm.get("description") or entry.get("description") or "")
name = str(fm.get("name") or parsed["skill_name"])
return SkillMeta(
name=name,
description=description,
source="well-known",
identifier=self._wrap_identifier(parsed["base_url"], parsed["skill_name"]),
trust_level="community",
path=parsed["skill_name"],
extra={
"index_url": parsed["index_url"],
"base_url": parsed["base_url"],
"files": entry.get("files", ["SKILL.md"]),
"endpoint": parsed["skill_url"],
},
)
def fetch(self, identifier: str) -> Optional[SkillBundle]:
parsed = self._parse_identifier(identifier)
if not parsed:
return None
entry = self._index_entry(parsed["index_url"], parsed["skill_name"])
if not entry:
return None
files = entry.get("files", ["SKILL.md"])
if not isinstance(files, list) or not files:
files = ["SKILL.md"]
downloaded: Dict[str, str] = {}
for rel_path in files:
if not isinstance(rel_path, str) or not rel_path:
continue
text = self._fetch_text(f"{parsed['skill_url']}/{rel_path}")
if text is None:
return None
downloaded[rel_path] = text
if "SKILL.md" not in downloaded:
return None
return SkillBundle(
name=parsed["skill_name"],
files=downloaded,
source="well-known",
identifier=self._wrap_identifier(parsed["base_url"], parsed["skill_name"]),
trust_level="community",
metadata={
"index_url": parsed["index_url"],
"base_url": parsed["base_url"],
"endpoint": parsed["skill_url"],
"files": files,
},
)
def _query_to_index_url(self, query: str) -> Optional[str]:
query = query.strip()
if not query.startswith(("http://", "https://")):
return None
if query.endswith("/index.json"):
return query
if f"{self.BASE_PATH}/" in query:
base_url = query.split(f"{self.BASE_PATH}/", 1)[0] + self.BASE_PATH
return f"{base_url}/index.json"
return query.rstrip("/") + f"{self.BASE_PATH}/index.json"
def _parse_identifier(self, identifier: str) -> Optional[dict]:
raw = identifier[len("well-known:"):] if identifier.startswith("well-known:") else identifier
if not raw.startswith(("http://", "https://")):
return None
parsed_url = urlparse(raw)
clean_url = urlunparse(parsed_url._replace(fragment=""))
fragment = parsed_url.fragment
if clean_url.endswith("/index.json"):
if not fragment:
return None
base_url = clean_url[:-len("/index.json")]
skill_name = fragment
skill_url = f"{base_url}/{skill_name}"
return {
"index_url": clean_url,
"base_url": base_url,
"skill_name": skill_name,
"skill_url": skill_url,
}
if clean_url.endswith("/SKILL.md"):
skill_url = clean_url[:-len("/SKILL.md")]
else:
skill_url = clean_url.rstrip("/")
if f"{self.BASE_PATH}/" not in skill_url:
return None
base_url, skill_name = skill_url.rsplit("/", 1)
return {
"index_url": f"{base_url}/index.json",
"base_url": base_url,
"skill_name": skill_name,
"skill_url": skill_url,
}
def _parse_index(self, index_url: str) -> Optional[dict]:
cache_key = f"well_known_index_{hashlib.md5(index_url.encode()).hexdigest()}"
cached = _read_index_cache(cache_key)
if isinstance(cached, dict) and isinstance(cached.get("skills"), list):
return cached
try:
resp = httpx.get(index_url, timeout=20, follow_redirects=True)
if resp.status_code != 200:
return None
data = resp.json()
except (httpx.HTTPError, json.JSONDecodeError):
return None
skills = data.get("skills", []) if isinstance(data, dict) else []
if not isinstance(skills, list):
return None
parsed = {
"index_url": index_url,
"base_url": index_url[:-len("/index.json")],
"skills": skills,
}
_write_index_cache(cache_key, parsed)
return parsed
def _index_entry(self, index_url: str, skill_name: str) -> Optional[dict]:
parsed = self._parse_index(index_url)
if not parsed:
return None
for entry in parsed["skills"]:
if isinstance(entry, dict) and entry.get("name") == skill_name:
return entry
return None
@staticmethod
def _fetch_text(url: str) -> Optional[str]:
try:
resp = httpx.get(url, timeout=20, follow_redirects=True)
if resp.status_code == 200:
return resp.text
except httpx.HTTPError:
return None
return None
@staticmethod
def _wrap_identifier(base_url: str, skill_name: str) -> str:
return f"well-known:{base_url.rstrip('/')}/{skill_name}"
# ---------------------------------------------------------------------------
# skills.sh source adapter
# ---------------------------------------------------------------------------
@@ -569,20 +787,22 @@ class SkillsShSource(SkillSource):
def fetch(self, identifier: str) -> Optional[SkillBundle]:
canonical = self._normalize_identifier(identifier)
detail = self._fetch_detail_page(canonical)
for candidate in self._candidate_identifiers(canonical):
bundle = self.github.fetch(candidate)
if bundle:
bundle.source = "skills.sh"
bundle.identifier = self._wrap_identifier(canonical)
bundle.metadata.update(self._detail_to_metadata(canonical, detail))
return bundle
detail = self._fetch_detail_page(canonical)
resolved = self._discover_identifier(canonical, detail=detail)
if resolved:
bundle = self.github.fetch(resolved)
if bundle:
bundle.source = "skills.sh"
bundle.identifier = self._wrap_identifier(canonical)
bundle.metadata.update(self._detail_to_metadata(canonical, detail))
return bundle
return None
@@ -672,6 +892,11 @@ class SkillsShSource(SkillSource):
trust_level=self.github.trust_level_for(canonical),
repo=repo,
path=skill_path,
extra={
"installs": installs,
"detail_url": f"{self.BASE_URL}/{canonical}",
"repo_url": f"https://github.com/{repo}",
},
)
def _fetch_detail_page(self, identifier: str) -> Optional[dict]:
@@ -702,8 +927,10 @@ class SkillsShSource(SkillSource):
repo = default_repo
install_skill = skill_token
install_command = None
install_match = self._INSTALL_CMD_RE.search(html)
if install_match:
install_command = install_match.group(0).strip()
repo_value = (install_match.group("repo") or "").strip()
install_skill = (install_match.group("skill") or install_skill).strip()
repo = self._extract_repo_slug(repo_value) or repo
@@ -712,6 +939,7 @@ class SkillsShSource(SkillSource):
body_title = self._extract_first_match(self._PROSE_H1_RE, html)
body_summary = self._extract_first_match(self._PROSE_P_RE, html)
weekly_installs = self._extract_weekly_installs(html)
security_audits = self._extract_security_audits(html, identifier)
return {
"repo": repo,
@@ -720,6 +948,10 @@ class SkillsShSource(SkillSource):
"body_title": body_title,
"body_summary": body_summary,
"weekly_installs": weekly_installs,
"install_command": install_command,
"repo_url": f"https://github.com/{repo}",
"detail_url": f"{self.BASE_URL}/{identifier}",
"security_audits": security_audits,
}
def _discover_identifier(self, identifier: str, detail: Optional[dict] = None) -> Optional[str]:
@@ -752,6 +984,9 @@ class SkillsShSource(SkillSource):
meta.source = "skills.sh"
meta.identifier = self._wrap_identifier(canonical)
meta.trust_level = self.trust_level_for(canonical)
merged_extra = dict(meta.extra)
merged_extra.update(self._detail_to_metadata(canonical, detail))
meta.extra = merged_extra
if isinstance(detail, dict):
body_summary = detail.get("body_summary")
@@ -827,6 +1062,21 @@ class SkillsShSource(SkillSource):
return None
return SkillsShSource._strip_html(value).strip() or None
def _detail_to_metadata(self, canonical: str, detail: Optional[dict]) -> Dict[str, Any]:
parts = canonical.split("/", 2)
repo = f"{parts[0]}/{parts[1]}" if len(parts) >= 2 else ""
metadata = {
"detail_url": f"{self.BASE_URL}/{canonical}",
}
if repo:
metadata["repo_url"] = f"https://github.com/{repo}"
if isinstance(detail, dict):
for key in ("weekly_installs", "install_command", "repo_url", "detail_url", "security_audits"):
value = detail.get(key)
if value:
metadata[key] = value
return metadata
@staticmethod
def _extract_weekly_installs(html: str) -> Optional[str]:
match = SkillsShSource._WEEKLY_INSTALLS_RE.search(html)
@@ -834,6 +1084,19 @@ class SkillsShSource(SkillSource):
return None
return match.group("count")
@staticmethod
def _extract_security_audits(html: str, identifier: str) -> Dict[str, str]:
audits: Dict[str, str] = {}
for audit in ("agent-trust-hub", "socket", "snyk"):
idx = html.find(f"/security/{audit}")
if idx == -1:
continue
window = html[idx:idx + 500]
match = re.search(r'(Pass|Warn|Fail)', window, re.IGNORECASE)
if match:
audits[audit] = match.group(1).title()
return audits
@staticmethod
def _strip_html(value: str) -> str:
return re.sub(r'<[^>]+>', '', value)
@@ -1590,6 +1853,7 @@ def _skill_meta_to_dict(meta: SkillMeta) -> dict:
"repo": meta.repo,
"path": meta.path,
"tags": meta.tags,
"extra": meta.extra,
}
@@ -1625,6 +1889,7 @@ class HubLockFile:
skill_hash: str,
install_path: str,
files: List[str],
metadata: Optional[Dict[str, Any]] = None,
) -> None:
data = self.load()
data["installed"][name] = {
@@ -1635,6 +1900,7 @@ class HubLockFile:
"content_hash": skill_hash,
"install_path": install_path,
"files": files,
"metadata": metadata or {},
"installed_at": datetime.now(timezone.utc).isoformat(),
"updated_at": datetime.now(timezone.utc).isoformat(),
}
@@ -1789,6 +2055,7 @@ def install_from_quarantine(
skill_hash=content_hash(install_dir),
install_path=str(install_dir.relative_to(SKILLS_DIR)),
files=list(bundle.files.keys()),
metadata=bundle.metadata,
)
append_audit_log(
@@ -1817,6 +2084,78 @@ def uninstall_skill(skill_name: str) -> Tuple[bool, str]:
return True, f"Uninstalled '{skill_name}' from {entry['install_path']}"
def bundle_content_hash(bundle: SkillBundle) -> str:
"""Compute a deterministic hash for an in-memory skill bundle."""
h = hashlib.sha256()
for rel_path in sorted(bundle.files):
h.update(bundle.files[rel_path].encode("utf-8"))
return f"sha256:{h.hexdigest()[:16]}"
def _source_matches(source: SkillSource, source_name: str) -> bool:
aliases = {
"skills.sh": "skills-sh",
}
normalized = aliases.get(source_name, source_name)
return source.source_id() == normalized
def check_for_skill_updates(
name: Optional[str] = None,
*,
lock: Optional[HubLockFile] = None,
sources: Optional[List[SkillSource]] = None,
auth: Optional[GitHubAuth] = None,
) -> List[dict]:
"""Check installed hub skills for upstream changes."""
lock = lock or HubLockFile()
installed = lock.list_installed()
if name:
installed = [entry for entry in installed if entry.get("name") == name]
if sources is None:
sources = create_source_router(auth=auth)
results: List[dict] = []
for entry in installed:
identifier = entry.get("identifier", "")
source_name = entry.get("source", "")
candidate_sources = [src for src in sources if _source_matches(src, source_name)] or sources
bundle = None
for src in candidate_sources:
try:
bundle = src.fetch(identifier)
except Exception:
bundle = None
if bundle:
break
if not bundle:
results.append({
"name": entry.get("name", ""),
"identifier": identifier,
"source": source_name,
"status": "unavailable",
})
continue
current_hash = entry.get("content_hash", "")
latest_hash = bundle_content_hash(bundle)
status = "up_to_date" if current_hash == latest_hash else "update_available"
results.append({
"name": entry.get("name", ""),
"identifier": identifier,
"source": source_name,
"status": status,
"current_hash": current_hash,
"latest_hash": latest_hash,
"bundle": bundle,
})
return results
def create_source_router(auth: Optional[GitHubAuth] = None) -> List[SkillSource]:
"""
Create all configured source adapters.
@@ -1831,6 +2170,7 @@ def create_source_router(auth: Optional[GitHubAuth] = None) -> List[SkillSource]
sources: List[SkillSource] = [
OptionalSkillSource(), # Official optional skills (highest priority)
SkillsShSource(auth=auth),
WellKnownSkillSource(),
GitHubSource(auth=auth, extra_taps=extra_taps),
ClawHubSource(),
ClaudeMarketplaceSource(auth=auth),