#!/usr/bin/env python3
"""
Terminal Tool Module (mini-swe-agent backend)

A terminal tool that executes commands using mini-swe-agent's execution environments.
Supports local execution, Docker containers, and Modal cloud sandboxes.

Environment Selection (via TERMINAL_ENV environment variable):
- "local": Execute directly on the host machine (default, fastest)
- "docker": Execute in Docker containers (isolated, requires Docker)
- "modal": Execute in Modal cloud sandboxes (scalable, requires Modal account)

Features:
- Multiple execution backends (local, docker, modal)
- Background task support
- VM/container lifecycle management
- Automatic cleanup after inactivity

Usage:
    from terminal_tool import terminal_tool

    # Execute a simple command
    result = terminal_tool("ls -la")

    # Execute in background
    result = terminal_tool("python server.py", background=True)
"""
import json
import os
2026-01-23 12:26:53 +00:00
import sys
2025-11-04 03:32:43 -05:00
import time
2026-01-23 12:26:53 +00:00
import threading
2025-11-04 03:32:43 -05:00
import atexit
2026-01-29 06:10:24 +00:00
import shutil
import subprocess
import tempfile
import uuid
2026-01-23 12:26:53 +00:00
from pathlib import Path
2025-07-26 04:31:17 +00:00
from typing import Optional , Dict , Any
2025-07-25 15:15:36 +00:00
2026-01-23 12:26:53 +00:00
# Add mini-swe-agent to path if not installed.
# Resolves to <repo-root>/mini-swe-agent/src relative to this file.
mini_swe_path = Path(__file__).parent.parent / "mini-swe-agent" / "src"
if mini_swe_path.exists():
    # Prepend so the vendored copy takes precedence over any installed package.
    sys.path.insert(0, str(mini_swe_path))
2026-01-29 06:10:24 +00:00
# =============================================================================
# Custom Singularity Environment with more space
# =============================================================================
def _get_scratch_dir ( ) - > Path :
""" Get the best directory for Singularity sandboxes - prefers /scratch if available. """
# Check for configurable scratch directory first (highest priority)
custom_scratch = os . getenv ( " TERMINAL_SCRATCH_DIR " )
if custom_scratch :
scratch_path = Path ( custom_scratch )
scratch_path . mkdir ( parents = True , exist_ok = True )
return scratch_path
# Check for /scratch (common on HPC clusters, especially GPU nodes)
scratch = Path ( " /scratch " )
if scratch . exists ( ) and os . access ( scratch , os . W_OK ) :
# Create user-specific subdirectory
user_scratch = scratch / os . getenv ( " USER " , " hermes " ) / " hermes-agent "
user_scratch . mkdir ( parents = True , exist_ok = True )
2026-01-31 06:30:48 +00:00
if not os . getenv ( " HERMES_QUIET " ) :
print ( f " [Terminal] Using /scratch for sandboxes: { user_scratch } " )
2026-01-29 06:10:24 +00:00
return user_scratch
# Fall back to /tmp
2026-01-31 06:30:48 +00:00
if not os . getenv ( " HERMES_QUIET " ) :
print ( " [Terminal] Warning: /scratch not available, using /tmp (limited space) " )
2026-01-29 06:10:24 +00:00
return Path ( tempfile . gettempdir ( ) )
2026-01-29 22:47:11 +00:00
def _get_apptainer_cache_dir ( ) - > Path :
""" Get the Apptainer cache directory for SIF images. """
# Check for APPTAINER_CACHEDIR env var
cache_dir = os . getenv ( " APPTAINER_CACHEDIR " )
if cache_dir :
cache_path = Path ( cache_dir )
cache_path . mkdir ( parents = True , exist_ok = True )
return cache_path
2026-02-09 04:35:25 +00:00
# Use user-specific subdirectory in scratch for cache
2026-01-29 22:47:11 +00:00
scratch = _get_scratch_dir ( )
2026-02-09 04:35:25 +00:00
cache_path = scratch / " .apptainer "
2026-01-29 22:47:11 +00:00
cache_path . mkdir ( parents = True , exist_ok = True )
return cache_path
# Lock for SIF building to prevent race conditions
# (multiple worker threads may request the same docker:// image concurrently).
_sif_build_lock = threading.Lock()
def _get_or_build_sif ( image : str , executable : str = " apptainer " ) - > str :
"""
Get or build a SIF image from a docker : / / URL .
If the image is already a . sif file , returns it as - is .
If the image is a docker : / / URL , checks for cached SIF and builds if needed .
Args :
image : Image path ( docker : / / . . . URL or . sif path )
executable : apptainer or singularity
Returns :
Path to SIF file , or original image if not a docker : / / URL
"""
# If already a .sif file, use it directly
if image . endswith ( ' .sif ' ) and Path ( image ) . exists ( ) :
return image
# If not a docker:// URL, return as-is (could be a local sandbox or other format)
if not image . startswith ( ' docker:// ' ) :
return image
# Generate SIF filename from docker image name
# docker://nikolaik/python-nodejs:python3.11-nodejs20 -> python-nodejs-python3.11-nodejs20.sif
image_name = image . replace ( ' docker:// ' , ' ' ) . replace ( ' / ' , ' - ' ) . replace ( ' : ' , ' - ' )
cache_dir = _get_apptainer_cache_dir ( )
sif_path = cache_dir / f " { image_name } .sif "
# Check if SIF already exists
if sif_path . exists ( ) :
return str ( sif_path )
# Build SIF with lock to prevent multiple workers building simultaneously
with _sif_build_lock :
# Double-check after acquiring lock (another thread may have built it)
if sif_path . exists ( ) :
return str ( sif_path )
print ( f " [Terminal] Building SIF image (one-time setup)... " )
print ( f " [Terminal] Source: { image } " )
print ( f " [Terminal] Target: { sif_path } " )
# Ensure tmp directory exists for build
tmp_dir = cache_dir / " tmp "
tmp_dir . mkdir ( parents = True , exist_ok = True )
# Set APPTAINER_TMPDIR for the build
env = os . environ . copy ( )
env [ " APPTAINER_TMPDIR " ] = str ( tmp_dir )
env [ " APPTAINER_CACHEDIR " ] = str ( cache_dir )
try :
result = subprocess . run (
[ executable , " build " , str ( sif_path ) , image ] ,
capture_output = True ,
text = True ,
timeout = 600 , # 10 min timeout for pulling and building
env = env
)
if result . returncode != 0 :
print ( f " [Terminal] ⚠️ SIF build failed, falling back to docker:// URL " )
print ( f " [Terminal] Error: { result . stderr [ : 500 ] } " )
return image
print ( f " [Terminal] ✅ SIF image built successfully " )
return str ( sif_path )
except subprocess . TimeoutExpired :
print ( f " [Terminal] ⚠️ SIF build timed out, falling back to docker:// URL " )
# Clean up partial file
if sif_path . exists ( ) :
sif_path . unlink ( )
return image
except Exception as e :
print ( f " [Terminal] ⚠️ SIF build error: { e } , falling back to docker:// URL " )
return image
2026-01-29 06:10:24 +00:00
# Disk usage warning threshold (in GB)
DISK_USAGE_WARNING_THRESHOLD_GB = float ( os . getenv ( " TERMINAL_DISK_WARNING_GB " , " 500 " ) )
def _check_disk_usage_warning() -> bool:
    """Warn (and return True) if hermes scratch usage exceeds the threshold.

    Sums the size of every file under <scratch>/hermes-* and compares the
    total against DISK_USAGE_WARNING_THRESHOLD_GB. Best-effort: any failure
    yields False rather than an exception, so a broken disk-usage check can
    never block command execution.
    """
    scratch_dir = _get_scratch_dir()
    try:
        total_bytes = 0
        # pathlib glob replaces the previous glob-module + str() round-trip.
        for hermes_dir in scratch_dir.glob("hermes-*"):
            for f in hermes_dir.rglob('*'):
                try:
                    if f.is_file():
                        total_bytes += f.stat().st_size
                except OSError:
                    # Files may vanish between listing and stat; skip them.
                    pass
        total_gb = total_bytes / (1024 ** 3)
        if total_gb > DISK_USAGE_WARNING_THRESHOLD_GB:
            print(f"⚠️ [Terminal] WARNING: Disk usage ({total_gb:.1f}GB) exceeds threshold ({DISK_USAGE_WARNING_THRESHOLD_GB}GB)")
            print("Consider running cleanup_all_environments() or reducing parallel workers")
            return True
        return False
    except Exception:
        return False
