Refactor file operations and environment management in file_tools and terminal_tool
- Improved the caching mechanism for ShellFileOperations to ensure stale entries are invalidated when environments are cleaned up. - Enhanced thread safety by refining the use of locks during environment creation and cleanup processes. - Streamlined the cleanup of inactive environments to prevent blocking other tool calls, ensuring efficient resource management. - Added error handling and messaging improvements for better user feedback during environment cleanup.
This commit is contained in:
@@ -13,86 +13,88 @@ _file_ops_cache: dict = {}
|
||||
|
||||
def _get_file_ops(task_id: str = "default") -> ShellFileOperations:
|
||||
"""Get or create ShellFileOperations for a terminal environment.
|
||||
|
||||
|
||||
Respects the TERMINAL_ENV setting -- if the task_id doesn't have an
|
||||
environment yet, creates one using the configured backend (local, docker,
|
||||
modal, etc.) rather than always defaulting to local.
|
||||
|
||||
Thread-safe: uses the same per-task creation locks as terminal_tool to
|
||||
prevent duplicate sandbox creation from concurrent tool calls.
|
||||
"""
|
||||
from tools.terminal_tool import (
|
||||
_active_environments, _env_lock, _create_environment,
|
||||
_get_env_config, _last_activity, _start_cleanup_thread,
|
||||
_check_disk_usage_warning,
|
||||
_creation_locks, _creation_locks_lock,
|
||||
)
|
||||
import time
|
||||
|
||||
# Fast path: check cache without heavy locks
|
||||
|
||||
# Fast path: check cache -- but also verify the underlying environment
|
||||
# is still alive (it may have been killed by the cleanup thread).
|
||||
with _file_ops_lock:
|
||||
if task_id in _file_ops_cache:
|
||||
return _file_ops_cache[task_id]
|
||||
|
||||
# Check if we need to create a new environment.
|
||||
# Uses the same per-task creation locks as terminal_tool to prevent
|
||||
# duplicate sandbox creation from concurrent tool calls.
|
||||
from tools.terminal_tool import _creation_locks, _creation_locks_lock
|
||||
|
||||
needs_creation = False
|
||||
with _env_lock:
|
||||
if task_id not in _active_environments:
|
||||
needs_creation = True
|
||||
|
||||
if needs_creation:
|
||||
# Per-task lock: only one thread creates the sandbox, others wait
|
||||
with _creation_locks_lock:
|
||||
if task_id not in _creation_locks:
|
||||
_creation_locks[task_id] = __import__("threading").Lock()
|
||||
task_lock = _creation_locks[task_id]
|
||||
cached = _file_ops_cache.get(task_id)
|
||||
if cached is not None:
|
||||
with _env_lock:
|
||||
if task_id in _active_environments:
|
||||
_last_activity[task_id] = time.time()
|
||||
return cached
|
||||
else:
|
||||
# Environment was cleaned up -- invalidate stale cache entry
|
||||
with _file_ops_lock:
|
||||
_file_ops_cache.pop(task_id, None)
|
||||
|
||||
# Need to ensure the environment exists before building file_ops.
|
||||
# Acquire per-task lock so only one thread creates the sandbox.
|
||||
with _creation_locks_lock:
|
||||
if task_id not in _creation_locks:
|
||||
_creation_locks[task_id] = threading.Lock()
|
||||
task_lock = _creation_locks[task_id]
|
||||
|
||||
with task_lock:
|
||||
# Double-check: another thread may have created it while we waited
|
||||
with _env_lock:
|
||||
if task_id in _active_environments:
|
||||
_last_activity[task_id] = time.time()
|
||||
terminal_env = _active_environments[task_id]
|
||||
else:
|
||||
terminal_env = None
|
||||
|
||||
if terminal_env is None:
|
||||
from tools.terminal_tool import _task_env_overrides
|
||||
|
||||
config = _get_env_config()
|
||||
env_type = config["env_type"]
|
||||
overrides = _task_env_overrides.get(task_id, {})
|
||||
|
||||
if env_type == "docker":
|
||||
image = overrides.get("docker_image") or config["docker_image"]
|
||||
elif env_type == "singularity":
|
||||
image = overrides.get("singularity_image") or config["singularity_image"]
|
||||
elif env_type == "modal":
|
||||
image = overrides.get("modal_image") or config["modal_image"]
|
||||
else:
|
||||
image = ""
|
||||
|
||||
cwd = overrides.get("cwd") or config["cwd"]
|
||||
if not os.getenv("HERMES_QUIET"):
|
||||
print(f"[FileTools] Creating new {env_type} environment for task {task_id[:8]}...", flush=True)
|
||||
|
||||
terminal_env = _create_environment(
|
||||
env_type=env_type,
|
||||
image=image,
|
||||
cwd=cwd,
|
||||
timeout=config["timeout"],
|
||||
)
|
||||
|
||||
with task_lock:
|
||||
# Double-check after acquiring the per-task lock
|
||||
with _env_lock:
|
||||
if task_id in _active_environments:
|
||||
needs_creation = False
|
||||
_active_environments[task_id] = terminal_env
|
||||
_last_activity[task_id] = time.time()
|
||||
|
||||
if needs_creation:
|
||||
from tools.terminal_tool import _task_env_overrides
|
||||
|
||||
config = _get_env_config()
|
||||
env_type = config["env_type"]
|
||||
overrides = _task_env_overrides.get(task_id, {})
|
||||
|
||||
if env_type == "docker":
|
||||
image = overrides.get("docker_image") or config["docker_image"]
|
||||
elif env_type == "singularity":
|
||||
image = overrides.get("singularity_image") or config["singularity_image"]
|
||||
elif env_type == "modal":
|
||||
image = overrides.get("modal_image") or config["modal_image"]
|
||||
else:
|
||||
image = ""
|
||||
|
||||
cwd = overrides.get("cwd") or config["cwd"]
|
||||
if not os.getenv("HERMES_QUIET"):
|
||||
print(f"[FileTools] Creating new {env_type} environment for task {task_id[:8]}...", flush=True)
|
||||
|
||||
new_env = _create_environment(
|
||||
env_type=env_type,
|
||||
image=image,
|
||||
cwd=cwd,
|
||||
timeout=config["timeout"],
|
||||
)
|
||||
|
||||
with _env_lock:
|
||||
_active_environments[task_id] = new_env
|
||||
_last_activity[task_id] = __import__("time").time()
|
||||
|
||||
_start_cleanup_thread()
|
||||
if not os.getenv("HERMES_QUIET"):
|
||||
print(f"[FileTools] {env_type} environment ready for task {task_id[:8]}", flush=True)
|
||||
|
||||
# Now get the environment and build file_ops
|
||||
with _env_lock:
|
||||
_last_activity[task_id] = time.time()
|
||||
terminal_env = _active_environments[task_id]
|
||||
|
||||
_start_cleanup_thread()
|
||||
if not os.getenv("HERMES_QUIET"):
|
||||
print(f"[FileTools] {env_type} environment ready for task {task_id[:8]}", flush=True)
|
||||
|
||||
# Build file_ops from the (guaranteed live) environment and cache it
|
||||
file_ops = ShellFileOperations(terminal_env)
|
||||
with _file_ops_lock:
|
||||
_file_ops_cache[task_id] = file_ops
|
||||
|
||||
@@ -1274,49 +1274,56 @@ def _cleanup_inactive_envs(lifetime_seconds: int = 300):
|
||||
global _active_environments, _last_activity
|
||||
|
||||
current_time = time.time()
|
||||
tasks_to_cleanup = []
|
||||
|
||||
# Phase 1: collect stale entries and remove them from tracking dicts while
|
||||
# holding the lock. Do NOT call env.cleanup() inside the lock -- Modal and
|
||||
# Docker teardown can block for 10-15s, which would stall every concurrent
|
||||
# terminal/file tool call waiting on _env_lock.
|
||||
envs_to_stop = [] # list of (task_id, env) pairs
|
||||
|
||||
with _env_lock:
|
||||
for task_id, last_time in list(_last_activity.items()):
|
||||
if current_time - last_time > lifetime_seconds:
|
||||
tasks_to_cleanup.append(task_id)
|
||||
env = _active_environments.pop(task_id, None)
|
||||
_last_activity.pop(task_id, None)
|
||||
_task_workdirs.pop(task_id, None)
|
||||
if env is not None:
|
||||
envs_to_stop.append((task_id, env))
|
||||
|
||||
for task_id in tasks_to_cleanup:
|
||||
try:
|
||||
if task_id in _active_environments:
|
||||
env = _active_environments[task_id]
|
||||
# Try various cleanup methods
|
||||
if hasattr(env, 'cleanup'):
|
||||
env.cleanup()
|
||||
elif hasattr(env, 'stop'):
|
||||
env.stop()
|
||||
elif hasattr(env, 'terminate'):
|
||||
env.terminate()
|
||||
# Also purge per-task creation locks for cleaned-up tasks
|
||||
with _creation_locks_lock:
|
||||
for task_id, _ in envs_to_stop:
|
||||
_creation_locks.pop(task_id, None)
|
||||
|
||||
del _active_environments[task_id]
|
||||
if not os.getenv("HERMES_QUIET"):
|
||||
print(f"[Terminal Cleanup] Cleaned up inactive environment for task: {task_id}")
|
||||
# Phase 2: stop the actual sandboxes OUTSIDE the lock so other tool calls
|
||||
# are not blocked while Modal/Docker sandboxes shut down.
|
||||
for task_id, env in envs_to_stop:
|
||||
# Invalidate stale file_ops cache entry (Bug fix: prevents
|
||||
# ShellFileOperations from referencing a dead sandbox)
|
||||
try:
|
||||
from tools.file_tools import clear_file_ops_cache
|
||||
clear_file_ops_cache(task_id)
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
if task_id in _last_activity:
|
||||
del _last_activity[task_id]
|
||||
if task_id in _task_workdirs:
|
||||
del _task_workdirs[task_id]
|
||||
try:
|
||||
if hasattr(env, 'cleanup'):
|
||||
env.cleanup()
|
||||
elif hasattr(env, 'stop'):
|
||||
env.stop()
|
||||
elif hasattr(env, 'terminate'):
|
||||
env.terminate()
|
||||
|
||||
except Exception as e:
|
||||
error_str = str(e)
|
||||
if not os.getenv("HERMES_QUIET"):
|
||||
if "404" in error_str or "not found" in error_str.lower():
|
||||
print(f"[Terminal Cleanup] Environment for task {task_id} already cleaned up")
|
||||
else:
|
||||
print(f"[Terminal Cleanup] Error cleaning up environment for task {task_id}: {e}")
|
||||
|
||||
# Always remove from tracking dicts
|
||||
if task_id in _active_environments:
|
||||
del _active_environments[task_id]
|
||||
if task_id in _last_activity:
|
||||
del _last_activity[task_id]
|
||||
if task_id in _task_workdirs:
|
||||
del _task_workdirs[task_id]
|
||||
if not os.getenv("HERMES_QUIET"):
|
||||
print(f"[Terminal Cleanup] Cleaned up inactive environment for task: {task_id}")
|
||||
|
||||
except Exception as e:
|
||||
error_str = str(e)
|
||||
if not os.getenv("HERMES_QUIET"):
|
||||
if "404" in error_str or "not found" in error_str.lower():
|
||||
print(f"[Terminal Cleanup] Environment for task {task_id} already cleaned up")
|
||||
else:
|
||||
print(f"[Terminal Cleanup] Error cleaning up environment for task {task_id}: {e}")
|
||||
|
||||
|
||||
def _cleanup_thread_worker():
|
||||
@@ -1415,34 +1422,47 @@ def cleanup_vm(task_id: str):
|
||||
"""Manually clean up a specific environment by task_id."""
|
||||
global _active_environments, _last_activity, _task_workdirs
|
||||
|
||||
# Remove from tracking dicts while holding the lock, but defer the
|
||||
# actual (potentially slow) env.cleanup() call to outside the lock
|
||||
# so other tool calls aren't blocked.
|
||||
env = None
|
||||
with _env_lock:
|
||||
try:
|
||||
if task_id in _active_environments:
|
||||
env = _active_environments[task_id]
|
||||
if hasattr(env, 'cleanup'):
|
||||
env.cleanup()
|
||||
elif hasattr(env, 'stop'):
|
||||
env.stop()
|
||||
elif hasattr(env, 'terminate'):
|
||||
env.terminate()
|
||||
env = _active_environments.pop(task_id, None)
|
||||
_task_workdirs.pop(task_id, None)
|
||||
_last_activity.pop(task_id, None)
|
||||
|
||||
del _active_environments[task_id]
|
||||
if not os.getenv("HERMES_QUIET"):
|
||||
print(f"[Terminal Cleanup] Manually cleaned up environment for task: {task_id}")
|
||||
# Clean up per-task creation lock
|
||||
with _creation_locks_lock:
|
||||
_creation_locks.pop(task_id, None)
|
||||
|
||||
if task_id in _task_workdirs:
|
||||
del _task_workdirs[task_id]
|
||||
# Invalidate stale file_ops cache entry
|
||||
try:
|
||||
from tools.file_tools import clear_file_ops_cache
|
||||
clear_file_ops_cache(task_id)
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
if task_id in _last_activity:
|
||||
del _last_activity[task_id]
|
||||
if env is None:
|
||||
return
|
||||
|
||||
except Exception as e:
|
||||
if not os.getenv("HERMES_QUIET"):
|
||||
error_str = str(e)
|
||||
if "404" in error_str or "not found" in error_str.lower():
|
||||
print(f"[Terminal Cleanup] Environment for task {task_id} already cleaned up")
|
||||
else:
|
||||
print(f"[Terminal Cleanup] Error cleaning up environment for task {task_id}: {e}")
|
||||
try:
|
||||
if hasattr(env, 'cleanup'):
|
||||
env.cleanup()
|
||||
elif hasattr(env, 'stop'):
|
||||
env.stop()
|
||||
elif hasattr(env, 'terminate'):
|
||||
env.terminate()
|
||||
|
||||
if not os.getenv("HERMES_QUIET"):
|
||||
print(f"[Terminal Cleanup] Manually cleaned up environment for task: {task_id}")
|
||||
|
||||
except Exception as e:
|
||||
if not os.getenv("HERMES_QUIET"):
|
||||
error_str = str(e)
|
||||
if "404" in error_str or "not found" in error_str.lower():
|
||||
print(f"[Terminal Cleanup] Environment for task {task_id} already cleaned up")
|
||||
else:
|
||||
print(f"[Terminal Cleanup] Error cleaning up environment for task {task_id}: {e}")
|
||||
|
||||
|
||||
def _atexit_cleanup():
|
||||
|
||||
Reference in New Issue
Block a user