refactor: streamline cron job handling and update CLI commands

- Removed legacy cron daemon functionality, integrating cron job execution directly into the gateway process for improved efficiency.
- Updated CLI commands to reflect changes, replacing `hermes cron daemon` with `hermes cron status` and enhancing documentation for cron job management.
- Clarified messaging in the README and other documentation regarding the gateway's role in managing cron jobs.
- Removed obsolete terminal_hecate tool and related configurations to simplify the codebase.
This commit is contained in:
teknium1
2026-02-21 16:21:19 -08:00
parent 79b62497d1
commit 7cb6427dea
17 changed files with 182 additions and 623 deletions

View File

@@ -173,18 +173,6 @@ HERMES_OPENAI_API_KEY=
# HERMES_HUMAN_DELAY_MIN_MS=800 # Min delay in ms (custom mode)
# HERMES_HUMAN_DELAY_MAX_MS=2500 # Max delay in ms (custom mode)
# =============================================================================
# LEGACY/OPTIONAL API KEYS
# =============================================================================
# Morph API Key - For legacy Hecate terminal backend (terminal-hecate tool)
# Get at: https://morph.so/
MORPH_API_KEY=
# Hecate VM Settings (only if using terminal-hecate tool)
HECATE_VM_LIFETIME_SECONDS=300
HECATE_DEFAULT_SNAPSHOT_ID=snapshot_p5294qxt
# =============================================================================
# DEBUG OPTIONS
# =============================================================================

2
.gitignore vendored
View File

@@ -1,7 +1,5 @@
/venv/
/_pycache/
hecate/
hecate-lib/
*.pyc*
__pycache__/
.venv/

View File

@@ -202,8 +202,10 @@ The unified `hermes` command provides all functionality:
| `hermes doctor` | Diagnose issues |
| `hermes update` | Update to latest (checks for new config) |
| `hermes uninstall` | Uninstall (can keep configs for reinstall) |
| `hermes gateway` | Start messaging gateway |
| `hermes gateway` | Start gateway (messaging + cron scheduler) |
| `hermes gateway install` | Install gateway as system service |
| `hermes cron list` | View scheduled jobs |
| `hermes cron status` | Check if cron scheduler is running |
| `hermes version` | Show version info |
| `hermes pairing list/approve/revoke` | Manage DM pairing codes |

View File