2026-02-01 15:36:26 -08:00
# Session-cached sudo password (persists until CLI exits)
_cached_sudo_password : str = " "
2026-02-02 23:35:18 -08:00
# =============================================================================
# Dangerous Command Approval System
# =============================================================================

# Session-cached dangerous command approvals (pattern -> approved).
# Cleared when the process exits; permanent approvals live in the CLI config.
_session_approved_patterns: set = set()

# Dangerous command patterns as (regex, description) pairs.
# Matched case-insensitively against the whole command string.
DANGEROUS_PATTERNS = [
    (r'\brm\s+(-[^\s]*\s+)*/', "delete in root path"),
    (r'\brm\s+(-[^\s]*)?r', "recursive delete"),
    (r'\bchmod\s+(-[^\s]*\s+)*777\b', "world-writable permissions"),
    (r'\bchown\s+(-[^\s]*)?R\s+root', "recursive chown to root"),
    (r'\bmkfs\b', "format filesystem"),
    (r'\bdd\s+.*if=', "disk copy"),
    (r'>\s*/dev/sd', "write to block device"),
    (r'\bDROP\s+(TABLE|DATABASE)\b', "SQL DROP"),
    (r'\bDELETE\s+FROM\b(?!.*\bWHERE\b)', "SQL DELETE without WHERE"),
    (r'\bTRUNCATE\s+(TABLE)?\s*\w', "SQL TRUNCATE"),
    (r'>\s*/etc/', "overwrite system config"),
    (r'\bsystemctl\s+(stop|disable|mask)\b', "stop/disable system service"),
    (r'\bkill\s+-9\s+-1\b', "kill all processes"),
    (r'\bpkill\s+-9\b', "force kill processes"),
    (r':()\s*{\s*:\s*\|\s*:&\s*}\s*;:', "fork bomb"),
]
def _load_permanent_allowlist ( ) - > set :
""" Load permanently allowed command patterns from config. """
try :
from hermes_cli . config import load_config
config = load_config ( )
patterns = config . get ( " command_allowlist " , [ ] )
return set ( patterns ) if patterns else set ( )
except Exception :
return set ( )
def _save_permanent_allowlist ( patterns : set ) :
""" Save permanently allowed command patterns to config. """
try :
from hermes_cli . config import load_config , save_config
config = load_config ( )
config [ " command_allowlist " ] = list ( patterns )
save_config ( config )
except Exception as e :
print ( f " ⚠️ Could not save allowlist: { e } " )
def _detect_dangerous_command(command: str) -> tuple:
    """
    Check if command matches any dangerous patterns.

    Returns:
        (is_dangerous, pattern_key, description) or (False, None, None)
    """
    import re

    lowered = command.lower()
    for pattern, description in DANGEROUS_PATTERNS:
        if not re.search(pattern, lowered, re.IGNORECASE):
            continue
        # Derive a short, stable key for approval caching: the first token
        # after a word boundary when present, otherwise a prefix of the regex.
        if r'\b' in pattern:
            pattern_key = pattern.split(r'\b')[1]
        else:
            pattern_key = pattern[:20]
        return (True, pattern_key, description)
    return (False, None, None)
def _is_command_approved(pattern_key: str) -> bool:
    """Check if a pattern is approved (session or permanent)."""
    # Session approvals are in memory; permanent ones come from the config.
    if pattern_key in _session_approved_patterns:
        return True
    return pattern_key in _load_permanent_allowlist()
def _prompt_dangerous_approval(command: str, description: str, timeout_seconds: int = 60) -> str:
    """
    Prompt user to approve a dangerous command (CLI only).

    Reads one choice from stdin on a daemon thread so the prompt can time
    out (default 60s); a timeout counts as a denial. The spinner is paused
    via the HERMES_SPINNER_PAUSE env var while the prompt is shown.

    Returns: 'once', 'session', 'always', or 'deny'
    """
    import sys
    import threading

    # Pause spinner if one is running
    os.environ["HERMES_SPINNER_PAUSE"] = "1"
    try:
        # Use simple ASCII art for compatibility (no ANSI color codes)
        print()
        print(f"⚠️ DANGEROUS COMMAND: {description}")
        print(f"{command[:80]}{'...' if len(command) > 80 else ''}")
        print()
        print(f"[o]nce | [s]ession | [a]lways | [d]eny")
        print()
        sys.stdout.flush()

        # Shared container so the input thread can report its result.
        result = {"choice": ""}

        def get_input():
            try:
                result["choice"] = input("Choice [o/s/a/D]: ").strip().lower()
            except:
                result["choice"] = ""

        # Daemon thread: if the user never answers, it dies with the process.
        thread = threading.Thread(target=get_input, daemon=True)
        thread.start()
        thread.join(timeout=timeout_seconds)
        if thread.is_alive():
            # No answer within the timeout window.
            print("\n⏱ Timeout - denying command")
            return "deny"

        choice = result["choice"]
        if choice in ('o', 'once'):
            print("✓ Allowed once")
            return "once"
        elif choice in ('s', 'session'):
            print("✓ Allowed for this session")
            return "session"
        elif choice in ('a', 'always'):
            print("✓ Added to permanent allowlist")
            return "always"
        else:
            # Anything else (including empty input) defaults to deny.
            print("✗ Denied")
            return "deny"
    except (EOFError, KeyboardInterrupt):
        print("\n✗ Cancelled")
        return "deny"
    finally:
        # Always resume the spinner, whatever the outcome.
        if "HERMES_SPINNER_PAUSE" in os.environ:
            del os.environ["HERMES_SPINNER_PAUSE"]
        print()
        sys.stdout.flush()
def _check_dangerous_command ( command : str , env_type : str ) - > dict :
"""
Check if command is dangerous and handle approval .
Only applies to local / ssh backends in interactive contexts .
Args :
command : The command to check
env_type : The terminal backend type
Returns :
{ " approved " : True / False , " message " : str or None }
"""
# Skip check for isolated environments (containers are disposable)
if env_type in ( " docker " , " singularity " , " modal " ) :
return { " approved " : True , " message " : None }
# Detect dangerous command
is_dangerous , pattern_key , description = _detect_dangerous_command ( command )
if not is_dangerous :
return { " approved " : True , " message " : None }
# Check if already approved
if _is_command_approved ( pattern_key ) :
return { " approved " : True , " message " : None }
# Check context - only prompt in interactive modes
is_cli = os . getenv ( " HERMES_INTERACTIVE " )
is_gateway = os . getenv ( " HERMES_GATEWAY_SESSION " )
if not is_cli and not is_gateway :
# Programmatic use - allow (user opted into local backend)
return { " approved " : True , " message " : None }
if is_gateway :
# Messaging context - return informative denial, agent should ask user
return {
" approved " : False ,
" pattern_key " : pattern_key ,
2026-02-02 23:46:41 -08:00
" message " : f " BLOCKED: This command is potentially dangerous ( { description } ). Tell the user and ask if they want to add this command pattern to their allowlist. They can do this via ' hermes config edit ' or by running the command directly on their machine. "
2026-02-02 23:35:18 -08:00
}
# CLI context - prompt user
choice = _prompt_dangerous_approval ( command , description )
if choice == " deny " :
2026-02-02 23:46:41 -08:00
return { " approved " : False , " message " : " BLOCKED: User denied this potentially dangerous command. Do NOT retry this command - the user has explicitly rejected it. " }
2026-02-02 23:35:18 -08:00
# Handle approval
if choice == " session " :
_session_approved_patterns . add ( pattern_key )
elif choice == " always " :
_session_approved_patterns . add ( pattern_key )
permanent = _load_permanent_allowlist ( )
permanent . add ( pattern_key )
_save_permanent_allowlist ( permanent )
return { " approved " : True , " message " : None }
def _handle_sudo_failure ( output : str , env_type : str ) - > str :
"""
Check for sudo failure and add helpful message for messaging contexts .
Returns enhanced output if sudo failed in messaging context , else original .
"""
is_gateway = os . getenv ( " HERMES_GATEWAY_SESSION " )
if not is_gateway :
return output
# Check for sudo failure indicators
sudo_failures = [
" sudo: a password is required " ,
" sudo: no tty present " ,
" sudo: a terminal is required " ,
]
for failure in sudo_failures :
if failure in output :
return output + " \n \n 💡 Tip: To enable sudo over messaging, add SUDO_PASSWORD to ~/.hermes/.env on the agent machine. "
return output
2026-02-01 15:36:26 -08:00
def _prompt_for_sudo_password(timeout_seconds: int = 45) -> str:
    """
    Prompt user for sudo password with timeout.

    Returns the password if entered, or empty string if:
    - User presses Enter without input (skip)
    - Timeout expires (45s default)
    - Any error occurs

    Only works in interactive mode (HERMES_INTERACTIVE=1).
    Uses getpass for hidden input with threading for timeout support.
    """
    import getpass
    import sys
    import time as time_module

    # ANSI escape codes for terminal control
    CLEAR_LINE = "\033[2K"  # Clear entire line
    CURSOR_START = "\r"  # Move cursor to start of line

    # Result container shared with the input thread.
    result = {"password": None, "done": False}

    def get_password_thread():
        """Thread function to get password with getpass (hidden input)."""
        try:
            result["password"] = getpass.getpass("Password (hidden): ")
        except (EOFError, KeyboardInterrupt):
            result["password"] = ""
        except Exception:
            result["password"] = ""
        finally:
            result["done"] = True

    try:
        # Pause the spinner animation while prompting for password
        os.environ["HERMES_SPINNER_PAUSE"] = "1"
        time_module.sleep(0.2)  # Give spinner time to pause

        # Clear any spinner/animation on current line
        sys.stdout.write(CURSOR_START + CLEAR_LINE)
        sys.stdout.flush()

        # Print a clear visual break with empty lines for separation
        print("\n")  # Extra spacing
        print("┌" + "─" * 58 + "┐")
        print("│ 🔐 SUDO PASSWORD REQUIRED" + " " * 30 + "│")
        print("├" + "─" * 58 + "┤")
        print("│ Enter password below (input is hidden), or: │")
        print("│ • Press Enter to skip (command fails gracefully) │")
        print(f"│ • Wait {timeout_seconds}s to auto-skip" + " " * 27 + "│")
        print("└" + "─" * 58 + "┘")
        print()
        sys.stdout.flush()

        # Start password input in a thread so we can timeout
        password_thread = threading.Thread(target=get_password_thread, daemon=True)
        password_thread.start()

        # Wait for either completion or timeout
        password_thread.join(timeout=timeout_seconds)

        if result["done"]:
            # Got input (or user pressed Enter/Ctrl+C)
            password = result["password"] or ""
            if password:
                print("✓ Password received (cached for this session)")
            else:
                print("⏭ Skipped - continuing without sudo")
            print()
            sys.stdout.flush()
            return password
        else:
            # Timeout - the daemon thread is still blocked waiting for input.
            print("\n⏱ Timeout - continuing without sudo")
            print("(Press Enter to dismiss the password prompt)")
            print()
            sys.stdout.flush()
            return ""
    except (EOFError, KeyboardInterrupt):
        print()
        print("⏭ Cancelled - continuing without sudo")
        print()
        sys.stdout.flush()
        return ""
    except Exception as e:
        print(f"\n[sudo prompt error: {e}] - continuing without sudo\n")
        sys.stdout.flush()
        return ""
    finally:
        # Always resume the spinner when done
        if "HERMES_SPINNER_PAUSE" in os.environ:
            del os.environ["HERMES_SPINNER_PAUSE"]
