Merge pull request #1400 from NousResearch/hermes/hermes-45b79a59-clawhub-search

fix: harden ClawHub skill search exact matches
Authored by Teknium on 2026-03-14 23:17:24 -07:00 · committed by GitHub
2 changed files with 392 additions and 30 deletions
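
In brief: search() now strips and tokenizes the query, probes ClawHub for an
exact slug before falling back to a scored catalog search, and no longer pads
exact hits with low-scoring fuzzy matches. A minimal sketch of the intended
behavior, assuming a skill with slug "pdf-tools" exists on the registry (the
slugs below are illustrative, not taken from ClawHub):

    source = ClawHubSource()
    source.search("pdf tools")      # exact slug probe wins -> [<pdf-tools>]
    source.search("pdf")            # scored catalog results, best match first
    source.search("no-such-slug")   # slug-like query with no match -> []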


@@ -1156,11 +1156,176 @@ class ClawHubSource(SkillSource):
    def trust_level_for(self, identifier: str) -> str:
        return "community"

    @staticmethod
    def _normalize_tags(tags: Any) -> List[str]:
        if isinstance(tags, list):
            return [str(t) for t in tags]
        if isinstance(tags, dict):
            return [str(k) for k in tags.keys() if str(k) != "latest"]
        return []
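
    # Illustrative values (not from the commit): the registry is assumed to
    # return tags either as a list or as a {tag: version} mapping, e.g.
    #   _normalize_tags(["pdf", "ocr"])                      -> ["pdf", "ocr"]
    #   _normalize_tags({"latest": "1.2.0", "v1": "1.0.0"})  -> ["v1"]
    # with the synthetic "latest" key dropped in the dict form.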

    @staticmethod
    def _coerce_skill_payload(data: Any) -> Optional[Dict[str, Any]]:
        if not isinstance(data, dict):
            return None
        nested = data.get("skill")
        if isinstance(nested, dict):
            merged = dict(nested)
            latest_version = data.get("latestVersion")
            if latest_version is not None and "latestVersion" not in merged:
                merged["latestVersion"] = latest_version
            return merged
        return data
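
    # Example (hypothetical payload): the skill endpoint may wrap the record,
    # in which case the nested object is unwrapped and the top-level
    # latestVersion is carried over:
    #   _coerce_skill_payload({"skill": {"slug": "pdf-tools"}, "latestVersion": "1.2.0"})
    #   -> {"slug": "pdf-tools", "latestVersion": "1.2.0"}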

    @staticmethod
    def _query_terms(query: str) -> List[str]:
        return [term for term in re.split(r"[^a-z0-9]+", query.lower()) if term]
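
    # Example: _query_terms("PDF_Tools v2!") -> ["pdf", "tools", "v2"];
    # lowercasing plus the [^a-z0-9]+ split also makes "PDF-Tools" and
    # "pdf tools" normalize identically.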

    @classmethod
    def _search_score(cls, query: str, meta: SkillMeta) -> int:
        query_norm = query.strip().lower()
        if not query_norm:
            return 1
        identifier = (meta.identifier or "").lower()
        name = (meta.name or "").lower()
        description = (meta.description or "").lower()
        normalized_identifier = " ".join(cls._query_terms(identifier))
        normalized_name = " ".join(cls._query_terms(name))
        query_terms = cls._query_terms(query_norm)
        identifier_terms = cls._query_terms(identifier)
        name_terms = cls._query_terms(name)
        score = 0
        # Tier 1: exact matches, raw or with punctuation normalized away.
        if query_norm == identifier:
            score += 140
        if query_norm == name:
            score += 130
        if normalized_identifier == query_norm:
            score += 125
        if normalized_name == query_norm:
            score += 120
        # Tier 2: the query is a prefix of the identifier or name.
        if normalized_identifier.startswith(query_norm):
            score += 95
        if normalized_name.startswith(query_norm):
            score += 90
        if query_terms and identifier_terms[: len(query_terms)] == query_terms:
            score += 70
        if query_terms and name_terms[: len(query_terms)] == query_terms:
            score += 65
        # Tier 3: substring and per-term overlap.
        if query_norm in identifier:
            score += 40
        if query_norm in name:
            score += 35
        if query_norm in description:
            score += 10
        for term in query_terms:
            if term in identifier_terms:
                score += 15
            if term in name_terms:
                score += 12
            if term in description:
                score += 3
        return score
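
    # Worked example (editor's sketch; slugs are hypothetical). For the query
    # "pdf" against SkillMeta(identifier="pdf-tools", name="PDF Tools"):
    #   prefix tier 95 + 90, term-prefix 70 + 65, substring 40 + 35,
    #   per-term 15 + 12  ->  score 422.
    # Against SkillMeta(identifier="markdown-to-pdf", name="Markdown to PDF"):
    #   substring 40 + 35, per-term 15 + 12  ->  score 102.
    # Exact-slug candidates therefore sort well ahead of incidental matches.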

    @staticmethod
    def _dedupe_results(results: List[SkillMeta]) -> List[SkillMeta]:
        seen: set[str] = set()
        deduped: List[SkillMeta] = []
        for result in results:
            key = (result.identifier or result.name).lower()
            if key in seen:
                continue
            seen.add(key)
            deduped.append(result)
        return deduped
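
    # Example (hypothetical metas): order is preserved and the first hit wins,
    # keyed case-insensitively on identifier (falling back to name), so
    # [<identifier="PDF-Tools">, <identifier="pdf-tools">] dedupes to the first.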

    def _exact_slug_meta(self, query: str) -> Optional[SkillMeta]:
        slug = query.strip().split("/")[-1]
        query_terms = self._query_terms(query)
        candidates: List[str] = []
        if slug and re.fullmatch(r"[A-Za-z0-9][A-Za-z0-9._-]*", slug):
            candidates.append(slug)
        if query_terms:
            base_slug = "-".join(query_terms)
            if len(query_terms) >= 2:
                candidates.extend([
                    f"{base_slug}-agent",
                    f"{base_slug}-skill",
                    f"{base_slug}-tool",
                    f"{base_slug}-assistant",
                    f"{base_slug}-playbook",
                    base_slug,
                ])
            else:
                candidates.append(base_slug)
        seen: set[str] = set()
        for candidate in candidates:
            if candidate in seen:
                continue
            seen.add(candidate)
            meta = self.inspect(candidate)
            if meta:
                return meta
        return None
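
    # Probe order for a hypothetical query "code review" (not slug-safe, two
    # terms): code-review-agent, code-review-skill, code-review-tool,
    # code-review-assistant, code-review-playbook, then bare code-review;
    # the first slug that inspect() resolves wins.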

    def _finalize_search_results(self, query: str, results: List[SkillMeta], limit: int) -> List[SkillMeta]:
        query_norm = query.strip()
        if not query_norm:
            return self._dedupe_results(results)[:limit]
        filtered = [meta for meta in results if self._search_score(query_norm, meta) > 0]
        filtered.sort(
            key=lambda meta: (
                -self._search_score(query_norm, meta),
                meta.name.lower(),
                meta.identifier.lower(),
            )
        )
        filtered = self._dedupe_results(filtered)
        exact = self._exact_slug_meta(query_norm)
        if exact:
            filtered = [meta for meta in filtered if self._search_score(query_norm, meta) >= 20]
            filtered = self._dedupe_results([exact] + filtered)
        if filtered:
            return filtered[:limit]
        if re.fullmatch(r"[A-Za-z0-9][A-Za-z0-9._/-]*", query_norm):
            return []
        return self._dedupe_results(results)[:limit]
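
    # Net effect (editor's summary): when an exact slug resolves, it is pinned
    # to the front and fuzzy hits scoring under 20 are dropped; when nothing
    # survives and the query still looks like a slug or path, an empty list is
    # returned rather than unranked listing noise.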

    def search(self, query: str, limit: int = 10) -> List[SkillMeta]:
        query = query.strip()
        if query:
            query_terms = self._query_terms(query)
            if len(query_terms) >= 2:
                direct = self._exact_slug_meta(query)
                if direct:
                    return [direct]
            results = self._search_catalog(query, limit=limit)
            if results:
                return results
        # Empty query or catalog fallback failure: use the lightweight listing API.
        cache_key = f"clawhub_search_listing_v1_{hashlib.md5(query.encode()).hexdigest()}_{limit}"
        cached = _read_index_cache(cache_key)
        if cached is not None:
            return self._finalize_search_results(
                query,
                [SkillMeta(**s) for s in cached],
                limit,
            )
        try:
            resp = httpx.get(
@@ -1185,20 +1350,19 @@ class ClawHubSource(SkillSource):
                    continue
                display_name = item.get("displayName") or item.get("name") or slug
                summary = item.get("summary") or item.get("description") or ""
                tags = self._normalize_tags(item.get("tags", []))
                results.append(SkillMeta(
                    name=display_name,
                    description=summary,
                    source="clawhub",
                    identifier=slug,
                    trust_level="community",
                    tags=tags,
                ))
        final_results = self._finalize_search_results(query, results, limit)
        _write_index_cache(cache_key, [_skill_meta_to_dict(s) for s in final_results])
        return final_results

    def fetch(self, identifier: str) -> Optional[SkillBundle]:
        slug = identifier.split("/")[-1]
@@ -1244,13 +1408,11 @@ class ClawHubSource(SkillSource):

    def inspect(self, identifier: str) -> Optional[SkillMeta]:
        slug = identifier.split("/")[-1]
        data = self._coerce_skill_payload(self._get_json(f"{self.BASE_URL}/skills/{slug}"))
        if not isinstance(data, dict):
            return None
        tags = self._normalize_tags(data.get("tags", []))
        return SkillMeta(
            name=data.get("displayName") or data.get("name") or data.get("slug") or slug,
@@ -1258,9 +1420,75 @@ class ClawHubSource(SkillSource):
source="clawhub",
identifier=data.get("slug") or slug,
trust_level="community",
tags=[str(t) for t in tags],
tags=tags,
)

    def _search_catalog(self, query: str, limit: int = 10) -> List[SkillMeta]:
        cache_key = f"clawhub_search_catalog_v1_{hashlib.md5(f'{query}|{limit}'.encode()).hexdigest()}"
        cached = _read_index_cache(cache_key)
        if cached is not None:
            return [SkillMeta(**s) for s in cached][:limit]
        catalog = self._load_catalog_index()
        if not catalog:
            return []
        results = self._finalize_search_results(query, catalog, limit)
        _write_index_cache(cache_key, [_skill_meta_to_dict(s) for s in results])
        return results

    def _load_catalog_index(self) -> List[SkillMeta]:
        cache_key = "clawhub_catalog_v1"
        cached = _read_index_cache(cache_key)
        if cached is not None:
            return [SkillMeta(**s) for s in cached]
        cursor: Optional[str] = None
        results: List[SkillMeta] = []
        seen: set[str] = set()
        max_pages = 50
        for _ in range(max_pages):
            params: Dict[str, Any] = {"limit": 200}
            if cursor:
                params["cursor"] = cursor
            try:
                resp = httpx.get(f"{self.BASE_URL}/skills", params=params, timeout=30)
                if resp.status_code != 200:
                    break
                data = resp.json()
            except (httpx.HTTPError, json.JSONDecodeError):
                break
            items = data.get("items", []) if isinstance(data, dict) else []
            if not isinstance(items, list) or not items:
                break
            for item in items:
                slug = item.get("slug")
                if not isinstance(slug, str) or not slug or slug in seen:
                    continue
                seen.add(slug)
                display_name = item.get("displayName") or item.get("name") or slug
                summary = item.get("summary") or item.get("description") or ""
                tags = self._normalize_tags(item.get("tags", []))
                results.append(SkillMeta(
                    name=display_name,
                    description=summary,
                    source="clawhub",
                    identifier=slug,
                    trust_level="community",
                    tags=tags,
                ))
            cursor = data.get("nextCursor") if isinstance(data, dict) else None
            if not isinstance(cursor, str) or not cursor:
                break
        _write_index_cache(cache_key, [_skill_meta_to_dict(s) for s in results])
        return results
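
    # Paging sketch: with the response shape the loop expects,
    # {"items": [...], "nextCursor": "..."}, this walks at most
    # 50 pages x 200 items (~10k skills) and memoizes the flattened
    # catalog under the "clawhub_catalog_v1" index cache key.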

    def _get_json(self, url: str, timeout: int = 20) -> Optional[Any]:
        try:
            resp = httpx.get(url, timeout=timeout)