Replace asyncio.run() with thread-local persistent event loops for worker threads (e.g., delegate_task's ThreadPoolExecutor). asyncio.run() creates and closes a fresh loop on every call, leaving cached httpx/AsyncOpenAI clients bound to a dead loop — causing 'Event loop is closed' errors during GC when parallel subagents clean up connections. The fix mirrors the main thread's _get_tool_loop() pattern but uses threading.local() so each worker thread gets its own long-lived loop, avoiding both cross-thread contention and the create-destroy lifecycle. Added 4 regression tests covering worker loop persistence, reuse, per-thread isolation, and separation from the main thread's loop.
478 lines
19 KiB
Python
478 lines
19 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Model Tools Module
|
|
|
|
Thin orchestration layer over the tool registry. Each tool file in tools/
|
|
self-registers its schema, handler, and metadata via tools.registry.register().
|
|
This module triggers discovery (by importing all tool modules), then provides
|
|
the public API that run_agent.py, cli.py, batch_runner.py, and the RL
|
|
environments consume.
|
|
|
|
Public API (signatures preserved from the original 2,400-line version):
|
|
get_tool_definitions(enabled_toolsets, disabled_toolsets, quiet_mode) -> list
|
|
handle_function_call(function_name, function_args, task_id, user_task) -> str
|
|
TOOL_TO_TOOLSET_MAP: dict (for batch_runner.py)
|
|
TOOLSET_REQUIREMENTS: dict (for cli.py, doctor.py)
|
|
get_all_tool_names() -> list
|
|
get_toolset_for_tool(name) -> str
|
|
get_available_toolsets() -> dict
|
|
check_toolset_requirements() -> dict
|
|
check_tool_availability(quiet) -> tuple
|
|
"""
|
|
|
|
import json
|
|
import asyncio
|
|
import os
|
|
import logging
|
|
import threading
|
|
from typing import Dict, Any, List, Optional, Tuple
|
|
|
|
from tools.registry import registry
|
|
from toolsets import resolve_toolset, validate_toolset
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# =============================================================================
|
|
# Async Bridging (single source of truth -- used by registry.dispatch too)
|
|
# =============================================================================
|
|
|
|
_tool_loop = None # persistent loop for the main (CLI) thread
|
|
_tool_loop_lock = threading.Lock()
|
|
_worker_thread_local = threading.local() # per-worker-thread persistent loops
|
|
|
|
|
|
def _get_tool_loop():
|
|
"""Return a long-lived event loop for running async tool handlers.
|
|
|
|
Using a persistent loop (instead of asyncio.run() which creates and
|
|
*closes* a fresh loop every time) prevents "Event loop is closed"
|
|
errors that occur when cached httpx/AsyncOpenAI clients attempt to
|
|
close their transport on a dead loop during garbage collection.
|
|
"""
|
|
global _tool_loop
|
|
with _tool_loop_lock:
|
|
if _tool_loop is None or _tool_loop.is_closed():
|
|
_tool_loop = asyncio.new_event_loop()
|
|
return _tool_loop
|
|
|
|
|
|
def _get_worker_loop():
    """Return a long-lived event loop dedicated to the calling worker thread.

    Worker threads (e.g., delegate_task's ThreadPoolExecutor threads)
    previously used asyncio.run() per call, which creates a loop, runs
    the coroutine, then *closes* the loop. Cached httpx/AsyncOpenAI
    clients stay bound to that now-dead loop and raise RuntimeError
    ("Event loop is closed") during garbage collection or subsequent use.

    Storing one loop per thread in thread-local storage keeps the loop
    alive for the thread's lifetime, so cached clients remain valid and
    their cleanup runs on a live loop.
    """
    existing = getattr(_worker_thread_local, 'loop', None)
    if existing is not None and not existing.is_closed():
        return existing
    # First call on this thread (or its previous loop was closed):
    # create a fresh loop and remember it for the thread's lifetime.
    fresh = asyncio.new_event_loop()
    asyncio.set_event_loop(fresh)
    _worker_thread_local.loop = fresh
    return fresh
|
|
|
|
|
|
def _run_async(coro):
    """Bridge a coroutine into this synchronous call site and return its result.

    Three situations are handled, in order:

    1. A loop is already running on this thread (gateway async stack,
       Atropos's event loop): hand the coroutine to asyncio.run() on a
       throwaway thread so its new loop cannot conflict with the one
       already running here.
    2. We are on a worker thread (parallel tool execution, e.g.
       delegate_task): run on that thread's persistent loop (see
       _get_worker_loop). This avoids contention with the main thread's
       shared loop while keeping cached httpx/AsyncOpenAI clients bound
       to a live loop — preventing "Event loop is closed" on GC cleanup.
    3. Main thread with no running loop (the common CLI path): run on
       the shared persistent loop from _get_tool_loop, for the same
       cached-client reason.

    This is the single source of truth for sync->async bridging in tool
    handlers. The RL paths (agent_loop.py, tool_context.py) also provide
    outer thread-pool wrapping as defense-in-depth, but each handler is
    self-protecting via this function.
    """
    try:
        running = asyncio.get_running_loop()
    except RuntimeError:
        running = None

    if running is not None and running.is_running():
        # Inside an async context — isolate asyncio.run() on a fresh
        # thread so it can own a loop of its own.
        import concurrent.futures
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
            return pool.submit(asyncio.run, coro).result(timeout=300)

    on_main = threading.current_thread() is threading.main_thread()
    if not on_main:
        # Worker thread: per-thread persistent loop (see docstring, case 2).
        return _get_worker_loop().run_until_complete(coro)

    # Main-thread CLI path: shared persistent loop (case 3).
    return _get_tool_loop().run_until_complete(coro)
|
|
|
|
|
|
# =============================================================================
|
|
# Tool Discovery (importing each module triggers its registry.register calls)
|
|
# =============================================================================
|
|
|
|
def _discover_tools():
    """Import every tool module so each runs its registry.register() calls.

    Wrapped in a function so import errors in optional tools (e.g.,
    fal_client not installed) are logged and skipped without preventing
    the remaining tool modules from loading.
    """
    import importlib

    module_names = (
        "tools.web_tools",
        "tools.terminal_tool",
        "tools.file_tools",
        "tools.vision_tools",
        "tools.mixture_of_agents_tool",
        "tools.image_generation_tool",
        "tools.skills_tool",
        "tools.skill_manager_tool",
        "tools.browser_tool",
        "tools.cronjob_tools",
        "tools.rl_training_tool",
        "tools.tts_tool",
        "tools.todo_tool",
        "tools.memory_tool",
        "tools.session_search_tool",
        "tools.clarify_tool",
        "tools.code_execution_tool",
        "tools.delegate_tool",
        "tools.process_registry",
        "tools.send_message_tool",
        "tools.honcho_tools",
        "tools.homeassistant_tool",
    )
    for name in module_names:
        try:
            importlib.import_module(name)
        except Exception as exc:
            logger.warning("Could not import tool module %s: %s", name, exc)
|
|
|
|
|
|
# Run tool discovery at import time so the registry is populated before any
# consumer (run_agent.py, cli.py, batch_runner.py) asks for schemas.
_discover_tools()

# MCP tool discovery (external MCP servers from config).
# Best-effort: a missing or broken MCP setup must not block module import,
# so failures are only logged at debug level.
try:
    from tools.mcp_tool import discover_mcp_tools
    discover_mcp_tools()
except Exception as e:
    logger.debug("MCP tool discovery failed: %s", e)

# Plugin tool discovery (user/project/pip plugins).
# Same best-effort policy as MCP discovery above.
try:
    from hermes_cli.plugins import discover_plugins
    discover_plugins()
except Exception as e:
    logger.debug("Plugin discovery failed: %s", e)
|
|
|
|
|
|
# =============================================================================
|
|
# Backward-compat constants (built once after discovery)
|
|
# =============================================================================
|
|
|
|
# Backward-compat constants, built once after discovery.
# TOOL_TO_TOOLSET_MAP is consumed by batch_runner.py; TOOLSET_REQUIREMENTS
# by cli.py and doctor.py (see the module docstring's public API list).
TOOL_TO_TOOLSET_MAP: Dict[str, str] = registry.get_tool_to_toolset_map()

TOOLSET_REQUIREMENTS: Dict[str, dict] = registry.get_toolset_requirements()

# Resolved tool names from the last get_tool_definitions() call.
# Used by code_execution_tool to know which tools are available in this session.
# NOTE: process-global — handle_function_call prefers its caller-provided
# enabled_tools list over this so subagents can't clobber the parent's set.
_last_resolved_tool_names: List[str] = []
|
|
|
|
|
|
# =============================================================================
|
|
# Legacy toolset name mapping (old _tools-suffixed names -> tool name lists)
|
|
# =============================================================================
|
|
|
|
# Legacy toolset names (old "_tools"-suffixed identifiers) mapped to the
# concrete tool names they expand to. Consulted by get_tool_definitions()
# when a requested toolset fails validate_toolset(), so old configs that
# use these names keep working.
_LEGACY_TOOLSET_MAP: Dict[str, List[str]] = {
    "web_tools": ["web_search", "web_extract"],
    "terminal_tools": ["terminal"],
    "vision_tools": ["vision_analyze"],
    "moa_tools": ["mixture_of_agents"],
    "image_tools": ["image_generate"],
    "skills_tools": ["skills_list", "skill_view", "skill_manage"],
    "browser_tools": [
        "browser_navigate", "browser_snapshot", "browser_click",
        "browser_type", "browser_scroll", "browser_back",
        "browser_press", "browser_close", "browser_get_images",
        "browser_vision", "browser_console"
    ],
    "cronjob_tools": ["cronjob"],
    "rl_tools": [
        "rl_list_environments", "rl_select_environment",
        "rl_get_current_config", "rl_edit_config",
        "rl_start_training", "rl_check_status",
        "rl_stop_training", "rl_get_results",
        "rl_list_runs", "rl_test_inference"
    ],
    "file_tools": ["read_file", "write_file", "patch", "search_files"],
    "tts_tools": ["text_to_speech"],
}
|
|
|
|
|
|
# =============================================================================
|
|
# get_tool_definitions (the main schema provider)
|
|
# =============================================================================
|
|
|
|
def _all_toolset_tools() -> set:
    """Return the union of tool names across every registered toolset."""
    from toolsets import get_all_toolsets
    names: set = set()
    for ts_name in get_all_toolsets():
        names.update(resolve_toolset(ts_name))
    return names


def get_tool_definitions(
    enabled_toolsets: Optional[List[str]] = None,
    disabled_toolsets: Optional[List[str]] = None,
    quiet_mode: bool = False,
) -> List[Dict[str, Any]]:
    """
    Get tool definitions for model API calls with toolset-based filtering.

    All tools must be part of a toolset to be accessible.

    Args:
        enabled_toolsets: Only include tools from these toolsets.
        disabled_toolsets: Exclude tools from these toolsets (only
            consulted when enabled_toolsets is None/empty).
        quiet_mode: Suppress status prints.

    Returns:
        Filtered list of OpenAI-format tool definitions.
    """
    # Determine which tool names the caller wants.
    tools_to_include: set = set()

    if enabled_toolsets:
        for toolset_name in enabled_toolsets:
            if validate_toolset(toolset_name):
                resolved = resolve_toolset(toolset_name)
                tools_to_include.update(resolved)
                if not quiet_mode:
                    print(f"✅ Enabled toolset '{toolset_name}': {', '.join(resolved) if resolved else 'no tools'}")
            elif toolset_name in _LEGACY_TOOLSET_MAP:
                legacy_tools = _LEGACY_TOOLSET_MAP[toolset_name]
                tools_to_include.update(legacy_tools)
                if not quiet_mode:
                    print(f"✅ Enabled legacy toolset '{toolset_name}': {', '.join(legacy_tools)}")
            else:
                if not quiet_mode:
                    print(f"⚠️ Unknown toolset: {toolset_name}")

    elif disabled_toolsets:
        # Start from every registered tool, then subtract each disabled set.
        tools_to_include = _all_toolset_tools()

        for toolset_name in disabled_toolsets:
            if validate_toolset(toolset_name):
                resolved = resolve_toolset(toolset_name)
                tools_to_include.difference_update(resolved)
                if not quiet_mode:
                    print(f"🚫 Disabled toolset '{toolset_name}': {', '.join(resolved) if resolved else 'no tools'}")
            elif toolset_name in _LEGACY_TOOLSET_MAP:
                legacy_tools = _LEGACY_TOOLSET_MAP[toolset_name]
                tools_to_include.difference_update(legacy_tools)
                if not quiet_mode:
                    print(f"🚫 Disabled legacy toolset '{toolset_name}': {', '.join(legacy_tools)}")
            else:
                if not quiet_mode:
                    print(f"⚠️ Unknown toolset: {toolset_name}")
    else:
        # No filters at all: expose every registered toolset's tools.
        tools_to_include = _all_toolset_tools()

    # Always include plugin-registered tools — they bypass the toolset filter
    # because their toolsets are dynamic (created at plugin load time).
    try:
        from hermes_cli.plugins import get_plugin_tool_names
        plugin_tools = get_plugin_tool_names()
        if plugin_tools:
            tools_to_include.update(plugin_tools)
    except Exception:
        pass

    # Ask the registry for schemas (only returns tools whose check_fn passes).
    filtered_tools = registry.get_definitions(tools_to_include, quiet=quiet_mode)

    # The set of tool names that actually passed check_fn filtering.
    # Use this (not tools_to_include) for any downstream schema that references
    # other tools by name — otherwise the model sees tools mentioned in
    # descriptions that don't actually exist, and hallucinates calls to them.
    available_tool_names = {t["function"]["name"] for t in filtered_tools}

    # Rebuild execute_code schema to only list sandbox tools that are actually
    # available. Without this, the model sees "web_search is available in
    # execute_code" even when the API key isn't configured or the toolset is
    # disabled (#560-discord).
    if "execute_code" in available_tool_names:
        from tools.code_execution_tool import SANDBOX_ALLOWED_TOOLS, build_execute_code_schema
        sandbox_enabled = SANDBOX_ALLOWED_TOOLS & available_tool_names
        dynamic_schema = build_execute_code_schema(sandbox_enabled)
        for i, td in enumerate(filtered_tools):
            if td.get("function", {}).get("name") == "execute_code":
                filtered_tools[i] = {"type": "function", "function": dynamic_schema}
                break

    # Strip web tool cross-references from browser_navigate description when
    # web_search / web_extract are not available. The static schema says
    # "prefer web_search or web_extract" which causes the model to hallucinate
    # those tools when they're missing.
    if "browser_navigate" in available_tool_names:
        web_tools_available = {"web_search", "web_extract"} & available_tool_names
        if not web_tools_available:
            for i, td in enumerate(filtered_tools):
                if td.get("function", {}).get("name") == "browser_navigate":
                    desc = td["function"].get("description", "")
                    desc = desc.replace(
                        " For simple information retrieval, prefer web_search or web_extract (faster, cheaper).",
                        "",
                    )
                    filtered_tools[i] = {
                        "type": "function",
                        "function": {**td["function"], "description": desc},
                    }
                    break

    if not quiet_mode:
        if filtered_tools:
            tool_names = [t["function"]["name"] for t in filtered_tools]
            print(f"🛠️ Final tool selection ({len(filtered_tools)} tools): {', '.join(tool_names)}")
        else:
            print("🛠️ No tools selected (all filtered out or unavailable)")

    # Record the final selection for code_execution_tool's fallback path.
    global _last_resolved_tool_names
    _last_resolved_tool_names = [t["function"]["name"] for t in filtered_tools]

    return filtered_tools
|
|
|
|
|
|
# =============================================================================
|
|
# handle_function_call (the main dispatcher)
|
|
# =============================================================================
|
|
|
|
# Tools whose execution is intercepted by the agent loop (run_agent.py)
# because they need agent-level state (TodoStore, MemoryStore, etc.).
# The registry still holds their schemas; dispatch just returns a stub error
# so if something slips through, the LLM sees a sensible message.
_AGENT_LOOP_TOOLS = {"todo", "memory", "session_search", "delegate_task"}

# Read/search tools that do NOT reset the consecutive-read counter —
# handle_function_call notifies the tracker only for tools outside this set.
_READ_SEARCH_TOOLS = {"read_file", "search_files"}
|
|
|
|
|
|
def handle_function_call(
    function_name: str,
    function_args: Dict[str, Any],
    task_id: Optional[str] = None,
    user_task: Optional[str] = None,
    enabled_tools: Optional[List[str]] = None,
    honcho_manager: Optional[Any] = None,
    honcho_session_key: Optional[str] = None,
) -> str:
    """
    Main function call dispatcher that routes calls to the tool registry.

    Args:
        function_name: Name of the function to call.
        function_args: Arguments for the function.
        task_id: Unique identifier for terminal/browser session isolation.
        user_task: The user's original task (for browser_snapshot context).
        enabled_tools: Tool names enabled for this session. When provided,
                       execute_code uses this list to determine which sandbox
                       tools to generate. Falls back to the process-global
                       ``_last_resolved_tool_names`` for backward compat.

    Returns:
        Function result as a JSON string.
    """
    # Notify the read-loop tracker when a non-read/search tool runs,
    # so the *consecutive* counter resets (reads after other work are fine).
    if function_name not in _READ_SEARCH_TOOLS:
        try:
            from tools.file_tools import notify_other_tool_call
            notify_other_tool_call(task_id or "default")
        except Exception:
            pass  # file_tools may not be loaded yet

    try:
        if function_name in _AGENT_LOOP_TOOLS:
            return json.dumps({"error": f"{function_name} must be handled by the agent loop"})

        # Hooks are best-effort: a broken plugin must never break tool calls.
        try:
            from hermes_cli.plugins import invoke_hook
            invoke_hook("pre_tool_call", tool_name=function_name, args=function_args, task_id=task_id or "")
        except Exception:
            pass

        # Build dispatch kwargs once (previously two near-identical dispatch
        # calls). execute_code swaps user_task for the sandbox tool list;
        # the caller-provided list is preferred so subagents can't overwrite
        # the parent's tool set via the process-global.
        dispatch_kwargs: Dict[str, Any] = {
            "task_id": task_id,
            "honcho_manager": honcho_manager,
            "honcho_session_key": honcho_session_key,
        }
        if function_name == "execute_code":
            dispatch_kwargs["enabled_tools"] = (
                enabled_tools if enabled_tools is not None else _last_resolved_tool_names
            )
        else:
            dispatch_kwargs["user_task"] = user_task

        result = registry.dispatch(function_name, function_args, **dispatch_kwargs)

        try:
            from hermes_cli.plugins import invoke_hook
            invoke_hook("post_tool_call", tool_name=function_name, args=function_args, result=result, task_id=task_id or "")
        except Exception:
            pass

        return result

    except Exception as e:
        error_msg = f"Error executing {function_name}: {str(e)}"
        # logger.exception (not .error) so the full traceback is preserved
        # in the logs while the LLM still gets a compact error payload.
        logger.exception(error_msg)
        return json.dumps({"error": error_msg}, ensure_ascii=False)
|
|
|
|
|
|
# =============================================================================
|
|
# Backward-compat wrapper functions
|
|
# =============================================================================
|
|
|
|
def get_all_tool_names() -> List[str]:
    """Return all registered tool names (delegates to the registry)."""
    return registry.get_all_tool_names()


def get_toolset_for_tool(tool_name: str) -> Optional[str]:
    """Return the toolset a tool belongs to (delegates to the registry)."""
    return registry.get_toolset_for_tool(tool_name)


def get_available_toolsets() -> Dict[str, dict]:
    """Return toolset availability info for UI display (delegates to the registry)."""
    return registry.get_available_toolsets()


def check_toolset_requirements() -> Dict[str, bool]:
    """Return {toolset: available_bool} for every registered toolset."""
    return registry.check_toolset_requirements()


def check_tool_availability(quiet: bool = False) -> Tuple[List[str], List[dict]]:
    """Return (available_toolsets, unavailable_info); `quiet` is forwarded to the registry."""
    return registry.check_tool_availability(quiet=quiet)
|