2026-02-01 10:02:34 -08:00
def _transform_sudo_command ( command : str ) - > str :
"""
Transform sudo commands to use - S flag if SUDO_PASSWORD is available .
This is a shared helper used by all execution environments to provide
consistent sudo handling across local , SSH , and container environments .
2026-02-01 15:36:26 -08:00
If SUDO_PASSWORD is set ( via env , config , or interactive prompt ) :
2026-02-01 10:02:34 -08:00
' sudo apt install curl ' - > password piped via sudo - S
2026-02-01 15:36:26 -08:00
If SUDO_PASSWORD is not set and in interactive mode ( HERMES_INTERACTIVE = 1 ) :
Prompts user for password with 45 s timeout , caches for session .
If SUDO_PASSWORD is not set and NOT interactive :
Command runs as - is ( fails gracefully with " sudo: a password is required " ) .
2026-02-01 10:02:34 -08:00
"""
2026-02-01 15:36:26 -08:00
global _cached_sudo_password
import re
# Check if command even contains sudo
if not re . search ( r ' \ bsudo \ b ' , command ) :
return command # No sudo in command, return as-is
# Try to get password from: env var -> session cache -> interactive prompt
sudo_password = os . getenv ( " SUDO_PASSWORD " , " " ) or _cached_sudo_password
2026-02-01 10:02:34 -08:00
if not sudo_password :
2026-02-01 15:36:26 -08:00
# No password configured - check if we're in interactive mode
if os . getenv ( " HERMES_INTERACTIVE " ) :
# Prompt user for password
sudo_password = _prompt_for_sudo_password ( timeout_seconds = 45 )
if sudo_password :
_cached_sudo_password = sudo_password # Cache for session
2026-02-01 10:02:34 -08:00
2026-02-01 15:36:26 -08:00
if not sudo_password :
return command # No password, let it fail gracefully
2026-02-01 10:02:34 -08:00
def replace_sudo ( match ) :
# Replace 'sudo' with password-piped version
# The -S flag makes sudo read password from stdin
# The -p '' suppresses the password prompt
return f " echo ' { sudo_password } ' | sudo -S -p ' ' "
# Match 'sudo' at word boundaries (not 'visudo' or 'sudoers')
# This handles: sudo, sudo -flag, etc.
return re . sub ( r ' \ bsudo \ b ' , replace_sudo , command )
class _LocalEnvironment :
"""
Local execution environment with sudo support and non - blocking stdin .
Features :
- Uses stdin = DEVNULL to prevent hanging on interactive prompts ( sudo , etc . )
- Optional SUDO_PASSWORD support : if set , transforms ` sudo ` commands to use ` sudo - S `
- Graceful failure : sudo commands fail fast with clear error if no password configured
Environment variables :
- SUDO_PASSWORD : If set , enables sudo commands by piping password via ` sudo - S `
"""
def __init__ ( self , cwd : str = " " , timeout : int = 60 , env : dict = None ) :
self . cwd = cwd or os . getcwd ( )
self . timeout = timeout
self . env = env or { }
def execute ( self , command : str , cwd : str = " " , * , timeout : int | None = None ) - > dict :
""" Execute a command locally with sudo support. """
work_dir = cwd or self . cwd or os . getcwd ( )
effective_timeout = timeout or self . timeout
# Transform sudo commands if SUDO_PASSWORD is available
exec_command = _transform_sudo_command ( command )
try :
result = subprocess . run (
exec_command ,
shell = True ,
text = True ,
cwd = work_dir ,
env = os . environ | self . env ,
timeout = effective_timeout ,
encoding = " utf-8 " ,
errors = " replace " ,
stdout = subprocess . PIPE ,
stderr = subprocess . STDOUT ,
stdin = subprocess . DEVNULL , # Prevent hanging on interactive prompts
)
return { " output " : result . stdout , " returncode " : result . returncode }
except subprocess . TimeoutExpired :
return { " output " : f " Command timed out after { effective_timeout } s " , " returncode " : 124 }
except Exception as e :
return { " output " : f " Execution error: { str ( e ) } " , " returncode " : 1 }
def cleanup ( self ) :
""" No cleanup needed for local environment. """
pass
def stop ( self ) :
""" Alias for cleanup. """
pass
2026-01-29 06:10:24 +00:00
class _SingularityEnvironment:
    """
    Persistent Singularity/Apptainer container environment.

    Uses `apptainer instance` to create a long-running container that persists
    state (files, installs, env changes) across all commands within a task.
    The model experiences this as a real Linux VM.

    Features:
    - Persistent filesystem: files created in one command are visible in the next
    - Package installs persist: pip/apt installs survive across tool calls
    - Full isolation: --containall gives PID, IPC, and environment isolation
    - Writable tmpfs overlay: full root filesystem is writable (RAM-backed)
    - Automatic SIF caching: docker:// images converted to SIF once, reused forever
    """

    def __init__(self, image: str, cwd: str = "/root", timeout: int = 60):
        # image: docker:// URL or .sif path; cwd: default working directory
        # inside the container; timeout: default per-command timeout (seconds).
        self.cwd = cwd
        self.timeout = timeout
        # Use apptainer if available, otherwise singularity
        self.executable = "apptainer" if shutil.which("apptainer") else "singularity"
        # Get or build SIF from docker:// URL (fast if already cached)
        self.image = _get_or_build_sif(image, self.executable)
        # Create unique instance name (must be alphanumeric + underscores)
        self.instance_id = f"hermes_{uuid.uuid4().hex[:12]}"
        self._instance_started = False
        # Start the persistent instance
        self._start_instance()

    def _start_instance(self):
        """Start a persistent apptainer instance.

        The instance runs as a background process. All subsequent execute() calls
        run commands inside this same instance, so state persists across calls.

        Raises:
            RuntimeError: if startup fails or exceeds the 2-minute limit.
        """
        cmd = [
            self.executable, "instance", "start",
            "--writable-tmpfs",  # RAM-backed writable overlay on read-only SIF
            "--containall",  # Full isolation: PID, IPC, environment, filesystem
            str(self.image),
            self.instance_id,
        ]
        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=120,  # 2 min for instance startup
            )
            if result.returncode != 0:
                raise RuntimeError(f"Failed to start instance: {result.stderr}")
            self._instance_started = True
            print(f"[Singularity] Instance {self.instance_id} started (persistent container)", flush=True)
        except subprocess.TimeoutExpired:
            raise RuntimeError("Instance start timed out")

    def execute(self, command: str, cwd: str = "", *, timeout: int | None = None) -> dict:
        """Execute a command in the persistent Singularity instance.

        All commands run in the same container, so files, installs, and
        environment changes persist between calls.

        Returns:
            {"output": combined stdout+stderr, "returncode": exit status}
            with returncode 124 on timeout and -1 if the instance never started.
        """
        if not self._instance_started:
            return {"output": "Instance not started", "returncode": -1}

        cmd = [self.executable, "exec"]
        # Set working directory
        work_dir = cwd or self.cwd
        cmd.extend(["--pwd", work_dir])
        # Connect to the running instance
        cmd.append(f"instance://{self.instance_id}")
        # Transform sudo commands if SUDO_PASSWORD is available
        exec_command = _transform_sudo_command(command)
        # Execute the command
        cmd.extend(["bash", "-c", exec_command])
        try:
            result = subprocess.run(
                cmd,
                text=True,
                timeout=timeout or self.timeout,
                encoding="utf-8",
                errors="replace",
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                stdin=subprocess.DEVNULL,  # Prevent hanging on interactive prompts
            )
            return {"output": result.stdout, "returncode": result.returncode}
        except subprocess.TimeoutExpired:
            return {"output": f"Command timed out after {timeout or self.timeout}s", "returncode": 124}

    def cleanup(self):
        """Stop the persistent instance and clean up."""
        if self._instance_started:
            try:
                subprocess.run(
                    [self.executable, "instance", "stop", self.instance_id],
                    capture_output=True,
                    text=True,
                    timeout=30,
                )
                print(f"[Singularity] Instance {self.instance_id} stopped", flush=True)
            except Exception as e:
                print(f"[Singularity] Warning: failed to stop instance {self.instance_id}: {e}", flush=True)
            # Mark stopped even if the stop command failed, so cleanup is not retried.
            self._instance_started = False

    def stop(self):
        """Alias for cleanup."""
        self.cleanup()

    def __del__(self):
        """Cleanup on destruction."""
        try:
            self.cleanup()
        except:
            pass
