Compare commits

..

1 Commits

Author SHA1 Message Date
Metatron
7aeb84e3fc fix: disable terminal toolset for cloud providers in cron jobs (closes #379)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m24s
When the cron scheduler resolves a cloud provider (Nous, OpenRouter,
Anthropic), the agent still had the terminal toolset available. This
caused nightwatch-health-monitor and similar jobs to attempt SSH into
remote VPSes (Ezra, Allegro, Bezalel) without local SSH keys.

Fix: use is_local_endpoint() from agent/model_metadata.py to check the
runtime base_url. When it's a cloud endpoint (not localhost/private IP),
append 'terminal' to disabled_toolsets. Local endpoints (Ollama, llama.cpp)
retain terminal access as before.

Also logs when terminal is disabled for observability.
2026-04-13 18:27:02 -04:00
3 changed files with 16 additions and 659 deletions

View File

@@ -1,18 +0,0 @@
{
"agents": {
"ezra": {
"host": "143.198.27.163",
"hermes_path": "/root/wizards/ezra/hermes-agent/venv/bin/hermes",
"username": "root"
},
"timmy": {
"host": "timmy",
"hermes_path": "/root/wizards/timmy/hermes-agent/venv/bin/hermes",
"username": "root"
}
},
"validation_timeout": 30,
"command_timeout": 300,
"max_retries": 2,
"retry_delay": 5
}

View File

@@ -1,551 +0,0 @@
"""
VPS Agent Dispatch Worker for Hermes Cron System
This module provides a dispatch worker that SSHs into remote VPS machines
and runs hermes commands. It ensures that:
1. Remote dispatch only counts as success when the remote hermes command actually launches
2. Stale per-agent hermes binary paths are configurable/validated before queue drain
3. Failed remote launches remain in the queue (or are marked failed) instead of being reported as OK
"""
import json
import logging
import os
import shlex
import subprocess
import sys
import time
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Optional, Dict, Any, List
logger = logging.getLogger(__name__)
class DispatchStatus(Enum):
    """Lifecycle states of a dispatch operation.

    The string values are what gets written into the persisted queue file,
    so they must stay stable.
    """

    # States before / during an attempt.
    PENDING = "pending"
    VALIDATING = "validating"
    DISPATCHING = "dispatching"

    # Outcomes of an attempt.
    SUCCESS = "success"
    FAILED = "failed"
    RETRYING = "retrying"
@dataclass
class DispatchResult:
    """Outcome record for one validation or dispatch attempt.

    Only ``status`` and ``message`` are required; every other field defaults
    to "not available" so failure paths can fill in just what they know.
    """

    # Final state of the attempt.
    status: DispatchStatus
    # Human-readable summary of what happened.
    message: str
    # Exit code of the local ``ssh`` process, when one was obtained.
    exit_code: Optional[int] = None
    # Captured output streams of the ``ssh`` process.
    stdout: Optional[str] = None
    stderr: Optional[str] = None
    # Wall-clock duration of the attempt, in seconds.
    execution_time: Optional[float] = None
    # Remote hermes binary path the attempt referred to.
    hermes_path: Optional[str] = None
    # Whether the hermes path was validated as part of this attempt.
    validated: bool = False
class HermesPathValidator:
    """Validates hermes binary paths on remote VPS machines over SSH."""

    def __init__(self, ssh_key_path: Optional[str] = None, timeout: int = 30):
        """
        Args:
            ssh_key_path: Private key handed to ``ssh -i``. Defaults to
                ``~/.ssh/id_rsa``.
            timeout: Seconds allowed for the whole SSH probe.
        """
        self.ssh_key_path = ssh_key_path or os.path.expanduser("~/.ssh/id_rsa")
        self.timeout = timeout  # SSH timeout in seconds

    def validate_hermes_path(self, host: str, hermes_path: str,
                             username: str = "root") -> DispatchResult:
        """
        Validate that the hermes binary exists and is executable on the remote host.

        Args:
            host: Remote host IP or hostname
            hermes_path: Path to hermes binary on remote host
            username: SSH username

        Returns:
            DispatchResult with validation status. Never raises: timeouts and
            any other error are reported as a FAILED result.
        """
        start_time = time.time()

        # The path is interpolated into a command line that the *remote* shell
        # parses, so it must be quoted; otherwise a path containing spaces or
        # shell metacharacters breaks (or hijacks) the probe.
        remote_probe = (
            f"test -x {shlex.quote(hermes_path)} && echo 'VALID' || echo 'INVALID'"
        )

        # NOTE(review): StrictHostKeyChecking=no disables host-key
        # verification; kept as-is to avoid breaking existing deployments.
        ssh_cmd = [
            "ssh",
            "-i", self.ssh_key_path,
            "-o", "StrictHostKeyChecking=no",
            "-o", "ConnectTimeout=10",
            "-o", "BatchMode=yes",
            f"{username}@{host}",
            remote_probe,
        ]

        try:
            result = subprocess.run(
                ssh_cmd,
                capture_output=True,
                text=True,
                timeout=self.timeout
            )
        except subprocess.TimeoutExpired:
            return DispatchResult(
                status=DispatchStatus.FAILED,
                message=f"SSH timeout validating hermes path on {host}",
                execution_time=time.time() - start_time,
                hermes_path=hermes_path,
                validated=False
            )
        except Exception as e:
            return DispatchResult(
                status=DispatchStatus.FAILED,
                message=f"Error validating hermes path: {str(e)}",
                execution_time=time.time() - start_time,
                hermes_path=hermes_path,
                validated=False
            )

        execution_time = time.time() - start_time

        # The probe exits 0 on both branches, so the marker is the only real
        # signal. It has to be matched as a whole line: a substring test for
        # "VALID" is also satisfied by "INVALID".
        reported_lines = [line.strip() for line in result.stdout.splitlines()]
        if result.returncode == 0 and "VALID" in reported_lines:
            return DispatchResult(
                status=DispatchStatus.SUCCESS,
                message=f"Hermes binary validated at {hermes_path}",
                exit_code=0,
                execution_time=execution_time,
                hermes_path=hermes_path,
                validated=True
            )

        return DispatchResult(
            status=DispatchStatus.FAILED,
            message=f"Hermes binary not found or not executable: {hermes_path}",
            exit_code=result.returncode,
            stdout=result.stdout,
            stderr=result.stderr,
            execution_time=execution_time,
            hermes_path=hermes_path,
            validated=False
        )
class VPSAgentDispatcher:
    """Dispatches hermes commands to remote VPS agents over SSH.

    Agent connection details (host, hermes binary path, username) and the
    timeouts come from a JSON config file; built-in defaults are used when
    the file is missing or unreadable.
    """

    def __init__(self, config_path: Optional[str] = None):
        """
        Args:
            config_path: Location of the dispatch config JSON. Defaults to
                ``~/.hermes/dispatch_config.json``.
        """
        self.config_path = config_path or os.path.expanduser("~/.hermes/dispatch_config.json")
        self.validator = HermesPathValidator()
        self.config = self._load_config()
        # Apply the configured validation timeout; previously the setting was
        # loaded but the validator always used its own hard-coded value.
        self.validator.timeout = self.config.get(
            "validation_timeout", self.validator.timeout
        )

    @staticmethod
    def _default_config() -> Dict[str, Any]:
        """Return the built-in configuration used when no config file is usable."""
        return {
            "agents": {
                "ezra": {
                    "host": "143.198.27.163",
                    "hermes_path": "/root/wizards/ezra/hermes-agent/venv/bin/hermes",
                    "username": "root"
                },
                "timmy": {
                    "host": "timmy",
                    "hermes_path": "/root/wizards/timmy/hermes-agent/venv/bin/hermes",
                    "username": "root"
                }
            },
            "validation_timeout": 30,
            "command_timeout": 300,
            "max_retries": 2,
            "retry_delay": 5
        }

    def _load_config(self) -> Dict[str, Any]:
        """Load dispatch configuration, falling back to the built-in defaults."""
        try:
            if os.path.exists(self.config_path):
                with open(self.config_path, 'r') as f:
                    loaded = json.load(f)
                if isinstance(loaded, dict):
                    return loaded
                # Every accessor calls .get() on the config, so a non-object
                # JSON document would only fail later and less clearly.
                logger.warning(
                    "Ignoring dispatch config %s: expected a JSON object",
                    self.config_path,
                )
        except Exception as e:
            logger.warning("Failed to load dispatch config: %s", e)
        # Default configuration
        return self._default_config()

    def save_config(self):
        """Save dispatch configuration; failures are logged, not raised."""
        try:
            config_dir = Path(self.config_path).parent
            config_dir.mkdir(parents=True, exist_ok=True)
            with open(self.config_path, 'w') as f:
                json.dump(self.config, f, indent=2)
            # Set secure permissions
            os.chmod(self.config_path, 0o600)
        except Exception as e:
            logger.error("Failed to save dispatch config: %s", e)

    def get_agent_config(self, agent_name: str) -> Optional[Dict[str, Any]]:
        """Get configuration for a specific agent, or None if it is unknown."""
        return self.config.get("agents", {}).get(agent_name)

    def update_agent_config(self, agent_name: str, host: str, hermes_path: str,
                            username: str = "root"):
        """Create or replace the configuration for an agent and persist it."""
        self.config.setdefault("agents", {})[agent_name] = {
            "host": host,
            "hermes_path": hermes_path,
            "username": username
        }
        self.save_config()

    def validate_agent(self, agent_name: str) -> DispatchResult:
        """Validate that an agent's hermes binary is accessible."""
        agent_config = self.get_agent_config(agent_name)
        if not agent_config:
            return DispatchResult(
                status=DispatchStatus.FAILED,
                message=f"Agent configuration not found: {agent_name}"
            )
        return self.validator.validate_hermes_path(
            host=agent_config["host"],
            hermes_path=agent_config["hermes_path"],
            username=agent_config.get("username", "root")
        )

    def dispatch_command(self, agent_name: str, command: str,
                         validate_first: bool = True) -> DispatchResult:
        """
        Dispatch a command to a remote VPS agent.

        Args:
            agent_name: Name of the agent to dispatch to
            command: Shell command to execute on the remote host; it is passed
                to the remote shell as-is
            validate_first: Whether to validate hermes path before dispatching

        Returns:
            DispatchResult with execution status. SUCCESS only when the remote
            command exits with code 0.
        """
        agent_config = self.get_agent_config(agent_name)
        if not agent_config:
            return DispatchResult(
                status=DispatchStatus.FAILED,
                message=f"Agent configuration not found: {agent_name}"
            )

        # Validate hermes path if requested
        if validate_first:
            validation_result = self.validate_agent(agent_name)
            if validation_result.status != DispatchStatus.SUCCESS:
                return DispatchResult(
                    status=DispatchStatus.FAILED,
                    message=f"Validation failed: {validation_result.message}",
                    hermes_path=agent_config["hermes_path"],
                    validated=False
                )

        # Build SSH command to execute hermes command.
        # BatchMode=yes matches the validator: without it ssh may sit on a
        # password/passphrase prompt until the command timeout expires.
        # The agent name is quoted because it becomes part of a path that the
        # remote shell parses.
        remote_command = (
            f"cd /root/wizards/{shlex.quote(agent_name)}/hermes-agent"
            f" && source venv/bin/activate && {command}"
        )
        ssh_cmd = [
            "ssh",
            "-i", self.validator.ssh_key_path,
            "-o", "StrictHostKeyChecking=no",
            "-o", "ConnectTimeout=10",
            "-o", "BatchMode=yes",
            f"{agent_config.get('username', 'root')}@{agent_config['host']}",
            remote_command,
        ]

        start_time = time.time()
        try:
            result = subprocess.run(
                ssh_cmd,
                capture_output=True,
                text=True,
                timeout=self.config.get("command_timeout", 300)
            )
        except subprocess.TimeoutExpired:
            return DispatchResult(
                status=DispatchStatus.FAILED,
                message=f"Command timeout on {agent_name}",
                execution_time=time.time() - start_time,
                hermes_path=agent_config["hermes_path"],
                validated=validate_first
            )
        except Exception as e:
            return DispatchResult(
                status=DispatchStatus.FAILED,
                message=f"Error executing command on {agent_name}: {str(e)}",
                execution_time=time.time() - start_time,
                hermes_path=agent_config["hermes_path"],
                validated=validate_first
            )

        execution_time = time.time() - start_time
        if result.returncode == 0:
            return DispatchResult(
                status=DispatchStatus.SUCCESS,
                message=f"Command executed successfully on {agent_name}",
                exit_code=0,
                stdout=result.stdout,
                stderr=result.stderr,
                execution_time=execution_time,
                hermes_path=agent_config["hermes_path"],
                validated=validate_first
            )
        return DispatchResult(
            status=DispatchStatus.FAILED,
            message=f"Command failed on {agent_name}: {result.stderr}",
            exit_code=result.returncode,
            stdout=result.stdout,
            stderr=result.stderr,
            execution_time=execution_time,
            hermes_path=agent_config["hermes_path"],
            validated=validate_first
        )

    def dispatch_hermes_command(self, agent_name: str, hermes_command: str,
                                validate_first: bool = True) -> DispatchResult:
        """
        Dispatch a hermes command to a remote VPS agent.

        Args:
            agent_name: Name of the agent to dispatch to
            hermes_command: Arguments appended after the agent's configured
                hermes binary path (e.g., "cron list")
            validate_first: Whether to validate hermes path before dispatching

        Returns:
            DispatchResult with execution status
        """
        agent_config = self.get_agent_config(agent_name)
        if not agent_config:
            return DispatchResult(
                status=DispatchStatus.FAILED,
                message=f"Agent configuration not found: {agent_name}"
            )

        # Build full hermes command; the binary path is quoted so a path with
        # spaces or metacharacters is still one word for the remote shell.
        full_command = f"{shlex.quote(agent_config['hermes_path'])} {hermes_command}"
        return self.dispatch_command(agent_name, full_command, validate_first)
class DispatchQueue:
    """JSON-file-backed queue for managing dispatch operations.

    Every mutating method rewrites the queue file immediately.
    """

    def __init__(self, queue_file: Optional[str] = None):
        """
        Args:
            queue_file: Location of the queue JSON. Defaults to
                ``~/.hermes/dispatch_queue.json``.
        """
        self.queue_file = queue_file or os.path.expanduser("~/.hermes/dispatch_queue.json")
        self.queue: List[Dict[str, Any]] = self._load_queue()

    def _load_queue(self) -> List[Dict[str, Any]]:
        """Load queue from file; return an empty queue if that is not possible."""
        try:
            if os.path.exists(self.queue_file):
                with open(self.queue_file, 'r') as f:
                    loaded = json.load(f)
                if isinstance(loaded, list):
                    return loaded
                # The rest of the class appends to and iterates the queue, so
                # anything other than a JSON list cannot be used.
                logger.warning(
                    "Ignoring dispatch queue %s: expected a JSON list",
                    self.queue_file,
                )
        except Exception as e:
            logger.warning("Failed to load dispatch queue: %s", e)
        return []

    def save_queue(self):
        """Save queue to file; failures are logged, not raised."""
        try:
            queue_dir = Path(self.queue_file).parent
            queue_dir.mkdir(parents=True, exist_ok=True)
            with open(self.queue_file, 'w') as f:
                json.dump(self.queue, f, indent=2)
            # Set secure permissions
            os.chmod(self.queue_file, 0o600)
        except Exception as e:
            logger.error("Failed to save dispatch queue: %s", e)

    def add_item(self, agent_name: str, command: str, priority: int = 0,
                 max_retries: int = 3) -> str:
        """
        Add an item to the dispatch queue.

        Args:
            agent_name: Agent the command should be dispatched to
            command: Command to run for that agent
            priority: Higher values are served first
            max_retries: Failure budget recorded on the item

        Returns:
            Queue item ID
        """
        # The ID keeps the "dispatch_<epoch>_<n>" shape, but <n> is advanced
        # until the ID is unused. Deriving it from the queue length alone
        # produces a duplicate when an item is removed and another is added
        # within the same second.
        taken_ids = {entry.get("id") for entry in self.queue}
        stamp = int(time.time())
        sequence = len(self.queue)
        item_id = f"dispatch_{stamp}_{sequence}"
        while item_id in taken_ids:
            sequence += 1
            item_id = f"dispatch_{stamp}_{sequence}"

        item = {
            "id": item_id,
            "agent_name": agent_name,
            "command": command,
            "priority": priority,
            "max_retries": max_retries,
            "retry_count": 0,
            "status": DispatchStatus.PENDING.value,
            "created_at": time.time(),
            "last_attempt": None,
            "result": None
        }
        self.queue.append(item)
        self.save_queue()
        return item_id

    def get_next_item(self) -> Optional[Dict[str, Any]]:
        """Get the next pending item (highest priority, oldest first)."""
        pending = [
            item for item in self.queue
            if item.get("status") == DispatchStatus.PENDING.value
        ]
        # min() returns the first of equally-ranked items, which matches
        # taking the head of a stable sort on the same key.
        return min(
            pending,
            key=lambda x: (-x.get("priority", 0), x.get("created_at", 0)),
            default=None,
        )

    def update_item(self, item_id: str, status: DispatchStatus,
                    result: Optional[DispatchResult] = None):
        """Set an item's status (and optionally its result) and persist the queue.

        A FAILED status also increments the item's ``retry_count``. Unknown
        IDs are ignored.
        """
        for item in self.queue:
            if item.get("id") != item_id:
                continue
            item["status"] = status.value
            item["last_attempt"] = time.time()
            if result is not None:
                item["result"] = {
                    "status": result.status.value,
                    "message": result.message,
                    "exit_code": result.exit_code,
                    "stdout": result.stdout,
                    "stderr": result.stderr,
                    "execution_time": result.execution_time,
                    "hermes_path": result.hermes_path,
                    "validated": result.validated
                }
            # Update retry count if failed
            if status == DispatchStatus.FAILED:
                item["retry_count"] = item.get("retry_count", 0) + 1
            self.save_queue()
            break

    def remove_item(self, item_id: str):
        """Remove an item from the queue."""
        self.queue = [item for item in self.queue if item.get("id") != item_id]
        self.save_queue()

    def get_failed_items(self) -> List[Dict[str, Any]]:
        """Get all failed items that can be retried."""
        return [
            item for item in self.queue
            if item.get("status") == DispatchStatus.FAILED.value
            and item.get("retry_count", 0) < item.get("max_retries", 3)
        ]

    def get_stats(self) -> Dict[str, Any]:
        """Get queue statistics: counts per status plus the retryable count."""
        def count(status: DispatchStatus) -> int:
            return sum(1 for item in self.queue if item.get("status") == status.value)

        return {
            "total": len(self.queue),
            "pending": count(DispatchStatus.PENDING),
            "success": count(DispatchStatus.SUCCESS),
            "failed": count(DispatchStatus.FAILED),
            "retryable": len(self.get_failed_items())
        }
def process_dispatch_queue(dispatcher: VPSAgentDispatcher,
                           queue: DispatchQueue,
                           batch_size: int = 5) -> Dict[str, Any]:
    """
    Process items from the dispatch queue.

    Failed items that still have retry budget are put back to pending at the
    start of the batch, so each call retries them at most once.

    Args:
        dispatcher: VPS agent dispatcher
        queue: Dispatch queue
        batch_size: Number of items to process in this batch

    Returns:
        Processing statistics: ``processed``, ``success``, ``failed`` and the
        queue's own ``queue_stats``.
    """
    # get_next_item() only hands out PENDING items and nothing else resets a
    # FAILED item, so without this step the retry_count / max_retries
    # bookkeeping never led to an actual retry.
    retryable = queue.get_failed_items()
    if retryable:
        for stale in retryable:
            stale["status"] = DispatchStatus.PENDING.value
        queue.save_queue()

    processed = 0
    success = 0
    failed = 0

    for _ in range(batch_size):
        item = queue.get_next_item()
        if not item:
            break

        item_id = item["id"]
        agent_name = item["agent_name"]
        command = item["command"]

        # Update status to dispatching
        queue.update_item(item_id, DispatchStatus.DISPATCHING)

        # Dispatch the command
        result = dispatcher.dispatch_hermes_command(
            agent_name=agent_name,
            hermes_command=command,
            validate_first=True
        )

        # Update queue with result
        if result.status == DispatchStatus.SUCCESS:
            queue.update_item(item_id, DispatchStatus.SUCCESS, result)
            success += 1
        else:
            failed += 1
            # Check if we should retry
            item_data = next((i for i in queue.queue if i.get("id") == item_id), None)
            if item_data and item_data.get("retry_count", 0) < item_data.get("max_retries", 3):
                # Keep the item, marked failed, so a later batch can retry it.
                queue.update_item(item_id, DispatchStatus.FAILED, result)
            else:
                # Max retries reached, remove from queue
                queue.remove_item(item_id)
        processed += 1

    return {
        "processed": processed,
        "success": success,
        "failed": failed,
        "queue_stats": queue.get_stats()
    }
# Manual smoke run: enqueue two sample commands, drain one batch, and print
# the resulting statistics.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    dispatcher = VPSAgentDispatcher()
    queue = DispatchQueue()

    # Sample work items, one per default agent.
    for sample_agent, sample_command in (("ezra", "cron list"), ("timmy", "cron status")):
        queue.add_item(sample_agent, sample_command)

    stats = process_dispatch_queue(dispatcher, queue)
    print(f"Processing stats: {stats}")

    queue_stats = queue.get_stats()
    print(f"Queue stats: {queue_stats}")

View File

@@ -37,6 +37,7 @@ sys.path.insert(0, str(Path(__file__).parent.parent))
from hermes_constants import get_hermes_home
from hermes_cli.config import load_config
from hermes_time import now as _hermes_now
from agent.model_metadata import is_local_endpoint
logger = logging.getLogger(__name__)
@@ -653,12 +654,6 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
# AIAgent.__init__ is missing params the scheduler expects.
_validate_agent_interface()
# Check if this is a dispatch job
if job.get("type") == "dispatch" or "dispatch" in job.get("name", "").lower():
return _run_dispatch_job(job)
from run_agent import AIAgent
# Initialize SQLite session store so cron job messages are persisted
@@ -783,6 +778,20 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
},
)
# Build disabled toolsets — always exclude cronjob/messaging/clarify
# for cron sessions. When the runtime endpoint is cloud (not local),
# also disable terminal so the agent does not attempt SSH or shell
# commands that require local infrastructure (keys, filesystem). #379
_cron_disabled = ["cronjob", "messaging", "clarify"]
_runtime_base_url = turn_route["runtime"].get("base_url", "")
if not is_local_endpoint(_runtime_base_url):
_cron_disabled.append("terminal")
logger.info(
"Job '%s': cloud provider detected (%s), disabling terminal toolset",
job_name,
turn_route["runtime"].get("provider", "unknown"),
)
_agent_kwargs = _safe_agent_kwargs({
"model": turn_route["model"],
"api_key": turn_route["runtime"].get("api_key"),
@@ -798,7 +807,7 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
"providers_ignored": pr.get("ignore"),
"providers_order": pr.get("order"),
"provider_sort": pr.get("sort"),
"disabled_toolsets": ["cronjob", "messaging", "clarify"],
"disabled_toolsets": _cron_disabled,
"tool_choice": "required",
"quiet_mode": True,
"skip_memory": True, # Cron system prompts would corrupt user representations
@@ -1013,89 +1022,6 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
logger.debug("Job '%s': failed to close SQLite session store: %s", job_id, e)
def _run_dispatch_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
    """
    Run a dispatch-type cron job: enqueue its command and drain one batch.

    The job dict must carry "id" and "name"; "agent_name", "command" and
    "batch_size" are optional and default to "ezra", "cron list" and 5.

    Returns:
        Tuple of (success, full_output_doc, final_response, error_message).
        The same markdown report is used for both document slots.
    """
    from cron.dispatch_worker import VPSAgentDispatcher, DispatchQueue, process_dispatch_queue

    job_id = job["id"]
    job_name = job["name"]
    logger.info("Running dispatch job '%s' (ID: %s)", job_name, job_id)

    try:
        # Dispatcher and queue both load their state from disk here.
        dispatcher = VPSAgentDispatcher()
        queue = DispatchQueue()

        # Job-level parameters.
        agent_name = job.get("agent_name", "ezra")
        command = job.get("command", "cron list")
        batch_size = job.get("batch_size", 5)

        # An empty command means "just drain what is already queued".
        if command:
            queue.add_item(agent_name, command)

        stats = process_dispatch_queue(dispatcher, queue, batch_size)

        # Report lines, in the order they appear in the document; the
        # trailing empty entry yields the final newline.
        report_lines = [
            f"# Dispatch Job: {job_name}",
            f"**Job ID:** {job_id}",
            f"**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}",
            f"**Agent:** {agent_name}",
            f"**Command:** {command}",
            "## Dispatch Results",
            f"- **Processed:** {stats['processed']}",
            f"- **Success:** {stats['success']}",
            f"- **Failed:** {stats['failed']}",
            "## Queue Statistics",
            f"- **Total items:** {stats['queue_stats']['total']}",
            f"- **Pending:** {stats['queue_stats']['pending']}",
            f"- **Success:** {stats['queue_stats']['success']}",
            f"- **Failed:** {stats['queue_stats']['failed']}",
            f"- **Retryable:** {stats['queue_stats']['retryable']}",
            "## Status",
            (
                "✅ All dispatches successful"
                if stats['failed'] == 0
                else f"⚠️ {stats['failed']} dispatches failed"
            ),
            "",
        ]
        output = "\n".join(report_lines)

        success = stats['failed'] == 0
        if success:
            error_message = None
        else:
            error_message = f"{stats['failed']} dispatches failed"
        return (success, output, output, error_message)

    except Exception as e:
        error_msg = f"Dispatch job failed: {e!s}"
        logger.error(error_msg, exc_info=True)
        failure_lines = [
            f"# Dispatch Job: {job_name}",
            f"**Job ID:** {job_id}",
            f"**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}",
            "**Status:** ❌ Failed",
            "## Error",
            f"{error_msg}",
            "",
        ]
        output = "\n".join(failure_lines)
        return (False, output, output, error_msg)
def tick(verbose: bool = True, adapters=None, loop=None) -> int:
"""
Check and run all due jobs.