Merge PR #286: Fix ClawHub Skills Hub adapter for API endpoint changes
Authored by BP602. Fixes #285.
This commit is contained in:
126
tests/tools/test_skills_hub_clawhub.py
Normal file
126
tests/tools/test_skills_hub_clawhub.py
Normal file
@@ -0,0 +1,126 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import unittest
|
||||
from unittest.mock import patch
|
||||
|
||||
from tools.skills_hub import ClawHubSource
|
||||
|
||||
|
||||
class _MockResponse:
|
||||
def __init__(self, status_code=200, json_data=None, text=""):
|
||||
self.status_code = status_code
|
||||
self._json_data = json_data
|
||||
self.text = text
|
||||
|
||||
def json(self):
|
||||
return self._json_data
|
||||
|
||||
|
||||
class TestClawHubSource(unittest.TestCase):
    """Exercise ClawHubSource against the post-#285 ClawHub API surface."""

    def setUp(self):
        self.src = ClawHubSource()

    @patch("tools.skills_hub._write_index_cache")
    @patch("tools.skills_hub._read_index_cache", return_value=None)
    @patch("tools.skills_hub.httpx.get")
    def test_search_uses_new_endpoint_and_parses_items(self, mock_get, _mock_read_cache, _mock_write_cache):
        """search() must hit /skills with ?search= and map the item fields."""
        payload = {
            "items": [
                {
                    "slug": "caldav-calendar",
                    "displayName": "CalDAV Calendar",
                    "summary": "Calendar integration",
                    "tags": ["calendar", "productivity"],
                }
            ]
        }
        mock_get.return_value = _MockResponse(status_code=200, json_data=payload)

        results = self.src.search("caldav", limit=5)

        self.assertEqual(len(results), 1)
        hit = results[0]
        self.assertEqual(hit.identifier, "caldav-calendar")
        self.assertEqual(hit.name, "CalDAV Calendar")
        self.assertEqual(hit.description, "Calendar integration")

        # The adapter must use the new endpoint and parameter names.
        mock_get.assert_called_once()
        args, kwargs = mock_get.call_args
        self.assertTrue(args[0].endswith("/skills"))
        self.assertEqual(kwargs["params"], {"search": "caldav", "limit": 5})

    @patch("tools.skills_hub.httpx.get")
    def test_inspect_maps_display_name_and_summary(self, mock_get):
        """inspect() maps displayName/summary onto SkillMeta fields."""
        mock_get.return_value = _MockResponse(
            status_code=200,
            json_data={
                "slug": "caldav-calendar",
                "displayName": "CalDAV Calendar",
                "summary": "Calendar integration",
                "tags": ["calendar"],
            },
        )

        meta = self.src.inspect("caldav-calendar")

        self.assertIsNotNone(meta)
        self.assertEqual(meta.name, "CalDAV Calendar")
        self.assertEqual(meta.description, "Calendar integration")
        self.assertEqual(meta.identifier, "caldav-calendar")

    @patch("tools.skills_hub.httpx.get")
    def test_fetch_resolves_latest_version_and_downloads_raw_files(self, mock_get):
        """fetch() resolves latestVersion, then pulls inline and rawUrl files."""

        def routed_get(url, *args, **kwargs):
            # Route each endpoint the adapter is expected to call.
            if url.endswith("/skills/caldav-calendar"):
                return _MockResponse(
                    status_code=200,
                    json_data={
                        "slug": "caldav-calendar",
                        "latestVersion": {"version": "1.0.1"},
                    },
                )
            if url.endswith("/skills/caldav-calendar/versions/1.0.1"):
                return _MockResponse(
                    status_code=200,
                    json_data={
                        "files": [
                            {"path": "SKILL.md", "rawUrl": "https://files.example/skill-md"},
                            {"path": "README.md", "content": "hello"},
                        ]
                    },
                )
            if url == "https://files.example/skill-md":
                return _MockResponse(status_code=200, text="# Skill")
            return _MockResponse(status_code=404, json_data={})

        mock_get.side_effect = routed_get

        bundle = self.src.fetch("caldav-calendar")

        self.assertIsNotNone(bundle)
        self.assertEqual(bundle.name, "caldav-calendar")
        self.assertIn("SKILL.md", bundle.files)
        self.assertEqual(bundle.files["SKILL.md"], "# Skill")
        self.assertEqual(bundle.files["README.md"], "hello")

    @patch("tools.skills_hub.httpx.get")
    def test_fetch_falls_back_to_versions_list(self, mock_get):
        """Without latestVersion metadata, fetch() consults /versions."""

        def routed_get(url, *args, **kwargs):
            if url.endswith("/skills/caldav-calendar"):
                return _MockResponse(status_code=200, json_data={"slug": "caldav-calendar"})
            if url.endswith("/skills/caldav-calendar/versions"):
                return _MockResponse(status_code=200, json_data=[{"version": "2.0.0"}])
            if url.endswith("/skills/caldav-calendar/versions/2.0.0"):
                return _MockResponse(status_code=200, json_data={"files": {"SKILL.md": "# Skill"}})
            return _MockResponse(status_code=404, json_data={})

        mock_get.side_effect = routed_get

        bundle = self.src.fetch("caldav-calendar")
        self.assertIsNotNone(bundle)
        self.assertEqual(bundle.files["SKILL.md"], "# Skill")