2026-01-29 06:10:24 +00:00
2026-01-31 06:30:48 +00:00
class _SSHEnvironment:
    """
    SSH-based remote execution environment.

    Runs commands on a remote machine over SSH, keeping the agent code
    completely isolated from the execution environment. Uses SSH ControlMaster
    for connection persistence (faster subsequent commands).

    Security benefits:
    - Agent cannot modify its own code
    - Remote machine acts as a sandbox
    - Clear separation between agent and execution environment
    """

    def __init__(self, host: str, user: str, cwd: str = "/tmp", timeout: int = 60,
                 port: int = 22, key_path: str = ""):
        # host/user: SSH target; cwd: default remote working directory;
        # timeout: default per-command timeout; key_path: optional private key.
        self.host = host
        self.user = user
        self.cwd = cwd
        self.timeout = timeout
        self.port = port
        self.key_path = key_path
        # Create control socket directory for connection persistence
        self.control_dir = Path(tempfile.gettempdir()) / "hermes-ssh"
        self.control_dir.mkdir(parents=True, exist_ok=True)
        self.control_socket = self.control_dir / f"{user}@{host}:{port}.sock"
        # Test connection and establish ControlMaster
        self._establish_connection()

    def _build_ssh_command(self, extra_args: list = None) -> list:
        """Build base SSH command with connection options.

        Args:
            extra_args: optional extra ssh flags (e.g. ["-t"] for a TTY).

        Returns:
            argv list ending with the user@host target.
        """
        cmd = ["ssh"]
        # Connection multiplexing for performance
        cmd.extend(["-o", f"ControlPath={self.control_socket}"])
        cmd.extend(["-o", "ControlMaster=auto"])
        cmd.extend(["-o", "ControlPersist=300"])  # Keep connection alive for 5 min
        # Standard options
        cmd.extend(["-o", "BatchMode=yes"])  # No password prompts
        cmd.extend(["-o", "StrictHostKeyChecking=accept-new"])  # Accept new hosts
        cmd.extend(["-o", "ConnectTimeout=10"])
        # Port
        if self.port != 22:
            cmd.extend(["-p", str(self.port)])
        # Private key
        if self.key_path:
            cmd.extend(["-i", self.key_path])
        # Extra args (like -t for TTY)
        if extra_args:
            cmd.extend(extra_args)
        # Target
        cmd.append(f"{self.user}@{self.host}")
        return cmd

    def _establish_connection(self):
        """Test SSH connection and establish ControlMaster.

        Raises:
            RuntimeError: if the connection fails or times out.
        """
        cmd = self._build_ssh_command()
        cmd.append("echo 'SSH connection established'")
        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=15
            )
            if result.returncode != 0:
                error_msg = result.stderr.strip() or result.stdout.strip()
                raise RuntimeError(f"SSH connection failed: {error_msg}")
        except subprocess.TimeoutExpired:
            raise RuntimeError(f"SSH connection to {self.user}@{self.host} timed out")

    def execute(self, command: str, cwd: str = "", *, timeout: int | None = None) -> dict:
        """Execute a command on the remote host via SSH.

        Returns:
            {"output": combined stdout+stderr, "returncode": exit status}
            with returncode 124 on timeout and 1 on other SSH errors.
        """
        work_dir = cwd or self.cwd
        effective_timeout = timeout or self.timeout
        # Transform sudo commands if SUDO_PASSWORD is available
        exec_command = _transform_sudo_command(command)
        # Wrap command to run in the correct directory
        # Use bash -c to handle complex commands properly
        wrapped_command = f'cd {work_dir} && {exec_command}'
        cmd = self._build_ssh_command()
        cmd.extend(["bash", "-c", wrapped_command])
        try:
            result = subprocess.run(
                cmd,
                text=True,
                timeout=effective_timeout,
                encoding="utf-8",
                errors="replace",
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                stdin=subprocess.DEVNULL,  # Prevent hanging on interactive prompts
            )
            return {"output": result.stdout, "returncode": result.returncode}
        except subprocess.TimeoutExpired:
            return {"output": f"Command timed out after {effective_timeout}s", "returncode": 124}
        except Exception as e:
            return {"output": f"SSH execution error: {str(e)}", "returncode": 1}

    def cleanup(self):
        """Close the SSH ControlMaster connection."""
        if self.control_socket.exists():
            try:
                # Send exit command to ControlMaster
                cmd = ["ssh", "-o", f"ControlPath={self.control_socket}", "-O", "exit",
                       f"{self.user}@{self.host}"]
                subprocess.run(cmd, capture_output=True, timeout=5)
            except:
                pass
            # Remove socket file
            try:
                self.control_socket.unlink()
            except:
                pass

    def stop(self):
        """Alias for cleanup."""
        self.cleanup()

    def __del__(self):
        """Cleanup on destruction."""
        try:
            self.cleanup()
        except:
            pass
