Files
hermes-agent/tools/registry.py
teknium1 2eca0d4af1 Merge PR #275: fix(batch_runner): preserve traceback when batch worker fails
Authored by batuhankocyigit. Adds explicit traceback logging for batch worker failures and improves tool dispatch error logging in registry.
2026-03-05 01:44:05 -08:00

230 lines
8.4 KiB
Python

"""Central registry for all hermes-agent tools.
Each tool file calls ``registry.register()`` at module level to declare its
schema, handler, toolset membership, and availability check. ``model_tools.py``
queries the registry instead of maintaining its own parallel data structures.
Import chain (circular-import safe):
tools/registry.py (no imports from model_tools or tool files)
^
tools/*.py (import from tools.registry at module level)
^
model_tools.py (imports tools.registry + all tool modules)
^
run_agent.py, cli.py, batch_runner.py, etc.
"""
import json
import logging
from typing import Any, Callable, Dict, List, Optional, Set
logger = logging.getLogger(__name__)
class ToolEntry:
"""Metadata for a single registered tool."""
__slots__ = (
"name", "toolset", "schema", "handler", "check_fn",
"requires_env", "is_async", "description",
)
def __init__(self, name, toolset, schema, handler, check_fn,
requires_env, is_async, description):
self.name = name
self.toolset = toolset
self.schema = schema
self.handler = handler
self.check_fn = check_fn
self.requires_env = requires_env
self.is_async = is_async
self.description = description
class ToolRegistry:
"""Singleton registry that collects tool schemas + handlers from tool files."""
def __init__(self):
self._tools: Dict[str, ToolEntry] = {}
self._toolset_checks: Dict[str, Callable] = {}
# ------------------------------------------------------------------
# Registration
# ------------------------------------------------------------------
def register(
self,
name: str,
toolset: str,
schema: dict,
handler: Callable,
check_fn: Callable = None,
requires_env: list = None,
is_async: bool = False,
description: str = "",
):
"""Register a tool. Called at module-import time by each tool file."""
self._tools[name] = ToolEntry(
name=name,
toolset=toolset,
schema=schema,
handler=handler,
check_fn=check_fn,
requires_env=requires_env or [],
is_async=is_async,
description=description or schema.get("description", ""),
)
if check_fn and toolset not in self._toolset_checks:
self._toolset_checks[toolset] = check_fn
# ------------------------------------------------------------------
# Schema retrieval
# ------------------------------------------------------------------
def get_definitions(self, tool_names: Set[str], quiet: bool = False) -> List[dict]:
"""Return OpenAI-format tool schemas for the requested tool names.
Only tools whose ``check_fn()`` returns True (or have no check_fn)
are included.
"""
result = []
for name in sorted(tool_names):
entry = self._tools.get(name)
if not entry:
continue
if entry.check_fn:
try:
if not entry.check_fn():
if not quiet:
logger.debug("Tool %s unavailable (check failed)", name)
continue
except Exception:
if not quiet:
logger.debug("Tool %s check raised; skipping", name)
continue
result.append({"type": "function", "function": entry.schema})
return result
# ------------------------------------------------------------------
# Dispatch
# ------------------------------------------------------------------
def dispatch(self, name: str, args: dict, **kwargs) -> str:
"""Execute a tool handler by name.
* Async handlers are bridged automatically via ``_run_async()``.
* All exceptions are caught and returned as ``{"error": "..."}``
for consistent error format.
"""
entry = self._tools.get(name)
if not entry:
return json.dumps({"error": f"Unknown tool: {name}"})
try:
if entry.is_async:
from model_tools import _run_async
return _run_async(entry.handler(args, **kwargs))
return entry.handler(args, **kwargs)
except Exception as e:
logger.exception("Tool %s dispatch error: %s", name, e)
return json.dumps({"error": f"Tool execution failed: {type(e).__name__}: {e}"})
# ------------------------------------------------------------------
# Query helpers (replace redundant dicts in model_tools.py)
# ------------------------------------------------------------------
def get_all_tool_names(self) -> List[str]:
"""Return sorted list of all registered tool names."""
return sorted(self._tools.keys())
def get_toolset_for_tool(self, name: str) -> Optional[str]:
"""Return the toolset a tool belongs to, or None."""
entry = self._tools.get(name)
return entry.toolset if entry else None
def get_tool_to_toolset_map(self) -> Dict[str, str]:
"""Return ``{tool_name: toolset_name}`` for every registered tool."""
return {name: e.toolset for name, e in self._tools.items()}
def is_toolset_available(self, toolset: str) -> bool:
"""Check if a toolset's requirements are met.
Returns False (rather than crashing) when the check function raises
an unexpected exception (e.g. network error, missing import, bad config).
"""
check = self._toolset_checks.get(toolset)
if not check:
return True
try:
return bool(check())
except Exception:
logger.debug("Toolset %s check raised; marking unavailable", toolset)
return False
def check_toolset_requirements(self) -> Dict[str, bool]:
"""Return ``{toolset: available_bool}`` for every toolset."""
toolsets = set(e.toolset for e in self._tools.values())
return {ts: self.is_toolset_available(ts) for ts in sorted(toolsets)}
def get_available_toolsets(self) -> Dict[str, dict]:
"""Return toolset metadata for UI display."""
toolsets: Dict[str, dict] = {}
for entry in self._tools.values():
ts = entry.toolset
if ts not in toolsets:
toolsets[ts] = {
"available": self.is_toolset_available(ts),
"tools": [],
"description": "",
"requirements": [],
}
toolsets[ts]["tools"].append(entry.name)
if entry.requires_env:
for env in entry.requires_env:
if env not in toolsets[ts]["requirements"]:
toolsets[ts]["requirements"].append(env)
return toolsets
def get_toolset_requirements(self) -> Dict[str, dict]:
"""Build a TOOLSET_REQUIREMENTS-compatible dict for backward compat."""
result: Dict[str, dict] = {}
for entry in self._tools.values():
ts = entry.toolset
if ts not in result:
result[ts] = {
"name": ts,
"env_vars": [],
"check_fn": self._toolset_checks.get(ts),
"setup_url": None,
"tools": [],
}
if entry.name not in result[ts]["tools"]:
result[ts]["tools"].append(entry.name)
for env in entry.requires_env:
if env not in result[ts]["env_vars"]:
result[ts]["env_vars"].append(env)
return result
def check_tool_availability(self, quiet: bool = False):
"""Return (available_toolsets, unavailable_info) like the old function."""
available = []
unavailable = []
seen = set()
for entry in self._tools.values():
ts = entry.toolset
if ts in seen:
continue
seen.add(ts)
if self.is_toolset_available(ts):
available.append(ts)
else:
unavailable.append({
"name": ts,
"env_vars": entry.requires_env,
"tools": [e.name for e in self._tools.values() if e.toolset == ts],
})
return available, unavailable
# Module-level singleton
registry = ToolRegistry()