Files
hermes-agent/tests/tools/test_threaded_process_handle.py
alt-glitch d684d7ee7e feat(environments): unified spawn-per-call execution layer
Replace dual execution model (PersistentShellMixin + per-backend oneshot)
with spawn-per-call + session snapshot for all backends except ManagedModal.

Core changes:
- Every command spawns a fresh bash process; session snapshot (env vars,
  functions, aliases) captured at init and re-sourced before each command
- CWD persists via file-based read (local) or in-band stdout markers (remote)
- ProcessHandle protocol + _ThreadedProcessHandle adapter for SDK backends
- cancel_fn wired for Modal (sandbox.terminate) and Daytona (sandbox.stop)
- Shared utilities extracted: _pipe_stdin, _popen_bash, _load_json_store,
  _save_json_store, _file_mtime_key, _SYNC_INTERVAL_SECONDS
- Rate-limited file sync unified in base _before_execute() with _sync_files() hook
- execute_oneshot() removed; all 11 call sites in code_execution_tool.py
  migrated to execute()
- Daytona timeout wrapper replaced with SDK-native timeout parameter
- persistent_shell.py deleted (291 lines)

Backend-specific:
- Local: process-group kill via os.killpg, file-based CWD read
- Docker: -e env flags only on init_session, not per-command
- SSH: shlex.quote transport, ControlMaster connection reuse
- Singularity: apptainer exec with instance://, no forced --pwd
- Modal: _AsyncWorker + _ThreadedProcessHandle, cancel_fn -> sandbox.terminate
- Daytona: SDK-level timeout (not shell wrapper), cancel_fn -> sandbox.stop
- ManagedModal: unchanged (gateway owns execution); docstring added explaining why
2026-04-08 17:23:15 -07:00

145 lines
3.6 KiB
Python

"""Tests for _ThreadedProcessHandle — the adapter for SDK backends."""
import threading
import time
from tools.environments.base import _ThreadedProcessHandle
class TestBasicExecution:
def test_successful_execution(self):
def exec_fn():
return ("hello world", 0)
handle = _ThreadedProcessHandle(exec_fn)
handle.wait(timeout=5)
assert handle.returncode == 0
output = handle.stdout.read()
assert "hello world" in output
def test_nonzero_exit_code(self):
def exec_fn():
return ("error occurred", 42)
handle = _ThreadedProcessHandle(exec_fn)
handle.wait(timeout=5)
assert handle.returncode == 42
output = handle.stdout.read()
assert "error occurred" in output
def test_exception_in_exec_fn(self):
def exec_fn():
raise RuntimeError("boom")
handle = _ThreadedProcessHandle(exec_fn)
handle.wait(timeout=5)
assert handle.returncode == 1
def test_empty_output(self):
def exec_fn():
return ("", 0)
handle = _ThreadedProcessHandle(exec_fn)
handle.wait(timeout=5)
assert handle.returncode == 0
output = handle.stdout.read()
assert output == ""
class TestPolling:
def test_poll_returns_none_while_running(self):
event = threading.Event()
def exec_fn():
event.wait(timeout=5)
return ("done", 0)
handle = _ThreadedProcessHandle(exec_fn)
assert handle.poll() is None
event.set()
handle.wait(timeout=5)
assert handle.poll() == 0
def test_poll_returns_returncode_when_done(self):
def exec_fn():
return ("ok", 0)
handle = _ThreadedProcessHandle(exec_fn)
handle.wait(timeout=5)
assert handle.poll() == 0
class TestCancelFn:
def test_cancel_fn_called_on_kill(self):
called = threading.Event()
def cancel():
called.set()
def exec_fn():
time.sleep(10)
return ("", 0)
handle = _ThreadedProcessHandle(exec_fn, cancel_fn=cancel)
handle.kill()
assert called.is_set()
def test_cancel_fn_none_is_safe(self):
def exec_fn():
return ("ok", 0)
handle = _ThreadedProcessHandle(exec_fn, cancel_fn=None)
handle.kill() # should not raise
handle.wait(timeout=5)
assert handle.returncode == 0
def test_cancel_fn_exception_swallowed(self):
def cancel():
raise RuntimeError("cancel failed")
def exec_fn():
return ("ok", 0)
handle = _ThreadedProcessHandle(exec_fn, cancel_fn=cancel)
handle.kill() # should not raise despite cancel raising
handle.wait(timeout=5)
class TestStdoutPipe:
def test_stdout_is_readable(self):
def exec_fn():
return ("line1\nline2\nline3\n", 0)
handle = _ThreadedProcessHandle(exec_fn)
handle.wait(timeout=5)
lines = handle.stdout.readlines()
assert len(lines) == 3
assert lines[0] == "line1\n"
def test_stdout_iterable(self):
def exec_fn():
return ("a\nb\nc\n", 0)
handle = _ThreadedProcessHandle(exec_fn)
handle.wait(timeout=5)
collected = list(handle.stdout)
assert len(collected) == 3
def test_unicode_output(self):
def exec_fn():
return ("hello 世界 🌍\n", 0)
handle = _ThreadedProcessHandle(exec_fn)
handle.wait(timeout=5)
output = handle.stdout.read()
assert "世界" in output
assert "🌍" in output