2026-02-01 10:02:34 -08:00
class _DockerEnvironment:
    """
    Docker execution environment wrapper with sudo support and non-blocking stdin.

    Wraps mini-swe-agent's DockerEnvironment but adds:
    - stdin=DEVNULL to prevent hanging on interactive prompts
    - SUDO_PASSWORD support via _transform_sudo_command
    """

    def __init__(self, image: str, cwd: str = "/", timeout: int = 60):
        """Start a Docker container via mini-swe-agent's DockerEnvironment.

        Args:
            image: Docker image to run.
            cwd: Default working directory inside the container.
            timeout: Default per-command timeout in seconds.
        """
        from minisweagent.environments.docker import DockerEnvironment
        self._inner = DockerEnvironment(image=image, cwd=cwd, timeout=timeout)
        self.cwd = cwd
        self.timeout = timeout

    def execute(self, command: str, cwd: str = "", *, timeout: int | None = None) -> dict:
        """Execute a command in the Docker container with sudo support.

        Returns:
            dict with "output" (combined stdout/stderr) and "returncode"
            (124 on timeout).
        """
        # Transform sudo commands if SUDO_PASSWORD is available
        exec_command = _transform_sudo_command(command)
        work_dir = cwd or self.cwd
        effective_timeout = timeout or self.timeout

        # Get container_id from inner environment
        assert self._inner.container_id, "Container not started"

        # Build `docker exec` manually (instead of delegating to the inner
        # environment) so we control stdin and env-var forwarding ourselves.
        cmd = [self._inner.config.executable, "exec", "-w", work_dir]
        for key in self._inner.config.forward_env:
            if (value := os.getenv(key)) is not None:
                cmd.extend(["-e", f"{key}={value}"])
        for key, value in self._inner.config.env.items():
            cmd.extend(["-e", f"{key}={value}"])
        cmd.extend([self._inner.container_id, "bash", "-lc", exec_command])
        try:
            result = subprocess.run(
                cmd,
                text=True,
                timeout=effective_timeout,
                encoding="utf-8",
                errors="replace",
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                stdin=subprocess.DEVNULL,  # Prevent hanging on interactive prompts
            )
            return {"output": result.stdout, "returncode": result.returncode}
        except subprocess.TimeoutExpired:
            return {"output": f"Command timed out after {effective_timeout}s", "returncode": 124}

    def cleanup(self):
        """Cleanup the Docker container."""
        self._inner.cleanup()

    def stop(self):
        """Alias for cleanup."""
        self.cleanup()

    def __del__(self):
        """Best-effort cleanup on destruction."""
        try:
            self.cleanup()
        except Exception:
            # Narrowed from bare `except:`; never raise from a finalizer.
            pass
class _ModalEnvironment:
    """
    Modal cloud execution environment wrapper with sudo support.

    Wraps mini-swe-agent's SwerexModalEnvironment but adds:
    - SUDO_PASSWORD support via _transform_sudo_command
    - Automatic async-safety patches (applied once, before first use)

    The patches replace SwerexModalEnvironment's asyncio.run() calls with a
    background thread approach, making it safe to use inside any event loop
    (e.g., Atropos). Applied here at the point of use rather than relying on
    import-time side effects, so ALL callers get the fix automatically.
    """

    # Class-level flag: patches only need to be applied once per process
    _patches_applied = False

    def __init__(self, image: str, cwd: str = "/root", timeout: int = 60):
        """Create a Modal sandbox environment.

        Args:
            image: Dockerfile path or Docker Hub image name for the sandbox.
            cwd: Default working directory inside the sandbox.
            timeout: Default per-command timeout in seconds.
        """
        # Ensure async-safety patches are applied before creating any
        # SwerexModalEnvironment instance. This is the single authoritative
        # place -- no other module needs to call apply_patches() for Modal.
        if not _ModalEnvironment._patches_applied:
            try:
                from environments.patches import apply_patches
                apply_patches()
            except ImportError:
                pass  # patches module not available (standalone use)
            _ModalEnvironment._patches_applied = True

        from minisweagent.environments.extra.swerex_modal import SwerexModalEnvironment

        # Generous startup timeout: sandbox creation can take 30-60s for cold images,
        # and the SWE-ReX runtime needs another 10-30s to boot inside it.
        self._inner = SwerexModalEnvironment(
            image=image, cwd=cwd, timeout=timeout,
            startup_timeout=180.0,
            runtime_timeout=3600.0,
        )
        self.cwd = cwd
        self.timeout = timeout

    def execute(self, command: str, cwd: str = "", *, timeout: int | None = None) -> dict:
        """Execute a command in Modal with sudo support."""
        # Transform sudo commands if SUDO_PASSWORD is available
        exec_command = _transform_sudo_command(command)
        # Delegate to the inner environment with the transformed command
        return self._inner.execute(exec_command, cwd=cwd, timeout=timeout)

    def cleanup(self):
        """Cleanup the Modal deployment."""
        if hasattr(self._inner, 'stop'):
            self._inner.stop()

    def stop(self):
        """Stop the Modal deployment."""
        self.cleanup()

    def __del__(self):
        """Best-effort cleanup on destruction."""
        try:
            self.cleanup()
        except Exception:
            # Narrowed from bare `except:`; never raise from a finalizer.
            pass
2026-01-23 12:26:53 +00:00
# Tool description for LLM
TERMINAL_TOOL_DESCRIPTION = """ Execute commands on a secure Linux environment.
2025-07-26 04:43:07 +00:00
2026-01-23 12:26:53 +00:00
* * Environment : * *
- Isolated execution environment ( local , Docker , or Modal cloud based on configuration )
- Filesystem persists between tool calls within the same task
- Internet access available
2025-07-26 04:43:07 +00:00
* * Command Execution : * *
- Simple commands : Just provide the ' command ' parameter
- Background processes : Set ' background ' : True for servers / long - running tasks
2026-01-23 12:26:53 +00:00
- Command timeout : Optional ' timeout ' parameter in seconds
2025-07-26 04:43:07 +00:00
* * Examples : * *
2026-01-23 12:26:53 +00:00
- Run command : ` { " command " : " ls -la " } `
- Background task : ` { " command " : " source venv/bin/activate && python server.py " , " background " : True } `
- With timeout : ` { " command " : " long_task.sh " , " timeout " : 300 } `
2025-07-26 04:43:07 +00:00
* * Best Practices : * *
2026-01-23 12:26:53 +00:00
- Run servers / long processes in background
- Monitor disk usage for large tasks
- Install whatever tools you need with apt - get or pip
2026-02-10 19:39:05 +00:00
- Try to create or use a venv with uv or python - m venv to keep isolation from global system packages .
2026-01-23 12:26:53 +00:00
* * Things to avoid : * *
- Do NOT use interactive tools such as tmux , vim , nano , python repl - you will get stuck .
- Even git sometimes becomes interactive if the output is large . If you ' re not sure, pipe to cat.
"""
# Global state for environment lifecycle management
_active_environments: Dict[str, Any] = {}  # task_id -> live environment instance
_task_workdirs: Dict[str, str] = {}  # Maps task_id to working directory
_last_activity: Dict[str, float] = {}  # task_id -> time.time() of last use
_env_lock = threading.Lock()  # guards the dicts above and cleanup-thread startup

_cleanup_thread: Optional[threading.Thread] = None  # lazily-started background reaper
_cleanup_running = False  # poll flag for the reaper; set False to request shutdown

# Per-task environment overrides registry.
# Allows environments (e.g., TerminalBench2Env) to specify a custom Docker/Modal
# image for a specific task_id BEFORE the agent loop starts. When the terminal or
# file tools create a new sandbox for that task_id, they check this registry first
# and fall back to the TERMINAL_MODAL_IMAGE (etc.) env var if no override is set.
#
# This is never exposed to the model -- only infrastructure code calls it.
# Thread-safe because each task_id is unique per rollout.
_task_env_overrides: Dict[str, Dict[str, Any]] = {}
def register_task_env_overrides(task_id: str, overrides: Dict[str, Any]):
    """
    Attach per-rollout sandbox configuration to a task_id.

    Atropos environments call this before the agent loop starts so that the
    terminal/file tools pick up task-specific settings instead of the global
    TERMINAL_* environment variables.

    Supported override keys:
    - modal_image: str -- Path to Dockerfile or Docker Hub image name
    - docker_image: str -- Docker image name
    - cwd: str -- Working directory inside the sandbox

    Args:
        task_id: The rollout's unique task identifier
        overrides: Dict of config keys to override
    """
    _task_env_overrides[task_id] = overrides
def clear_task_env_overrides(task_id: str):
    """
    Drop any registered overrides for *task_id*.

    Invoked during cleanup after a rollout completes so stale entries do not
    accumulate in the registry. Safe to call when no override is registered.
    """
    _task_env_overrides.pop(task_id, None)
