Files
hermes-agent/tools/environments/daytona.py
Teknium d0ffb111c2 refactor: codebase-wide lint cleanup — unused imports, dead code, and inefficient patterns (#5821)
Comprehensive cleanup across 80 files based on automated (ruff, pyflakes, vulture)
and manual analysis of the entire codebase.

Changes by category:

Unused imports removed (~95 across 55 files):
- Removed genuinely unused imports from all major subsystems
- agent/, hermes_cli/, tools/, gateway/, plugins/, cron/
- Includes imports in try/except blocks that were truly unused
  (vs availability checks which were left alone)

Unused variables removed (~25):
- Removed dead variables: connected, inner, channels, last_exc,
  source, new_server_names, verify, pconfig, default_terminal,
  result, pending_handled, temperature, loop
- Dropped unused argparse subparser assignments in hermes_cli/main.py
  (12 instances of add_parser() where result was never used)

Dead code removed:
- run_agent.py: Removed dead ternary (None if False else None) and
  surrounding unreachable branch in identity fallback
- run_agent.py: Removed write-only attribute _last_reported_tool
- hermes_cli/providers.py: Removed dead @property decorator on
  module-level function (decorator has no effect outside a class)
- gateway/run.py: Removed unused MCP config load before reconnect
- gateway/platforms/slack.py: Removed dead SessionSource construction

Undefined name bugs fixed (would cause NameError at runtime):
- batch_runner.py: Added missing logger = logging.getLogger(__name__)
- tools/environments/daytona.py: Added missing Dict and Path imports

Unnecessary global statements removed (14):
- tools/terminal_tool.py: 5 functions declared global for dicts
  they only mutated via .pop()/[key]=value (no rebinding)
- tools/browser_tool.py: cleanup thread loop only reads flag
- tools/rl_training_tool.py: 4 functions only do dict mutations
- tools/mcp_oauth.py: only reads the global
- hermes_time.py: only reads cached values

Inefficient patterns fixed:
- startswith/endswith tuple form: 15 instances of
  x.startswith('a') or x.startswith('b') consolidated to
  x.startswith(('a', 'b'))
- len(x)==0 / len(x)>0: 13 instances replaced with pythonic
  truthiness checks (not x / bool(x))
- in dict.keys(): 5 instances simplified to in dict
- Redefined unused name: removed duplicate _strip_mdv2 import in
  send_message_tool.py

Other fixes:
- hermes_cli/doctor.py: Replaced undefined logger.debug() with pass
- hermes_cli/config.py: Consolidated chained .endswith() calls

Test results: 3934 passed, 17 failed (all pre-existing on main),
19 skipped. Zero regressions.
2026-04-07 10:25:31 -07:00

301 lines
12 KiB
Python

"""Daytona cloud execution environment.
Uses the Daytona Python SDK to run commands in cloud sandboxes.
Supports persistent sandboxes: when enabled, sandboxes are stopped on cleanup
and resumed on next creation, preserving the filesystem across sessions.
"""
import logging
import time
import math
import shlex
import threading
import uuid
import warnings
from pathlib import Path
from typing import Dict, Optional
from tools.environments.base import BaseEnvironment
from tools.interrupt import is_interrupted
logger = logging.getLogger(__name__)
class DaytonaEnvironment(BaseEnvironment):
    """Daytona cloud sandbox execution backend.

    Uses stopped/started sandbox lifecycle for filesystem persistence
    instead of snapshots, making it faster and stateless on the host.
    """

    def __init__(
        self,
        image: str,
        cwd: str = "/home/daytona",
        timeout: int = 60,
        cpu: int = 1,
        memory: int = 5120,  # MB (hermes convention)
        disk: int = 10240,  # MB (Daytona platform max is 10GB)
        persistent_filesystem: bool = True,
        task_id: str = "default",
    ):
        """Resume or create the sandbox for ``task_id`` and sync files into it.

        Args:
            image: Image used when a fresh sandbox has to be created.
            cwd: Requested working directory. ``"~"`` and the default
                ``"/home/daytona"`` are replaced by the sandbox's ``$HOME``
                once it has been detected.
            timeout: Default per-command timeout in seconds.
            cpu: CPU count passed to the sandbox resources.
            memory: Memory in MB; rounded up to whole GiB (minimum 1).
            disk: Disk in MB; rounded up to whole GiB (minimum 1) and
                capped at 10 GiB with a warning.
            persistent_filesystem: When true, try to resume an existing
                sandbox for this task, and stop (rather than delete) it
                in ``cleanup``.
            task_id: Identifier used to derive the sandbox name and label.
        """
        self._requested_cwd = cwd
        super().__init__(cwd=cwd, timeout=timeout)

        # Imported lazily so the module can be imported without the SDK.
        from daytona import (
            Daytona,
            CreateSandboxFromImageParams,
            DaytonaError,
            Resources,
            SandboxState,
        )

        self._persistent = persistent_filesystem
        self._task_id = task_id
        self._SandboxState = SandboxState
        self._daytona = Daytona()
        self._sandbox = None
        self._lock = threading.Lock()

        # Convert MB -> GiB, rounding up and never going below 1.
        memory_gib = max(1, math.ceil(memory / 1024))
        disk_gib = max(1, math.ceil(disk / 1024))
        if disk_gib > 10:
            warnings.warn(
                f"Daytona: requested disk ({disk_gib}GB) exceeds platform limit (10GB). "
                f"Capping to 10GB. Set container_disk: 10240 in config to silence this.",
                stacklevel=2,
            )
            disk_gib = 10
        resources = Resources(cpu=cpu, memory=memory_gib, disk=disk_gib)

        labels = {"hermes_task_id": task_id}
        sandbox_name = f"hermes-{task_id}"

        # Try to resume an existing sandbox for this task
        if self._persistent:
            # 1. Try name-based lookup (new path)
            try:
                self._sandbox = self._daytona.get(sandbox_name)
                self._sandbox.start()
                logger.info("Daytona: resumed sandbox %s for task %s",
                            self._sandbox.id, task_id)
            except DaytonaError:
                self._sandbox = None
            except Exception as e:
                logger.warning("Daytona: failed to resume sandbox for task %s: %s",
                               task_id, e)
                self._sandbox = None

            # 2. Legacy fallback: find sandbox created before the naming migration
            if self._sandbox is None:
                try:
                    page = self._daytona.list(labels=labels, page=1, limit=1)
                    if page.items:
                        self._sandbox = page.items[0]
                        self._sandbox.start()
                        logger.info("Daytona: resumed legacy sandbox %s for task %s",
                                    self._sandbox.id, task_id)
                except Exception as e:
                    logger.debug("Daytona: no legacy sandbox found for task %s: %s",
                                 task_id, e)
                    self._sandbox = None

        # Create a fresh sandbox if we don't have one
        if self._sandbox is None:
            self._sandbox = self._daytona.create(
                CreateSandboxFromImageParams(
                    image=image,
                    name=sandbox_name,
                    labels=labels,
                    auto_stop_interval=0,
                    resources=resources,
                )
            )
            logger.info("Daytona: created sandbox %s for task %s",
                        self._sandbox.id, task_id)

        # Detect remote home dir first so mounts go to the right place.
        self._remote_home = "/root"
        try:
            home = self._sandbox.process.exec("echo $HOME").result.strip()
            if home:
                self._remote_home = home
                if self._requested_cwd in ("~", "/home/daytona"):
                    self.cwd = home
        except Exception as e:
            # Best-effort: keep the "/root" default, but leave a trace.
            logger.debug("Daytona: could not detect remote home, using %s: %s",
                         self._remote_home, e)
        logger.info("Daytona: resolved home to %s, cwd to %s", self._remote_home, self.cwd)

        # Track synced files to avoid redundant uploads.
        # Key: remote_path, Value: (mtime, size)
        self._synced_files: Dict[str, tuple] = {}

        # Upload credential files and skills directory into the sandbox.
        self._sync_skills_and_credentials()

    def _upload_if_changed(self, host_path: str, remote_path: str) -> bool:
        """Upload a file if its mtime/size changed since last sync.

        Returns:
            True when the file was uploaded; False when the host file could
            not be stat'ed, was unchanged since the last sync, or the upload
            failed.
        """
        hp = Path(host_path)
        try:
            stat = hp.stat()
            file_key = (stat.st_mtime, stat.st_size)
        except OSError:
            return False

        if self._synced_files.get(remote_path) == file_key:
            return False

        try:
            parent = str(Path(remote_path).parent)
            # Quote the directory: it is interpolated into a shell command,
            # so spaces or metacharacters must not be interpreted.
            self._sandbox.process.exec(f"mkdir -p {shlex.quote(parent)}")
            self._sandbox.fs.upload_file(host_path, remote_path)
            self._synced_files[remote_path] = file_key
            return True
        except Exception as e:
            logger.debug("Daytona: upload failed %s: %s", host_path, e)
            return False

    def _sync_skills_and_credentials(self) -> None:
        """Upload changed credential files and skill files into the sandbox.

        Failures are logged at debug level and never raised.
        """
        container_base = f"{self._remote_home}/.hermes"
        try:
            from tools.credential_files import get_credential_file_mounts, iter_skills_files

            for mount_entry in get_credential_file_mounts():
                # Re-root the "/root/.hermes" prefix onto the detected home.
                remote_path = mount_entry["container_path"].replace("/root/.hermes", container_base, 1)
                if self._upload_if_changed(mount_entry["host_path"], remote_path):
                    logger.debug("Daytona: synced credential %s", remote_path)

            for entry in iter_skills_files(container_base=container_base):
                if self._upload_if_changed(entry["host_path"], entry["container_path"]):
                    logger.debug("Daytona: synced skill %s", entry["container_path"])
        except Exception as e:
            logger.debug("Daytona: could not sync skills/credentials: %s", e)

    def _ensure_sandbox_ready(self):
        """Restart sandbox if it was stopped (e.g., by a previous interrupt)."""
        self._sandbox.refresh_data()
        if self._sandbox.state in (self._SandboxState.STOPPED, self._SandboxState.ARCHIVED):
            self._sandbox.start()
            logger.info("Daytona: restarted sandbox %s", self._sandbox.id)

    def _force_stop_sandbox(self, reason: str) -> None:
        """Stop the sandbox under the lock, logging (not raising) on failure."""
        with self._lock:
            try:
                self._sandbox.stop()
            except Exception as e:
                logger.debug("Daytona: sandbox stop after %s failed: %s", reason, e)

    def _exec_in_thread(self, exec_command: str, cwd: Optional[str], timeout: int) -> dict:
        """Run exec in a background thread with interrupt polling.

        The Daytona SDK's exec(timeout=...) parameter is unreliable (the
        server-side timeout is not enforced and the SDK has no client-side
        fallback), so we wrap the command with the shell ``timeout`` utility
        which reliably kills the process and returns exit code 124.

        Returns:
            ``{"output": ..., "returncode": ...}`` on completion or
            interrupt, the value of ``self._timeout_result(timeout)`` when
            the client-side deadline passes, or ``{"error": exc}`` when the
            SDK call raised.
        """
        # Wrap with shell `timeout` to enforce the deadline reliably and get
        # a clean exit code 124 when it fires.
        timed_command = f"timeout {timeout} sh -c {shlex.quote(exec_command)}"

        result_holder: dict = {"value": None, "error": None}

        def _run():
            try:
                response = self._sandbox.process.exec(
                    timed_command, cwd=cwd,
                )
                result_holder["value"] = {
                    "output": response.result or "",
                    "returncode": response.exit_code,
                }
            except Exception as e:
                result_holder["error"] = e

        t = threading.Thread(target=_run, daemon=True)
        t.start()

        # Wait for timeout + generous buffer for network/SDK overhead
        deadline = time.monotonic() + timeout + 10
        while t.is_alive():
            t.join(timeout=0.2)
            if is_interrupted():
                self._force_stop_sandbox("interrupt")
                return {
                    "output": "[Command interrupted - Daytona sandbox stopped]",
                    "returncode": 130,
                }
            if time.monotonic() > deadline:
                # Shell timeout didn't fire and SDK is hung — force stop
                self._force_stop_sandbox("timeout")
                return self._timeout_result(timeout)

        # Compare against None so the check does not depend on the
        # exception object's truthiness.
        if result_holder["error"] is not None:
            return {"error": result_holder["error"]}
        return result_holder["value"]

    def execute(self, command: str, cwd: str = "", *,
                timeout: Optional[int] = None,
                stdin_data: Optional[str] = None) -> dict:
        """Run ``command`` in the sandbox and return its output and exit code.

        Args:
            command: Shell command to run.
            cwd: Working directory; falls back to ``self.cwd`` when empty.
            timeout: Per-call timeout in seconds; falls back to
                ``self.timeout`` when falsy.
            stdin_data: Optional text fed to the command via a heredoc.

        Returns:
            A dict with ``"output"`` and ``"returncode"`` keys. SDK errors
            are reported as ``returncode`` 1 with an explanatory output;
            a ``DaytonaError`` triggers one restart-and-retry first.
        """
        with self._lock:
            self._ensure_sandbox_ready()
            # Incremental sync before each command so mid-session credential
            # refreshes and skill updates are picked up.
            self._sync_skills_and_credentials()

        if stdin_data is not None:
            # Pick a heredoc delimiter that does not occur in the payload.
            marker = f"HERMES_EOF_{uuid.uuid4().hex[:8]}"
            while marker in stdin_data:
                marker = f"HERMES_EOF_{uuid.uuid4().hex[:8]}"
            command = f"{command} << '{marker}'\n{stdin_data}\n{marker}"

        exec_command, sudo_stdin = self._prepare_command(command)

        # Daytona sandboxes execute commands via the Daytona SDK and cannot
        # pipe subprocess stdin directly the way a local Popen can. When a
        # sudo password is present, use a shell-level pipe from printf so that
        # the password feeds sudo -S without appearing as an echo argument
        # embedded in the shell string. The password is still visible in the
        # remote sandbox's command line, but it is not exposed on the user's
        # local machine — which is the primary threat being mitigated.
        if sudo_stdin is not None:
            exec_command = (
                f"printf '%s\\n' {shlex.quote(sudo_stdin.rstrip())} | {exec_command}"
            )

        effective_cwd = cwd or self.cwd or None
        effective_timeout = timeout or self.timeout

        result = self._exec_in_thread(exec_command, effective_cwd, effective_timeout)

        if "error" in result:
            from daytona import DaytonaError

            err = result["error"]
            if isinstance(err, DaytonaError):
                # The sandbox may have been stopped underneath us: make sure
                # it is running again, then retry the command once.
                with self._lock:
                    try:
                        self._ensure_sandbox_ready()
                    except Exception:
                        return {"output": f"Daytona execution error: {err}", "returncode": 1}
                result = self._exec_in_thread(exec_command, effective_cwd, effective_timeout)
                if "error" not in result:
                    return result
            return {"output": f"Daytona execution error: {err}", "returncode": 1}

        return result

    def cleanup(self):
        """Stop (persistent) or delete (non-persistent) the sandbox.

        Failures are logged as warnings; the sandbox reference is cleared
        either way, so repeated calls are no-ops.
        """
        with self._lock:
            if self._sandbox is None:
                return
            try:
                if self._persistent:
                    self._sandbox.stop()
                    logger.info("Daytona: stopped sandbox %s (filesystem preserved)",
                                self._sandbox.id)
                else:
                    self._daytona.delete(self._sandbox)
                    logger.info("Daytona: deleted sandbox %s", self._sandbox.id)
            except Exception as e:
                logger.warning("Daytona: cleanup failed: %s", e)
            self._sandbox = None