|
||||
|
||||
|
||||
# Allow running this module directly: `python tests/tools/test_skills_hub_clawhub.py`.
if __name__ == "__main__":
    unittest.main()
|
||||
@@ -520,8 +520,8 @@ class ClawHubSource(SkillSource):
|
||||
|
||||
try:
|
||||
resp = httpx.get(
|
||||
f"{self.BASE_URL}/skills/search",
|
||||
params={"q": query, "limit": limit},
|
||||
f"{self.BASE_URL}/skills",
|
||||
params={"search": query, "limit": limit},
|
||||
timeout=15,
|
||||
)
|
||||
if resp.status_code != 200:
|
||||
@@ -530,82 +530,154 @@ class ClawHubSource(SkillSource):
|
||||
except (httpx.HTTPError, json.JSONDecodeError):
|
||||
return []
|
||||
|
||||
skills_data = data.get("skills", data) if isinstance(data, dict) else data
|
||||
skills_data = data.get("items", data) if isinstance(data, dict) else data
|
||||
if not isinstance(skills_data, list):
|
||||
return []
|
||||
|
||||
results = []
|
||||
for item in skills_data[:limit]:
|
||||
name = item.get("name", item.get("slug", ""))
|
||||
if not name:
|
||||
slug = item.get("slug")
|
||||
if not slug:
|
||||
continue
|
||||
meta = SkillMeta(
|
||||
name=name,
|
||||
description=item.get("description", ""),
|
||||
display_name = item.get("displayName") or item.get("name") or slug
|
||||
summary = item.get("summary") or item.get("description") or ""
|
||||
tags = item.get("tags", [])
|
||||
if not isinstance(tags, list):
|
||||
tags = []
|
||||
results.append(SkillMeta(
|
||||
name=display_name,
|
||||
description=summary,
|
||||
source="clawhub",
|
||||
identifier=item.get("slug", name),
|
||||
identifier=slug,
|
||||
trust_level="community",
|
||||
tags=item.get("tags", []),
|
||||
)
|
||||
results.append(meta)
|
||||
tags=[str(t) for t in tags],
|
||||
))
|
||||
|
||||
_write_index_cache(cache_key, [_skill_meta_to_dict(s) for s in results])
|
||||
return results
|
||||
|
||||
def fetch(self, identifier: str) -> Optional[SkillBundle]:
    """Download a skill bundle from ClawHub.

    Resolves the latest published version for *identifier* (a bare slug
    or an ``owner/slug`` path), fetches that version's file listing and
    returns a community-trust :class:`SkillBundle`.

    Returns ``None`` when the skill is unknown, no version can be
    resolved, or the resolved version does not expose a ``SKILL.md``.
    """
    # Accept either a bare slug or an "owner/slug" identifier.
    slug = identifier.split("/")[-1]

    skill_data = self._get_json(f"{self.BASE_URL}/skills/{slug}")
    if not isinstance(skill_data, dict):
        return None

    latest_version = self._resolve_latest_version(slug, skill_data)
    if not latest_version:
        logger.warning("ClawHub fetch failed for %s: could not resolve latest version", slug)
        return None

    version_data = self._get_json(f"{self.BASE_URL}/skills/{slug}/versions/{latest_version}")
    if not isinstance(version_data, dict):
        return None

    files = self._extract_files(version_data)
    # A bundle without its manifest is unusable; treat it as a failed fetch.
    if "SKILL.md" not in files:
        logger.warning(
            "ClawHub fetch for %s resolved version %s but no inline/raw file content was available",
            slug,
            latest_version,
        )
        return None

    return SkillBundle(
        name=slug,
        files=files,
        source="clawhub",
        identifier=slug,
        trust_level="community",
    )