2026-01-23 12:26:53 +00:00
# Configuration from environment variables
def _get_env_config ( ) - > Dict [ str , Any ] :
""" Get terminal environment configuration from environment variables. """
2026-02-02 19:13:41 -08:00
# Default image with Python and Node.js for maximum compatibility
default_image = " nikolaik/python-nodejs:python3.11-nodejs20 "
2026-02-08 12:56:40 -08:00
env_type = os . getenv ( " TERMINAL_ENV " , " local " )
# Default cwd depends on backend:
# - local/ssh: current working directory (CLI resolves "." before we get here)
# - docker/singularity: /tmp inside the container (singularity bind-mounts /scratch there)
# - modal: /root (ephemeral cloud container, full filesystem access)
2026-02-10 06:49:58 +00:00
if env_type in ( " modal " , " singularity " ) :
2026-02-08 12:56:40 -08:00
default_cwd = " /root "
2026-02-10 06:49:58 +00:00
elif env_type == " docker " :
default_cwd = " / "
2026-02-08 12:56:40 -08:00
else :
default_cwd = os . getcwd ( )
2026-01-23 12:26:53 +00:00
return {
2026-02-08 12:56:40 -08:00
" env_type " : env_type ,
2026-02-02 19:13:41 -08:00
" docker_image " : os . getenv ( " TERMINAL_DOCKER_IMAGE " , default_image ) ,
" singularity_image " : os . getenv ( " TERMINAL_SINGULARITY_IMAGE " , f " docker:// { default_image } " ) ,
" modal_image " : os . getenv ( " TERMINAL_MODAL_IMAGE " , default_image ) ,
2026-02-08 12:56:40 -08:00
" cwd " : os . getenv ( " TERMINAL_CWD " , default_cwd ) ,
2026-01-23 12:26:53 +00:00
" timeout " : int ( os . getenv ( " TERMINAL_TIMEOUT " , " 60 " ) ) ,
" lifetime_seconds " : int ( os . getenv ( " TERMINAL_LIFETIME_SECONDS " , " 300 " ) ) ,
2026-01-31 06:30:48 +00:00
# SSH-specific config
" ssh_host " : os . getenv ( " TERMINAL_SSH_HOST " , " " ) ,
" ssh_user " : os . getenv ( " TERMINAL_SSH_USER " , " " ) ,
" ssh_port " : int ( os . getenv ( " TERMINAL_SSH_PORT " , " 22 " ) ) ,
" ssh_key " : os . getenv ( " TERMINAL_SSH_KEY " , " " ) , # Path to private key (optional, uses ssh-agent if empty)
2026-01-23 12:26:53 +00:00
}
2025-11-04 03:32:43 -05:00
2026-01-23 12:26:53 +00:00
2026-01-31 06:30:48 +00:00
def _create_environment(env_type: str, image: str, cwd: str, timeout: int, ssh_config: Optional[dict] = None):
    """
    Create an execution environment from mini-swe-agent.

    Args:
        env_type: One of "local", "docker", "singularity", "modal", "ssh"
        image: Docker/Singularity/Modal image name (ignored for local/ssh)
        cwd: Working directory
        timeout: Default command timeout
        ssh_config: SSH connection config for env_type="ssh"; expected keys
            are "host", "user", "port", and "key"

    Returns:
        Environment instance with an execute() method

    Raises:
        ValueError: If env_type is unknown, or ssh host/user are missing.
    """
    # NOTE: annotation fixed from `ssh_config: dict = None` -- the parameter
    # is genuinely optional and defaults to None.
    if env_type == "local":
        # Custom LocalEnvironment: sudo support + non-blocking stdin
        return _LocalEnvironment(cwd=cwd, timeout=timeout)
    elif env_type == "docker":
        # Custom Docker wrapper: sudo support + non-blocking stdin
        return _DockerEnvironment(image=image, cwd=cwd, timeout=timeout)
    elif env_type == "singularity":
        # Custom Singularity environment with better space management
        return _SingularityEnvironment(image=image, cwd=cwd, timeout=timeout)
    elif env_type == "modal":
        # Custom Modal wrapper with sudo support
        return _ModalEnvironment(image=image, cwd=cwd, timeout=timeout)
    elif env_type == "ssh":
        if not ssh_config or not ssh_config.get("host") or not ssh_config.get("user"):
            raise ValueError("SSH environment requires ssh_host and ssh_user to be configured")
        return _SSHEnvironment(
            host=ssh_config["host"],
            user=ssh_config["user"],
            port=ssh_config.get("port", 22),
            key_path=ssh_config.get("key", ""),
            cwd=cwd,
            timeout=timeout
        )
    else:
        raise ValueError(f"Unknown environment type: {env_type}. Use 'local', 'docker', 'singularity', 'modal', or 'ssh'")
2026-01-23 12:26:53 +00:00
def _cleanup_inactive_envs(lifetime_seconds: int = 300):
    """Clean up environments that have been inactive for longer than lifetime_seconds."""
    global _active_environments, _last_activity

    current_time = time.time()
    tasks_to_cleanup = []

    # Phase 1: snapshot the expired task ids under the lock. The actual
    # teardown happens afterwards so a slow env.cleanup() (e.g. a Modal or
    # Docker shutdown) does not hold the lock.
    # NOTE(review): teardown below mutates the shared dicts without the lock;
    # looks intentional (collect-then-act), but confirm against _env_lock usage.
    with _env_lock:
        for task_id, last_time in list(_last_activity.items()):
            if current_time - last_time > lifetime_seconds:
                tasks_to_cleanup.append(task_id)

    # Phase 2: tear down each expired environment and drop its tracking state.
    for task_id in tasks_to_cleanup:
        try:
            if task_id in _active_environments:
                env = _active_environments[task_id]
                # Try various cleanup methods -- backends expose different APIs
                if hasattr(env, 'cleanup'):
                    env.cleanup()
                elif hasattr(env, 'stop'):
                    env.stop()
                elif hasattr(env, 'terminate'):
                    env.terminate()
                del _active_environments[task_id]
                if not os.getenv("HERMES_QUIET"):
                    print(f"[Terminal Cleanup] Cleaned up inactive environment for task: {task_id}")
            if task_id in _last_activity:
                del _last_activity[task_id]
            if task_id in _task_workdirs:
                del _task_workdirs[task_id]
        except Exception as e:
            error_str = str(e)
            if not os.getenv("HERMES_QUIET"):
                # 404/"not found" means the backend already tore it down
                if "404" in error_str or "not found" in error_str.lower():
                    print(f"[Terminal Cleanup] Environment for task {task_id} already cleaned up")
                else:
                    print(f"[Terminal Cleanup] Error cleaning up environment for task {task_id}: {e}")
            # Always remove from tracking dicts so a failed cleanup cannot
            # leave a stale entry that would be retried forever
            if task_id in _active_environments:
                del _active_environments[task_id]
            if task_id in _last_activity:
                del _last_activity[task_id]
            if task_id in _task_workdirs:
                del _task_workdirs[task_id]
2026-01-23 12:26:53 +00:00
2025-11-04 03:32:43 -05:00
def _cleanup_thread_worker():
    """Background reaper loop: periodically removes inactive environments.

    Runs until _cleanup_running is cleared. Sleeps ~60s between passes, in
    one-second slices, so shutdown requests are noticed quickly.
    """
    global _cleanup_running
    while _cleanup_running:
        try:
            lifetime = _get_env_config()["lifetime_seconds"]
            _cleanup_inactive_envs(lifetime)
        except Exception as exc:
            if not os.getenv("HERMES_QUIET"):
                print(f"[Terminal Cleanup] Error in cleanup thread: {exc}")
        # Interruptible sleep: check the shutdown flag every second
        for _tick in range(60):
            if not _cleanup_running:
                break
            time.sleep(1)
2026-01-23 12:26:53 +00:00
2025-11-04 03:32:43 -05:00
def _start_cleanup_thread():
    """Start the background cleanup thread, unless one is already alive."""
    global _cleanup_thread, _cleanup_running
    with _env_lock:
        needs_start = _cleanup_thread is None or not _cleanup_thread.is_alive()
        if needs_start:
            _cleanup_running = True
            worker = threading.Thread(target=_cleanup_thread_worker, daemon=True)
            _cleanup_thread = worker
            worker.start()
2026-01-23 12:26:53 +00:00
2025-11-04 03:32:43 -05:00
def _stop_cleanup_thread():
    """Signal the cleanup thread to stop and wait briefly for it to exit."""
    global _cleanup_running
    _cleanup_running = False
    worker = _cleanup_thread
    if worker is not None:
        worker.join(timeout=5)