@@ -316,12 +316,17 @@ hermes doctor # Diagnose issues
hermes update # Update to latest version
hermes uninstall # Uninstall (can keep configs for later reinstall)
# Messaging, skills, cron
hermes gateway # Start messaging gateway
# Gateway (messaging + cron scheduler)
hermes gateway # Run gateway in foreground
hermes gateway install # Install as system service (messaging + cron)
hermes gateway status # Check service status
# Skills, cron, misc
hermes skills search k8s # Search skill registries
hermes skills install ... # Install a skill (with security scan)
hermes skills list # List installed skills
hermes cron list # View scheduled jobs
hermes cron status # Check if cron scheduler is running
hermes pairing list # View/manage DM pairing codes
hermes version # Show version info
```
@@ -505,7 +510,7 @@ sessions/
Schedule tasks to run automatically:
```bash
# In the CLI
# In the CLI (/cron slash commands)
/cron add 30m "Remind me to check the build"
/cron add "every 2h" "Check server status"
/cron add "0 9 * * *" "Morning briefing"
@@ -513,14 +518,20 @@ Schedule tasks to run automatically:
/cron remove <job_id>
```
The agent can also self-schedule using `schedule_cronjob` tool.
The agent can also self-schedule using the `schedule_cronjob` tool from any platform (CLI, Telegram, Discord, etc.).
**Cron execution is handled by the gateway daemon.** The gateway ticks the scheduler every 60 seconds, running any due jobs in isolated agent sessions:
**Run the scheduler:**
```bash
hermes cron daemon # Built-in daemon
# Or add to system cron for reliability
hermes gateway install # Install as system service (recommended)
hermes gateway # Or run in foreground
hermes cron list # View scheduled jobs
hermes cron status # Check if gateway is running
```
Even if no messaging platforms are configured, the gateway stays running for cron. A file lock prevents duplicate execution if multiple processes overlap.
### 🛡️ Exec Approval (Messaging Platforms)
When the agent tries to run a potentially dangerous command (rm -rf, chmod 777, etc.) on Telegram/Discord/WhatsApp, instead of blocking it silently, it asks the user for approval:

27
cli.py
View File

@@ -277,8 +277,8 @@ from run_agent import AIAgent
from model_tools import get_tool_definitions, get_toolset_for_tool
from toolsets import get_all_toolsets, get_toolset_info, resolve_toolset, validate_toolset
# Cron job system for scheduled tasks
from cron import create_job, list_jobs, remove_job, get_job, run_daemon as run_cron_daemon, tick as cron_tick
# Cron job system for scheduled tasks (CRUD only — execution is handled by the gateway)
from cron import create_job, list_jobs, remove_job, get_job
# Resource cleanup imports for safe shutdown (terminal VMs, browser sessions)
from tools.terminal_tool import cleanup_all_environments as _cleanup_all_terminals
@@ -2475,8 +2475,6 @@ def main(
compact: bool = False,
list_tools: bool = False,
list_toolsets: bool = False,
cron_daemon: bool = False,
cron_tick_once: bool = False,
gateway: bool = False,
):
"""
@@ -2495,37 +2493,18 @@ def main(
compact: Use compact display mode
list_tools: List available tools and exit
list_toolsets: List available toolsets and exit
cron_daemon: Run as cron daemon (check and execute due jobs continuously)
cron_tick_once: Run due cron jobs once and exit (for system cron integration)
Examples:
python cli.py # Start interactive mode
python cli.py --toolsets web,terminal # Use specific toolsets
python cli.py -q "What is Python?" # Single query mode
python cli.py --list-tools # List tools and exit
python cli.py --cron-daemon # Run cron scheduler daemon
python cli.py --cron-tick-once # Check and run due jobs once
"""
# Signal to terminal_tool that we're in interactive mode
# This enables interactive sudo password prompts with timeout
os.environ["HERMES_INTERACTIVE"] = "1"
# Handle cron daemon mode (runs before CLI initialization)
if cron_daemon:
print("Starting Hermes Cron Daemon...")
print("Jobs will be checked every 60 seconds.")
print("Press Ctrl+C to stop.\n")
run_cron_daemon(check_interval=60, verbose=True)
return
# Handle cron tick (single run for system cron integration)
if cron_tick_once:
jobs_run = cron_tick(verbose=True)
if jobs_run:
print(f"Executed {jobs_run} job(s)")
return
# Handle gateway mode (messaging platforms)
# Handle gateway mode (messaging + cron)
if gateway:
import asyncio
from gateway.run import start_gateway

View File

@@ -6,12 +6,12 @@ This module provides scheduled task execution, allowing the agent to:
- Self-schedule reminders and follow-up tasks
- Execute tasks in isolated sessions (no prior context)
Usage:
# Run due jobs (for system cron integration)
python -c "from cron import tick; tick()"
# Or via CLI
python cli.py --cron-daemon
Cron jobs are executed automatically by the gateway daemon:
hermes gateway install # Install as system service (recommended)
hermes gateway # Or run in foreground
The gateway ticks the scheduler every 60 seconds. A file lock prevents
duplicate execution if multiple processes overlap.
"""
from cron.jobs import (
@@ -22,7 +22,7 @@ from cron.jobs import (
update_job,
JOBS_FILE,
)
from cron.scheduler import tick, run_daemon
from cron.scheduler import tick
__all__ = [
"create_job",
@@ -31,6 +31,5 @@ __all__ = [
"remove_job",
"update_job",
"tick",
"run_daemon",
"JOBS_FILE",
]

View File

@@ -1,15 +1,17 @@
"""
Cron job scheduler - executes due jobs.
This module provides:
- tick(): Run all due jobs once (for system cron integration)
- run_daemon(): Run continuously, checking every 60 seconds
Provides tick() which checks for due jobs and runs them. The gateway
calls this every 60 seconds from a background thread.
Uses a file-based lock (~/.hermes/cron/.tick.lock) so only one tick
runs at a time if multiple processes overlap.
"""
import fcntl
import logging
import os
import sys
import time
import traceback
from datetime import datetime
from pathlib import Path
@@ -22,6 +24,10 @@ sys.path.insert(0, str(Path(__file__).parent.parent))
from cron.jobs import get_due_jobs, mark_job_run, save_job_output
# File-based lock prevents concurrent ticks from gateway + daemon + systemd timer
_LOCK_DIR = Path.home() / ".hermes" / "cron"
_LOCK_FILE = _LOCK_DIR / ".tick.lock"
def run_job(job: dict) -> tuple[bool, str, Optional[str]]:
"""
@@ -105,86 +111,56 @@ def tick(verbose: bool = True) -> int:
"""
Check and run all due jobs.
This is designed to be called by system cron every minute:
*/1 * * * * cd ~/hermes-agent && python -c "from cron import tick; tick()"
Uses a file lock so only one tick runs at a time, even if the gateway's
in-process ticker and a standalone daemon or manual tick overlap.
Args:
verbose: Whether to print status messages
Returns:
Number of jobs executed
Number of jobs executed (0 if another tick is already running)
"""
due_jobs = get_due_jobs()
if verbose and not due_jobs:
logger.info("%s - No jobs due", datetime.now().strftime('%H:%M:%S'))
return 0
if verbose:
logger.info("%s - %s job(s) due", datetime.now().strftime('%H:%M:%S'), len(due_jobs))
executed = 0
for job in due_jobs:
try:
success, output, error = run_job(job)
# Save output to file
output_file = save_job_output(job["id"], output)
if verbose:
logger.info("Output saved to: %s", output_file)
# Mark job as run (handles repeat counting, next_run computation)
mark_job_run(job["id"], success, error)
executed += 1
except Exception as e:
logger.error("Error processing job %s: %s", job['id'], e)
mark_job_run(job["id"], False, str(e))
return executed
_LOCK_DIR.mkdir(parents=True, exist_ok=True)
def run_daemon(check_interval: int = 60, verbose: bool = True):
"""
Run the cron daemon continuously.
Checks for due jobs every `check_interval` seconds.
Args:
check_interval: Seconds between checks (default: 60)
verbose: Whether to print status messages
"""
logger.info("Starting daemon (checking every %ss)", check_interval)
logger.info("Press Ctrl+C to stop")
try:
while True:
lock_fd = open(_LOCK_FILE, "w")
fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
except (OSError, IOError):
# Another tick is already running — skip silently
logger.debug("Tick skipped — another instance holds the lock")
return 0
try:
due_jobs = get_due_jobs()
if verbose and not due_jobs:
logger.info("%s - No jobs due", datetime.now().strftime('%H:%M:%S'))
return 0
if verbose:
logger.info("%s - %s job(s) due", datetime.now().strftime('%H:%M:%S'), len(due_jobs))
executed = 0
for job in due_jobs:
try:
tick(verbose=verbose)
success, output, error = run_job(job)
output_file = save_job_output(job["id"], output)
if verbose:
logger.info("Output saved to: %s", output_file)
mark_job_run(job["id"], success, error)
executed += 1
except Exception as e:
logger.error("Tick error: %s", e)
time.sleep(check_interval)
except KeyboardInterrupt:
logger.info("Daemon stopped")
logger.error("Error processing job %s: %s", job['id'], e)
mark_job_run(job["id"], False, str(e))
return executed
finally:
fcntl.flock(lock_fd, fcntl.LOCK_UN)
lock_fd.close()
if __name__ == "__main__":
# Allow running directly: python cron/scheduler.py [daemon|tick]
import argparse
parser = argparse.ArgumentParser(description="Hermes Cron Scheduler")
parser.add_argument("mode", choices=["daemon", "tick"], default="tick", nargs="?",
help="Mode: 'tick' to run once, 'daemon' to run continuously")
parser.add_argument("--interval", type=int, default=60,
help="Check interval in seconds for daemon mode")
parser.add_argument("--quiet", "-q", action="store_true",
help="Suppress status messages")
args = parser.parse_args()
if args.mode == "daemon":
run_daemon(check_interval=args.interval, verbose=not args.quiet)
else:
tick(verbose=not args.quiet)
tick(verbose=True)

View File

@@ -341,6 +341,8 @@ Without ffmpeg, Edge TTS audio is sent as a regular audio file (still playable,
## Cron Job Delivery
Cron jobs are executed automatically by the gateway daemon. When the gateway is running (via `hermes gateway` or `hermes gateway install`), it ticks the scheduler every 60 seconds and runs due jobs.
When scheduling cron jobs, you can specify where the output should be delivered:
```