|
||||
|
||||
def inspect(self, identifier: str) -> Optional[SkillMeta]:
    """Fetch detail metadata for one skill without downloading its files.

    Returns ``None`` when the detail endpoint does not yield a JSON object.
    """
    slug = identifier.split("/")[-1]
    data = self._get_json(f"{self.BASE_URL}/skills/{slug}")
    if not isinstance(data, dict):
        return None

    # Tolerate a missing or malformed tags field.
    raw_tags = data.get("tags", [])
    tag_list = [str(t) for t in raw_tags] if isinstance(raw_tags, list) else []

    return SkillMeta(
        name=data.get("displayName") or data.get("name") or data.get("slug") or slug,
        description=data.get("summary") or data.get("description") or "",
        source="clawhub",
        identifier=data.get("slug") or slug,
        trust_level="community",
        tags=tag_list,
    )
|
||||
|
||||
def _get_json(self, url: str, timeout: int = 20) -> Optional[Any]:
    """GET *url* and return the decoded JSON body, or ``None`` on failure.

    Non-200 statuses, transport errors and malformed JSON are all treated
    uniformly as "no data", so callers only need a single isinstance check
    on the result.
    """
    try:
        resp = httpx.get(url, timeout=timeout)
        if resp.status_code != 200:
            return None
        return resp.json()
    except (httpx.HTTPError, json.JSONDecodeError):
        return None
|
||||
def _resolve_latest_version(self, slug: str, skill_data: Dict[str, Any]) -> Optional[str]:
|
||||
latest = skill_data.get("latestVersion")
|
||||
if isinstance(latest, dict):
|
||||
version = latest.get("version")
|
||||
if isinstance(version, str) and version:
|
||||
return version
|
||||
|
||||
tags = skill_data.get("tags")
|
||||
if isinstance(tags, dict):
|
||||
latest_tag = tags.get("latest")
|
||||
if isinstance(latest_tag, str) and latest_tag:
|
||||
return latest_tag
|
||||
|
||||
versions_data = self._get_json(f"{self.BASE_URL}/skills/{slug}/versions")
|
||||
if isinstance(versions_data, list) and versions_data:
|
||||
first = versions_data[0]
|
||||
if isinstance(first, dict):
|
||||
version = first.get("version")
|
||||
if isinstance(version, str) and version:
|
||||
return version
|
||||
return None
|
||||
|
||||
def _extract_files(self, version_data: Dict[str, Any]) -> Dict[str, str]:
|
||||
files: Dict[str, str] = {}
|
||||
file_list = version_data.get("files")
|
||||
|
||||
if isinstance(file_list, dict):
|
||||
return {k: v for k, v in file_list.items() if isinstance(v, str)}
|
||||
|
||||
if not isinstance(file_list, list):
|
||||
return files
|
||||
|
||||
for file_meta in file_list:
|
||||
if not isinstance(file_meta, dict):
|
||||
continue
|
||||
|
||||
fname = file_meta.get("path") or file_meta.get("name")
|
||||
if not fname or not isinstance(fname, str):
|
||||
continue
|
||||
|
||||
inline_content = file_meta.get("content")
|
||||
if isinstance(inline_content, str):
|
||||
files[fname] = inline_content
|
||||
continue
|
||||
|
||||
raw_url = file_meta.get("rawUrl") or file_meta.get("downloadUrl") or file_meta.get("url")
|
||||
if isinstance(raw_url, str) and raw_url.startswith("http"):
|
||||
content = self._fetch_text(raw_url)
|
||||
if content is not None:
|
||||
files[fname] = content
|
||||
|
||||
return files
|
||||
|
||||
def _fetch_text(self, url: str) -> Optional[str]:
    """Download *url* and return its body text; ``None`` on error or non-200."""
    try:
        resp = httpx.get(url, timeout=20)
    except httpx.HTTPError:
        return None
    if resp.status_code == 200:
        return resp.text
    return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
Reference in New Issue
Block a user