2026-01-29 06:10:24 +00:00
def get_active_environments_info() -> Dict[str, Any]:
    """Get information about currently active environments.

    Returns:
        Dict with "count", "task_ids", "workdirs", and "total_disk_usage_mb"
        (combined size of all hermes-* sandbox directories in the scratch dir).
    """
    info = {
        "count": len(_active_environments),
        "task_ids": list(_active_environments.keys()),
        "workdirs": dict(_task_workdirs),
    }
    # Calculate total disk usage.
    # BUG FIX: the old code globbed scratch/hermes-* once PER active task
    # (inside a loop whose `pattern` variable was never even used), so every
    # sandbox directory was counted len(task_ids) times. Scan each directory
    # exactly once.
    total_size = 0
    if _active_environments:
        import glob
        scratch_dir = _get_scratch_dir()
        for path in glob.glob(str(scratch_dir / "hermes-*")):
            try:
                total_size += sum(f.stat().st_size for f in Path(path).rglob('*') if f.is_file())
            except OSError:
                # Directory may vanish mid-scan; skip it
                pass
    info["total_disk_usage_mb"] = round(total_size / (1024 * 1024), 2)
    return info
def cleanup_all_environments():
    """Clean up ALL active environments. Use with caution.

    Tears down every tracked environment via cleanup_vm(), then removes any
    orphaned hermes-* directories left in the scratch area.

    Returns:
        int: Number of environments successfully cleaned.
    """
    global _active_environments, _last_activity, _task_workdirs
    task_ids = list(_active_environments.keys())
    cleaned = 0
    for task_id in task_ids:
        try:
            cleanup_vm(task_id)
            cleaned += 1
        except Exception as e:
            print(f"[Terminal Cleanup] Error cleaning {task_id}: {e}")
    # Also clean any orphaned directories
    import glob
    scratch_dir = _get_scratch_dir()
    for path in glob.glob(str(scratch_dir / "hermes-*")):
        try:
            shutil.rmtree(path, ignore_errors=True)
            print(f"[Terminal Cleanup] Removed orphaned: {path}")
        except OSError:
            # Narrowed from bare `except:`; rmtree(ignore_errors=True)
            # rarely raises, but keep the best-effort contract
            pass
    print(f"[Terminal Cleanup] Cleaned {cleaned} environments")
    return cleaned
2026-01-23 12:26:53 +00:00
def cleanup_vm(task_id: str):
    """Manually clean up a specific environment by task_id.

    Stops the environment (via cleanup()/stop()/terminate(), whichever it
    provides) and removes it from all tracking dicts. Safe to call for a
    task_id that has no active environment.
    """
    global _active_environments, _last_activity, _task_workdirs

    with _env_lock:
        try:
            if task_id in _active_environments:
                env = _active_environments[task_id]
                # Backends expose different teardown APIs
                if hasattr(env, 'cleanup'):
                    env.cleanup()
                elif hasattr(env, 'stop'):
                    env.stop()
                elif hasattr(env, 'terminate'):
                    env.terminate()
                del _active_environments[task_id]
                if not os.getenv("HERMES_QUIET"):
                    print(f"[Terminal Cleanup] Manually cleaned up environment for task: {task_id}")

            if task_id in _task_workdirs:
                del _task_workdirs[task_id]
            if task_id in _last_activity:
                del _last_activity[task_id]
        except Exception as e:
            if not os.getenv("HERMES_QUIET"):
                error_str = str(e)
                # 404/"not found" means the backend already tore it down
                if "404" in error_str or "not found" in error_str.lower():
                    print(f"[Terminal Cleanup] Environment for task {task_id} already cleaned up")
                else:
                    print(f"[Terminal Cleanup] Error cleaning up environment for task {task_id}: {e}")
            # FIX: mirror _cleanup_inactive_envs and always drop tracking
            # entries even when teardown raised; previously a failed cleanup
            # left the stale environment pinned in the registry forever.
            _active_environments.pop(task_id, None)
            _last_activity.pop(task_id, None)
            _task_workdirs.pop(task_id, None)
2026-01-23 12:26:53 +00:00
2025-11-04 03:32:43 -05:00
2026-02-10 22:53:44 +00:00
def _atexit_cleanup():
    """Stop the reaper thread and shut down all remaining sandboxes on exit."""
    _stop_cleanup_thread()
    remaining = len(_active_environments)
    if remaining:
        print(f"\n[Terminal Cleanup] Shutting down {remaining} remaining sandbox(es)...")
        cleanup_all_environments()


atexit.register(_atexit_cleanup)
2025-11-02 08:52:05 +08:00
2026-01-23 12:26:53 +00:00
2025-07-26 04:31:17 +00:00
def terminal_tool(
    command: str,
    background: bool = False,
    timeout: Optional[int] = None,
    task_id: Optional[str] = None,
    force: bool = False
) -> str:
    """
    Execute a command using mini-swe-agent's execution environments.

    Lazily creates (and caches) one execution environment per task_id; the
    environment is reaped later by the background cleanup thread after
    inactivity. Foreground commands are retried up to 3 times on transient
    errors with exponential backoff.

    Args:
        command: The command to execute
        background: Whether to run in background (default: False)
        timeout: Command timeout in seconds (default: from config)
        task_id: Unique identifier for environment isolation (optional)
        force: If True, skip dangerous command check (use after user confirms)

    Returns:
        str: JSON string with output, exit_code, and error fields

    Examples:
        # Execute a simple command
        >>> result = terminal_tool(command="ls -la /tmp")

        # Run a background task
        >>> result = terminal_tool(command="python server.py", background=True)

        # With custom timeout
        >>> result = terminal_tool(command="long_task.sh", timeout=300)

        # Force run after user confirmation
        # Note: force parameter is internal only, not exposed to model API
    """
    global _active_environments, _last_activity

    try:
        # Get configuration
        config = _get_env_config()
        env_type = config["env_type"]

        # Use task_id for environment isolation; all callers without an
        # explicit id share the "default" environment
        effective_task_id = task_id or "default"

        # Check per-task overrides (set by environments like TerminalBench2Env)
        # before falling back to global env var config
        overrides = _task_env_overrides.get(effective_task_id, {})

        # Select image based on env type, with per-task override support
        if env_type == "docker":
            image = overrides.get("docker_image") or config["docker_image"]
        elif env_type == "singularity":
            image = overrides.get("singularity_image") or config["singularity_image"]
        elif env_type == "modal":
            image = overrides.get("modal_image") or config["modal_image"]
        else:
            # local/ssh backends do not use an image
            image = ""

        cwd = overrides.get("cwd") or config["cwd"]
        default_timeout = config["timeout"]
        effective_timeout = timeout or default_timeout

        # For local environment in batch mode, create a unique subdirectory per task
        # This prevents parallel tasks from overwriting each other's files
        # In CLI mode (HERMES_QUIET), use the cwd directly without subdirectories
        if env_type == "local" and not os.getenv("HERMES_QUIET"):
            import uuid
            with _env_lock:
                if effective_task_id not in _task_workdirs:
                    task_workdir = Path(cwd) / f"hermes-{effective_task_id}-{uuid.uuid4().hex[:8]}"
                    task_workdir.mkdir(parents=True, exist_ok=True)
                    _task_workdirs[effective_task_id] = str(task_workdir)
                cwd = _task_workdirs[effective_task_id]

        # Start cleanup thread (idempotent)
        _start_cleanup_thread()

        # Get or create environment
        # Check under lock, but create OUTSIDE lock so we don't block
        # other concurrent rollouts during slow Modal/Docker startup
        needs_creation = False
        with _env_lock:
            if effective_task_id not in _active_environments:
                needs_creation = True
            else:
                _last_activity[effective_task_id] = time.time()
                env = _active_environments[effective_task_id]

        if needs_creation:
            # Disk usage warning only relevant for local/singularity backends
            if env_type in ("singularity", "local"):
                _check_disk_usage_warning()
            if not os.getenv("HERMES_QUIET"):
                print(f"[Terminal] Creating new {env_type} environment for task {effective_task_id[:8]}...", flush=True)
            try:
                ssh_config = None
                if env_type == "ssh":
                    ssh_config = {
                        "host": config.get("ssh_host", ""),
                        "user": config.get("ssh_user", ""),
                        "port": config.get("ssh_port", 22),
                        "key": config.get("ssh_key", ""),
                    }
                new_env = _create_environment(
                    env_type=env_type,
                    image=image,
                    cwd=cwd,
                    timeout=effective_timeout,
                    ssh_config=ssh_config
                )
            except ImportError as e:
                # mini-swe-agent backend missing entirely -- report, don't crash
                return json.dumps({
                    "output": "",
                    "exit_code": -1,
                    "error": f"Terminal tool disabled: mini-swe-agent not available ({e})",
                    "status": "disabled"
                }, ensure_ascii=False)
            # Store under lock (brief)
            with _env_lock:
                if effective_task_id not in _active_environments:
                    _active_environments[effective_task_id] = new_env
                else:
                    # Another thread created it while we were building -- clean up ours
                    try:
                        if hasattr(new_env, 'stop'):
                            new_env.stop()
                    except Exception:
                        pass
                _last_activity[effective_task_id] = time.time()
                env = _active_environments[effective_task_id]
            if not os.getenv("HERMES_QUIET"):
                print(f"[Terminal] {env_type} environment ready for task {effective_task_id[:8]}", flush=True)

        # Check for dangerous commands (only for local/ssh in interactive modes)
        # Skip check if force=True (user has confirmed they want to run it)
        if not force:
            approval = _check_dangerous_command(command, env_type)
            if not approval["approved"]:
                # Command was blocked - return informative message
                return json.dumps({
                    "output": "",
                    "exit_code": -1,
                    "error": approval.get("message", "Command denied - potentially dangerous operation"),
                    "status": "blocked"
                }, ensure_ascii=False)

        # Prepare command for execution
        if background:
            # Run in background with nohup and redirect output
            exec_command = f"nohup {command} > /tmp/bg_output.log 2>&1 &"
            try:
                result = env.execute(exec_command, timeout=10)
                return json.dumps({
                    "output": "Background task started successfully",
                    "exit_code": 0,
                    "error": None
                }, ensure_ascii=False)
            except Exception as e:
                return json.dumps({
                    "output": "",
                    "exit_code": -1,
                    "error": f"Failed to start background task: {str(e)}"
                }, ensure_ascii=False)
        else:
            # Run foreground command with retry logic
            max_retries = 3
            retry_count = 0
            result = None

            # <= max_retries gives one initial attempt plus up to 3 retries
            while retry_count <= max_retries:
                try:
                    result = env.execute(command, timeout=effective_timeout)
                except Exception as e:
                    error_str = str(e).lower()
                    if "timeout" in error_str:
                        # Timeouts are not transient -- report immediately (124
                        # mirrors the shell timeout convention)
                        return json.dumps({
                            "output": "",
                            "exit_code": 124,
                            "error": f"Command timed out after {effective_timeout} seconds"
                        }, ensure_ascii=False)
                    # Retry on transient errors with exponential backoff (2s, 4s, 8s)
                    if retry_count < max_retries:
                        retry_count += 1
                        wait_time = 2 ** retry_count
                        print(f"⚠️ Terminal: execution error, retrying in {wait_time}s (attempt {retry_count}/{max_retries})")
                        print(f"Command: {command[:200]}")
                        print(f"Error: {type(e).__name__}: {e}")
                        print(f"Task ID: {effective_task_id}, Backend: {env_type}")
                        time.sleep(wait_time)
                        continue
                    print(f"❌ Terminal: execution failed after {max_retries} retries")
                    print(f"Command: {command[:200]}")
                    print(f"Error: {type(e).__name__}: {e}")
                    print(f"Task ID: {effective_task_id}, Backend: {env_type}")
                    return json.dumps({
                        "output": "",
                        "exit_code": -1,
                        "error": f"Command execution failed: {type(e).__name__}: {str(e)}"
                    }, ensure_ascii=False)
                # Got a result
                break

            # Extract output
            output = result.get("output", "")
            returncode = result.get("returncode", 0)

            # Add helpful message for sudo failures in messaging context
            output = _handle_sudo_failure(output, env_type)

            # Truncate output if too long, keeping the TAIL (usually the
            # most relevant part: errors, final status)
            MAX_OUTPUT_CHARS = 50000
            if len(output) > MAX_OUTPUT_CHARS:
                truncated_notice = f"\n\n... [OUTPUT TRUNCATED - showing last {MAX_OUTPUT_CHARS} chars of {len(output)} total] ..."
                output = truncated_notice + output[-MAX_OUTPUT_CHARS:]

            return json.dumps({
                "output": output.strip() if output else "",
                "exit_code": returncode,
                "error": None
            }, ensure_ascii=False)

    except Exception as e:
        # Catch-all boundary: the tool must always return JSON, never raise
        return json.dumps({
            "output": "",
            "exit_code": -1,
            "error": f"Failed to execute command: {str(e)}",
            "status": "error"
        }, ensure_ascii=False)