View File

@@ -19,6 +19,7 @@ import os
import re
import sys
import signal
import threading
from pathlib import Path
from datetime import datetime
from typing import Dict, Optional, Any, List
@@ -153,8 +154,8 @@ class GatewayRunner:
logger.error("%s error: %s", platform.value, e)
if connected_count == 0:
logger.warning("No platforms connected. Check your configuration.")
return False
logger.warning("No messaging platforms connected.")
logger.info("Gateway will continue running for cron job execution.")
# Update delivery router with adapters
self.delivery_router.adapters = self.adapters
@@ -169,7 +170,8 @@ class GatewayRunner:
"platforms": [p.value for p in self.adapters.keys()],
})
logger.info("Gateway running with %s platform(s)", connected_count)
if connected_count > 0:
logger.info("Gateway running with %s platform(s)", connected_count)
logger.info("Press Ctrl+C to stop")
return True
@@ -1315,6 +1317,25 @@ class GatewayRunner:
return response
def _start_cron_ticker(stop_event: threading.Event, interval: int = 60):
"""
Background thread that ticks the cron scheduler at a regular interval.
Runs inside the gateway process so cronjobs fire automatically without
needing a separate `hermes cron daemon` or system cron entry.
"""
from cron.scheduler import tick as cron_tick
logger.info("Cron ticker started (interval=%ds)", interval)
while not stop_event.is_set():
try:
cron_tick(verbose=False)
except Exception as e:
logger.debug("Cron tick error: %s", e)
stop_event.wait(timeout=interval)
logger.info("Cron ticker stopped")
async def start_gateway(config: Optional[GatewayConfig] = None) -> bool:
"""
Start the gateway and run until interrupted.
@@ -1334,7 +1355,6 @@ async def start_gateway(config: Optional[GatewayConfig] = None) -> bool:
try:
loop.add_signal_handler(sig, signal_handler)
except NotImplementedError:
# Windows doesn't support add_signal_handler
pass
# Start the gateway
@@ -1342,8 +1362,23 @@ async def start_gateway(config: Optional[GatewayConfig] = None) -> bool:
if not success:
return False
# Start background cron ticker so scheduled jobs fire automatically
cron_stop = threading.Event()
cron_thread = threading.Thread(
target=_start_cron_ticker,
args=(cron_stop,),
daemon=True,
name="cron-ticker",
)
cron_thread.start()
# Wait for shutdown
await runner.wait_for_shutdown()
# Stop cron ticker cleanly
cron_stop.set()
cron_thread.join(timeout=5)
return True

View File

@@ -1,12 +1,15 @@
"""
Cron subcommand for hermes CLI.
Handles: hermes cron [list|daemon|tick]
Handles: hermes cron [list|status|tick]
Cronjobs are executed automatically by the gateway daemon (hermes gateway).
Install the gateway as a service for background execution:
hermes gateway install
"""
import sys
from pathlib import Path
from datetime import datetime
PROJECT_ROOT = Path(__file__).parent.parent.resolve()
sys.path.insert(0, str(PROJECT_ROOT))
@@ -22,7 +25,7 @@ def cron_list(show_all: bool = False):
if not jobs:
print(color("No scheduled jobs.", Colors.DIM))
print(color("Create one with: hermes cron add <schedule> <prompt>", Colors.DIM))
print(color("Create one with the /cron add command in chat, or via Telegram.", Colors.DIM))
return
print()
@@ -38,7 +41,6 @@ def cron_list(show_all: bool = False):
enabled = job.get("enabled", True)
next_run = job.get("next_run_at", "?")
# Repeat info
repeat_info = job.get("repeat", {})
repeat_times = repeat_info.get("times")
repeat_completed = repeat_info.get("completed", 0)
@@ -48,13 +50,11 @@ def cron_list(show_all: bool = False):
else:
repeat_str = ""
# Delivery targets
deliver = job.get("deliver", ["local"])
if isinstance(deliver, str):
deliver = [deliver]
deliver_str = ", ".join(deliver)
# Status indicator
if not enabled:
status = color("[disabled]", Colors.RED)
else:
@@ -67,32 +67,51 @@ def cron_list(show_all: bool = False):
print(f" Next run: {next_run}")
print(f" Deliver: {deliver_str}")
print()
def cron_daemon(interval: int = 60):
"""Run the cron daemon."""
from cron.scheduler import start_daemon
print(color("┌─────────────────────────────────────────────────────────┐", Colors.CYAN))
print(color("│ ⚕ Hermes Cron Daemon │", Colors.CYAN))
print(color("├─────────────────────────────────────────────────────────┤", Colors.CYAN))
print(color(" Press Ctrl+C to stop │", Colors.CYAN))
print(color("└─────────────────────────────────────────────────────────┘", Colors.CYAN))
print()
try:
start_daemon(interval=interval)
except KeyboardInterrupt:
# Warn if gateway isn't running
from hermes_cli.gateway import find_gateway_pids
if not find_gateway_pids():
print(color(" ⚠ Gateway is not running — jobs won't fire automatically.", Colors.YELLOW))
print(color(" Start it with: hermes gateway install", Colors.DIM))
print()
print(color("Cron daemon stopped.", Colors.YELLOW))
def cron_tick():
"""Run due jobs once (for system cron integration)."""
"""Run due jobs once and exit."""
from cron.scheduler import tick
tick(verbose=True)
def cron_status():
"""Show cron execution status."""
from cron.jobs import list_jobs
from hermes_cli.gateway import find_gateway_pids
print(f"[{datetime.now().isoformat()}] Running cron tick...")
tick()
print()
pids = find_gateway_pids()
if pids:
print(color("✓ Gateway is running — cron jobs will fire automatically", Colors.GREEN))
print(f" PID: {', '.join(map(str, pids))}")
else:
print(color("✗ Gateway is not running — cron jobs will NOT fire", Colors.RED))
print()
print(" To enable automatic execution:")
print(" hermes gateway install # Install as system service (recommended)")
print(" hermes gateway # Or run in foreground")
print()
jobs = list_jobs(include_disabled=False)
if jobs:
next_runs = [j.get("next_run_at") for j in jobs if j.get("next_run_at")]
print(f" {len(jobs)} active job(s)")
if next_runs:
print(f" Next run: {min(next_runs)}")
else:
print(" No active jobs")
print()
def cron_command(args):
@@ -103,14 +122,13 @@ def cron_command(args):
show_all = getattr(args, 'all', False)
cron_list(show_all)
elif subcmd == "daemon":
interval = getattr(args, 'interval', 60)
cron_daemon(interval)
elif subcmd == "tick":
cron_tick()
elif subcmd == "status":
cron_status()
else:
print(f"Unknown cron command: {subcmd}")
print("Usage: hermes cron [list|daemon|tick]")
print("Usage: hermes cron [list|status|tick]")
sys.exit(1)

View File

@@ -356,6 +356,7 @@ def run_gateway(verbose: bool = False):
print("┌─────────────────────────────────────────────────────────┐")
print("│ ⚕ Hermes Gateway Starting... │")
print("├─────────────────────────────────────────────────────────┤")
print("│ Messaging platforms + cron scheduler │")
print("│ Press Ctrl+C to stop │")
print("└─────────────────────────────────────────────────────────┘")
print()

View File

@@ -17,7 +17,7 @@ Usage:
hermes status # Show status of all components
hermes cron # Manage cron jobs
hermes cron list # List cron jobs
hermes cron daemon # Run cron daemon
hermes cron status # Check if cron scheduler is running
hermes doctor # Check configuration and dependencies
hermes version # Show version
hermes update # Update to latest version
@@ -808,12 +808,11 @@ For more help on a command:
cron_list = cron_subparsers.add_parser("list", help="List scheduled jobs")
cron_list.add_argument("--all", action="store_true", help="Include disabled jobs")
# cron daemon
cron_daemon = cron_subparsers.add_parser("daemon", help="Run cron daemon")
cron_daemon.add_argument("--interval", type=int, default=60, help="Check interval in seconds")
# cron status
cron_subparsers.add_parser("status", help="Check if cron scheduler is running")
# cron tick
cron_tick = cron_subparsers.add_parser("tick", help="Run due jobs once (for system cron)")
# cron tick (mostly for debugging)
cron_subparsers.add_parser("tick", help="Run due jobs once and exit")
cron_parser.set_defaults(func=cmd_cron)

View File

@@ -579,7 +579,7 @@ function Start-GatewayIfConfigured {
Write-Host ""
Write-Info "Messaging platform token detected!"
Write-Info "The gateway needs to be running for Hermes to send/receive messages."
Write-Info "The gateway handles messaging platforms and cron job execution."
Write-Host ""
$response = Read-Host "Would you like to start the gateway now? [Y/n]"
@@ -639,8 +639,8 @@ function Write-Completion {
Write-Host "View/edit configuration"
Write-Host " hermes config edit " -NoNewline -ForegroundColor Green
Write-Host "Open config in editor"
Write-Host " hermes gateway " -NoNewline -ForegroundColor Green
Write-Host "Run messaging gateway"
Write-Host " hermes gateway install " -NoNewline -ForegroundColor Green
Write-Host "Install gateway service (messaging + cron)"
Write-Host " hermes update " -NoNewline -ForegroundColor Green
Write-Host "Update to latest version"
Write-Host ""

View File

@@ -797,7 +797,7 @@ print_success() {
echo -e " ${GREEN}hermes setup${NC} Configure API keys & settings"
echo -e " ${GREEN}hermes config${NC} View/edit configuration"
echo -e " ${GREEN}hermes config edit${NC} Open config in editor"
echo -e " ${GREEN}hermes gateway${NC} Run messaging gateway"
echo -e " ${GREEN}hermes gateway install${NC} Install gateway service (messaging + cron)"
echo -e " ${GREEN}hermes update${NC} Update to latest version"
echo ""

View File

@@ -279,8 +279,8 @@ echo " hermes"
echo ""
echo "Other commands:"
echo " hermes status # Check configuration"
echo " hermes gateway # Start messaging gateway"
echo " hermes cron daemon # Run cron daemon"
echo " hermes gateway install # Install gateway service (messaging + cron)"
echo " hermes cron list # View scheduled jobs"
echo " hermes doctor # Diagnose issues"
echo ""

View File

@@ -7,7 +7,6 @@ Each module provides specialized functionality for different capabilities:
- web_tools: Web search, content extraction, and crawling
- terminal_tool: Command execution using mini-swe-agent (local/docker/modal backends)
- terminal_hecate: Command execution on MorphCloud/Hecate cloud VMs (alternative backend)
- vision_tools: Image analysis and understanding
- mixture_of_agents_tool: Multi-model collaborative reasoning
- image_generation_tool: Text-to-image generation with upscaling
@@ -36,13 +35,6 @@ from .terminal_tool import (
TERMINAL_TOOL_DESCRIPTION
)
# Alternative terminal tool (Hecate/MorphCloud cloud VMs)
from .terminal_hecate import (
terminal_hecate_tool,
check_hecate_requirements,
TERMINAL_HECATE_DESCRIPTION
)
from .vision_tools import (
vision_analyze_tool,
check_vision_requirements
@@ -184,10 +176,6 @@ __all__ = [
'register_task_env_overrides',
'clear_task_env_overrides',
'TERMINAL_TOOL_DESCRIPTION',
# Terminal tools (Hecate/MorphCloud backend)
'terminal_hecate_tool',
'check_hecate_requirements',
'TERMINAL_HECATE_DESCRIPTION',
# Vision tools
'vision_analyze_tool',
'check_vision_requirements',

View File

@@ -1,437 +0,0 @@
#!/usr/bin/env python3
"""
Terminal Hecate Tool Module
A terminal tool that executes commands on MorphCloud/Hecate VMs.
Uses E2B-style cloud VMs for execution with automatic lifecycle management.
Features:
- Direct SSH command execution on cloud VMs
- Background task support
- VM lifecycle management with TTL
- Automatic cleanup after inactivity
Usage:
from terminal_hecate import terminal_hecate_tool
# Execute a simple command
result = terminal_hecate_tool("ls -la")
# Execute in background
result = terminal_hecate_tool("python server.py", background=True)
"""
import json
import os
import time
import threading
import atexit
from typing import Optional, Dict, Any
# Tool description for LLM
TERMINAL_HECATE_DESCRIPTION = """Execute commands on a secure cloud Linux VM environment (Hecate/MorphCloud).
**Environment:**
- Minimal Debian-based OS with internet access
- Automatic VM lifecycle management (creates on-demand, reuses, cleans up)
- Filesystem is persisted between tool calls but environment variables, venvs, etc are reset.
**Command Execution:**
- Simple commands: Just provide the 'command' parameter
- Background processes: Set 'background': True for servers/long-running tasks
- Command timeout: Optional 'timeout' parameter in seconds
**Examples:**
- Run command: `{"command": "ls -la"}`
- Background task: `{"command": "source path/to/my/venv/bin/activate && python server.py", "background": True}`
- With timeout: `{"command": "long_task.sh", "timeout": 300}`
**Best Practices:**
- Run servers/long processes in background
- Monitor disk usage for large tasks
- Install whatever tools you need with sudo apt-get
- Do not be afraid to run pip with --break-system-packages
**Things to avoid**
- Do NOT use interactive tools such as tmux, vim, nano, python repl - you will get stuck. Even git sometimes becomes interactive if the output is large. If you're not sure pipe to cat.
"""
# Global state for VM lifecycle management
_active_instances: Dict[str, Any] = {}
_last_activity: Dict[str, float] = {}
_instance_lock = threading.Lock()
_cleanup_thread = None
_cleanup_running = False
def _cleanup_inactive_vms(vm_lifetime_seconds: int = 300):
"""Clean up VMs that have been inactive for longer than vm_lifetime_seconds."""
global _active_instances, _last_activity
current_time = time.time()
tasks_to_cleanup = []
with _instance_lock:
for task_id, last_time in list(_last_activity.items()):
if current_time - last_time > vm_lifetime_seconds:
tasks_to_cleanup.append(task_id)
for task_id in tasks_to_cleanup:
try:
if task_id in _active_instances:
instance = _active_instances[task_id]
if hasattr(instance, 'terminate'):
instance.terminate()
elif hasattr(instance, 'stop'):
instance.stop()
elif hasattr(instance, 'delete'):
instance.delete()
del _active_instances[task_id]
print(f"[VM Cleanup] Terminated inactive VM for task: {task_id}")
if task_id in _last_activity:
del _last_activity[task_id]
except Exception as e:
# 404 errors are benign - VM already cleaned up by TTL
error_str = str(e)
if "404" in error_str or "InstanceNotFoundError" in error_str or "not found" in error_str.lower():
print(f"[VM Cleanup] VM for task {task_id} already cleaned up (likely TTL expiration)")
else:
print(f"[VM Cleanup] Error cleaning up VM for task {task_id}: {e}")
# Always remove from tracking dicts to prevent infinite retry loops
if task_id in _active_instances:
del _active_instances[task_id]
if task_id in _last_activity:
del _last_activity[task_id]
def _cleanup_thread_worker():
"""Background thread worker that periodically cleans up inactive VMs."""
global _cleanup_running
while _cleanup_running:
try:
vm_lifetime = int(os.getenv("HECATE_VM_LIFETIME_SECONDS", "300"))
_cleanup_inactive_vms(vm_lifetime)
except Exception as e:
print(f"[VM Cleanup] Error in cleanup thread: {e}")
for _ in range(60):
if not _cleanup_running:
break
time.sleep(1)
def _start_cleanup_thread():
"""Start the background cleanup thread if not already running."""
global _cleanup_thread, _cleanup_running
with _instance_lock:
if _cleanup_thread is None or not _cleanup_thread.is_alive():
_cleanup_running = True
_cleanup_thread = threading.Thread(target=_cleanup_thread_worker, daemon=True)
_cleanup_thread.start()
def _stop_cleanup_thread():
"""Stop the background cleanup thread."""
global _cleanup_running
_cleanup_running = False
if _cleanup_thread is not None:
_cleanup_thread.join(timeout=5)
def cleanup_vm(task_id: str):
"""Manually clean up a specific VM by task_id."""
global _active_instances, _last_activity
with _instance_lock:
try:
if task_id in _active_instances:
instance = _active_instances[task_id]
if hasattr(instance, 'terminate'):
instance.terminate()
elif hasattr(instance, 'stop'):
instance.stop()
elif hasattr(instance, 'delete'):
instance.delete()
del _active_instances[task_id]
print(f"[VM Cleanup] Manually terminated VM for task: {task_id}")
if task_id in _last_activity:
del _last_activity[task_id]
except Exception as e:
# 404 errors are benign - VM already cleaned up by TTL
error_str = str(e)
if "404" in error_str or "InstanceNotFoundError" in error_str or "not found" in error_str.lower():
print(f"[VM Cleanup] VM for task {task_id} already cleaned up (likely TTL expiration)")
else:
print(f"[VM Cleanup] Error manually cleaning up VM for task {task_id}: {e}")
atexit.register(_stop_cleanup_thread)
def _execute_command(instance, command: str, timeout: Optional[int] = None) -> Dict[str, Any]:
"""
Execute a command on the VM instance using instance.exec() for proper stderr capture.
Args:
instance: MorphVM instance
command: Command to execute
timeout: Optional timeout in seconds (Note: exec() may not support timeout directly)
Returns:
dict with stdout, stderr, returncode
"""
try:
# Use instance.exec() which properly captures both stdout and stderr
# (unlike ssh.run() which doesn't capture stderr correctly)
result = instance.exec(command)
# Debug logging only for verbose mode or unusual cases
# Note: Non-zero exit codes are normal (model's command failed) - not a tool error
if result.exit_code != 0 and not result.stdout and not result.stderr:
# Only log if we got absolutely no output - might indicate an issue
print(f"⚠️ Command returned exit={result.exit_code} with no output")
return {
"stdout": result.stdout or "",
"stderr": result.stderr or "",
"returncode": result.exit_code
}
except Exception as e:
# Check if it's a timeout
error_str = str(e).lower()
if "timeout" in error_str:
return {
"stdout": "",
"stderr": f"Command timed out after {timeout or 120} seconds",
"returncode": 124
}
return {
"stdout": "",
"stderr": f"Command execution failed: {str(e)}",
"returncode": -1
}
def terminal_hecate_tool(
command: str,
background: bool = False,
timeout: Optional[int] = None,
task_id: Optional[str] = None
) -> str:
"""
Execute a command on a MorphCloud/Hecate VM without session persistence.
Args:
command: The command to execute
background: Whether to run in background (default: False)
timeout: Command timeout in seconds (default: 120)
task_id: Unique identifier for VM isolation (optional)
Returns:
str: JSON string with output, exit_code, and error fields
Examples:
# Execute a simple command
>>> result = terminal_hecate_tool(command="ls -la /tmp")
# Run a background task
>>> result = terminal_hecate_tool(command="python server.py", background=True)
# With custom timeout
>>> result = terminal_hecate_tool(command="long_task.sh", timeout=300)
"""
global _active_instances, _last_activity
try:
# Import required modules
try:
from morphcloud.api import MorphCloudClient
except ImportError as import_error:
return json.dumps({
"output": "",
"exit_code": -1,
"error": f"Terminal tool disabled: {import_error}",
"status": "disabled"
}, ensure_ascii=False)
# Get configuration
vm_ttl_seconds = int(os.getenv("HECATE_VM_TTL_SECONDS", "1200"))
snapshot_id = os.getenv("HECATE_DEFAULT_SNAPSHOT_ID", "snapshot_defv9tjg")
# Check API key
morph_api_key = os.getenv("MORPH_API_KEY")
if not morph_api_key:
return json.dumps({
"output": "",
"exit_code": -1,
"error": "MORPH_API_KEY environment variable not set",
"status": "disabled"
}, ensure_ascii=False)
# Use task_id for VM isolation
effective_task_id = task_id or "default"
# Start cleanup thread
_start_cleanup_thread()
# Get or create VM instance
with _instance_lock:
if effective_task_id not in _active_instances:
morph_client = MorphCloudClient(api_key=morph_api_key)
_active_instances[effective_task_id] = morph_client.instances.start(
snapshot_id=snapshot_id,
ttl_seconds=vm_ttl_seconds,
ttl_action="stop"
)
# Update last activity time
_last_activity[effective_task_id] = time.time()
instance = _active_instances[effective_task_id]
# Wait for instance to be ready
instance.wait_until_ready()
# Prepare command for execution
if background:
# Run in background with nohup and redirect output
exec_command = f"nohup {command} > /tmp/bg_output.log 2>&1 &"
result = _execute_command(instance, exec_command, timeout=10)
# For background tasks, return immediately with info
if result["returncode"] == 0:
return json.dumps({
"output": "Background task started successfully",
"exit_code": 0,
"error": None
}, ensure_ascii=False)
else:
# Include stderr in output but don't set error (command failure, not tool failure)
bg_output = result["stdout"]
if result["stderr"]:
bg_output = f"{bg_output}\n{result['stderr']}" if bg_output else result["stderr"]
return json.dumps({
"output": bg_output,
"exit_code": result["returncode"],
"error": None # Only set for actual tool failures
}, ensure_ascii=False)
else:
# Run foreground command with retry logic for transient failures
max_retries = 3
retry_count = 0
result = None
while retry_count <= max_retries:
result = _execute_command(instance, command, timeout=timeout)
# Check if we should retry (only for transient errors, not normal results)
stdout = result.get("stdout", "")
stderr = result.get("stderr", "")
returncode = result.get("returncode", 0)
should_retry = False
retry_reason = ""
# NOTE: Empty output with exit_code=0 is NORMAL for many commands:
# - File writes: cat > file, echo > file
# - Directory ops: mkdir, cd
# - Silent installs: pip install --quiet
# So we do NOT retry on exit_code=0, even with empty output.
# Only retry on special error codes that suggest transient/infra issues
if not stdout and not stderr and returncode in [-1, 124]:
should_retry = True
retry_reason = f"transient error (code {returncode})"
if should_retry and retry_count < max_retries:
retry_count += 1
wait_time = 2 ** retry_count # Exponential backoff: 2s, 4s, 8s
print(f"⚠️ Terminal: {retry_reason}, retrying in {wait_time}s (attempt {retry_count}/{max_retries})")
time.sleep(wait_time)
continue
# Got a result (success or normal command failure) - exit retry loop
break
# Combine stdout and stderr for output
output = result["stdout"]
if result["stderr"] and result["returncode"] != 0:
output = f"{output}\n{result['stderr']}" if output else result["stderr"]
# Truncate output if too long (max 50,000 chars to avoid context explosion)
MAX_OUTPUT_CHARS = 50000
if len(output) > MAX_OUTPUT_CHARS:
truncated_notice = f"\n\n... [OUTPUT TRUNCATED - showing last {MAX_OUTPUT_CHARS} chars of {len(output)} total] ..."
output = truncated_notice + output[-MAX_OUTPUT_CHARS:]
# NOTE: error is only set for FUNCTIONAL tool failures (VM issues, timeouts, etc.)
# Non-zero exit codes from the model's commands are NOT tool failures -
# the model can self-correct. The exit_code field tells the model if the command succeeded.
# Retries that eventually succeed also don't count as failures.
return json.dumps({
"output": output.strip(),
"exit_code": result["returncode"],
"error": None # Only set for actual tool failures, not command failures
}, ensure_ascii=False)
except Exception as e:
return json.dumps({
"output": "",
"exit_code": -1,
"error": f"Failed to execute command: {str(e)}",
"status": "error"
}, ensure_ascii=False)
def check_hecate_requirements() -> bool:
"""Check if all requirements for the Hecate terminal tool are met."""
required_vars = ["MORPH_API_KEY"]
missing_required = [var for var in required_vars if not os.getenv(var)]
if missing_required:
print(f"Missing required environment variables: {', '.join(missing_required)}")
return False
try:
from morphcloud.api import MorphCloudClient
return True
except Exception as e:
print(f"MorphCloud not available: {e}")
return False
if __name__ == "__main__":
"""Simple test when run directly."""
print("Terminal Hecate Tool Module (MorphCloud/E2B)")
print("=" * 40)
if not check_hecate_requirements():
print("Requirements not met. Please check the messages above.")
exit(1)
print("All requirements met!")
print("\nAvailable Tool:")
print(" - terminal_hecate_tool: Execute commands on cloud VMs")
print("\nUsage Examples:")
print(" # Execute a command")
print(" result = terminal_hecate_tool(command='ls -la')")
print(" ")
print(" # Run a background task")
print(" result = terminal_hecate_tool(command='python server.py', background=True)")
print("\nEnvironment Variables:")
print(f" MORPH_API_KEY: {'Set' if os.getenv('MORPH_API_KEY') else 'Not set'}")
print(f" HECATE_VM_TTL_SECONDS: {os.getenv('HECATE_VM_TTL_SECONDS', '1200')} (default: 1200 / 20 minutes)")
print(f" HECATE_VM_LIFETIME_SECONDS: {os.getenv('HECATE_VM_LIFETIME_SECONDS', '300')} (default: 300 / 5 minutes)")
print(f" HECATE_DEFAULT_SNAPSHOT_ID: {os.getenv('HECATE_DEFAULT_SNAPSHOT_ID', 'snapshot_defv9tjg')}")