2025-07-25 15:15:36 +00:00
2026-01-23 12:26:53 +00:00
def check_terminal_requirements() -> bool:
    """Check if all requirements for the configured terminal backend are met.

    Verifies that the mini-swe-agent environment class is importable and that
    the backend's runtime (docker/apptainer/ssh client, Modal credentials) is
    available.

    Returns:
        bool: True if the currently configured TERMINAL_ENV backend is usable.
    """
    config = _get_env_config()
    env_type = config["env_type"]
    try:
        if env_type == "local":
            from minisweagent.environments.local import LocalEnvironment  # noqa: F401
            return True
        elif env_type == "docker":
            from minisweagent.environments.docker import DockerEnvironment  # noqa: F401
            # Check if the docker daemon is reachable
            # (uses the module-level subprocess import; no local re-import needed)
            result = subprocess.run(["docker", "version"], capture_output=True, timeout=5)
            return result.returncode == 0
        elif env_type == "singularity":
            from minisweagent.environments.singularity import SingularityEnvironment  # noqa: F401
            # Check if singularity/apptainer is available
            executable = shutil.which("apptainer") or shutil.which("singularity")
            if executable:
                result = subprocess.run([executable, "--version"], capture_output=True, timeout=5)
                return result.returncode == 0
            return False
        elif env_type == "modal":
            from minisweagent.environments.extra.swerex_modal import SwerexModalEnvironment  # noqa: F401
            # Check for modal token
            return os.getenv("MODAL_TOKEN_ID") is not None or Path.home().joinpath(".modal.toml").exists()
        elif env_type == "ssh":
            # FIX: "ssh" is a supported backend (see _create_environment) but
            # was missing here, so requirement checks always failed for it.
            if not (config.get("ssh_host") and config.get("ssh_user")):
                return False
            return shutil.which("ssh") is not None
        else:
            return False
    except Exception as e:
        print(f"Terminal requirements check failed: {e}")
        return False
if __name__ == "__main__":
    # Simple smoke test when run directly: print the active configuration,
    # verify the selected backend's requirements, and show usage hints.
    # (A bare triple-quoted string here would be a no-op expression, not a
    # docstring, so a comment is the correct form.)
    print("Terminal Tool Module (mini-swe-agent backend)")
    print("=" * 50)

    config = _get_env_config()
    print("\nCurrent Configuration:")
    print(f"  Environment type: {config['env_type']}")
    print(f"  Docker image: {config['docker_image']}")
    print(f"  Modal image: {config['modal_image']}")
    print(f"  Working directory: {config['cwd']}")
    print(f"  Default timeout: {config['timeout']}s")
    print(f"  Lifetime: {config['lifetime_seconds']}s")

    if not check_terminal_requirements():
        print("\n❌ Requirements not met. Please check the messages above.")
        # sys.exit instead of the `exit` builtin: `exit` is injected by the
        # `site` module for interactive use and is not guaranteed to exist
        # (e.g. under `python -S` or in frozen builds).
        sys.exit(1)

    print("\n✅ All requirements met!")
    print("\nAvailable Tool:")
    print("  - terminal_tool: Execute commands using mini-swe-agent environments")
    print("\nUsage Examples:")
    print("  # Execute a command")
    print("  result = terminal_tool(command='ls -la')")
    print("")
    print("  # Run a background task")
    print("  result = terminal_tool(command='python server.py', background=True)")
    print("\nEnvironment Variables:")
    # Shared default image so docker/singularity/modal fallbacks stay in sync.
    default_img = "nikolaik/python-nodejs:python3.11-nodejs20"
    print(f"  TERMINAL_ENV: {os.getenv('TERMINAL_ENV', 'local')} (local/docker/singularity/modal/ssh)")
    print(f"  TERMINAL_DOCKER_IMAGE: {os.getenv('TERMINAL_DOCKER_IMAGE', default_img)}")
    print(f"  TERMINAL_SINGULARITY_IMAGE: {os.getenv('TERMINAL_SINGULARITY_IMAGE', f'docker://{default_img}')}")
    print(f"  TERMINAL_MODAL_IMAGE: {os.getenv('TERMINAL_MODAL_IMAGE', default_img)}")
    print(f"  TERMINAL_CWD: {os.getenv('TERMINAL_CWD', os.getcwd())}")
    print(f"  TERMINAL_TIMEOUT: {os.getenv('TERMINAL_TIMEOUT', '60')}")
    print(f"  TERMINAL_LIFETIME_SECONDS: {os.getenv('TERMINAL_LIFETIME_SECONDS', '300')}")